blocking queue

This commit is contained in:
Vadim Lopatin 2015-11-16 09:31:15 +03:00
parent 87f12a9435
commit 46256ca650
1 changed files with 82 additions and 0 deletions

View File

@ -7,20 +7,102 @@ class BlockingQueue(T) {
private Mutex _mutex;
private Condition _condition;
private T[] _buffer;
private int _readPos;
private int _writePos;
this() {
_mutex = new Mutex();
_condition = new Condition(_mutex);
_readPos = 0;
_writePos = 0;
}
void close() {
if (_condition) {
destroy(_condition);
_condition = null;
}
if (_mutex) {
destroy(_mutex);
_mutex = null;
}
}
~this() {
// TODO: destroy mutex?
close();
}
private void move() {
if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) {
// move buffer data
for (int i = 0; _readPos + i < _writePos; i++)
_buffer[i] = _buffer[_readPos + i];
_writePos -= _readPos;
_readPos = 0;
}
}
private void append(ref T item) {
if (_writePos >= _buffer.length) {
move();
_buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2;
}
_buffer[_writePos++] = item;
}
void put(T item) {
synchronized(_mutex) {
append(item);
_condition.notifyAll();
}
}
void put(T[] items) {
synchronized(_mutex) {
foreach(ref item; items) {
append(item);
}
append(item);
_condition.notifyAll();
}
}
bool get(ref T value, int timeoutMillis) {
synchronized(_mutex) {
if (_readPos < _writePos) {
value = _buffer[_readPos++];
return true;
}
if (timeoutMillis <= 0)
_condition.wait(); // no timeout
else if (!_condition.wait(dur!msecs(timeoutMillis)))
return false; // timeout
if (_readPos < _writePos) {
value = _buffer[_readPos++];
return true;
}
}
return false;
}
bool getAll(ref T[] values, int timeoutMillis) {
synchronized(_mutex) {
values.length = 0;
while (_readPos < _writePos)
values ~= _buffer[_readPos++];
if (values.length > 0)
return true;
if (timeoutMillis <= 0)
_condition.wait(); // no timeout
else if (!_condition.wait(dur!msecs(timeoutMillis)))
return false; // timeout
while (_readPos < _writePos)
values ~= _buffer[_readPos++];
if (values.length > 0)
return true;
}
return false;
}
}