blocking queue

This commit is contained in:
Vadim Lopatin 2015-11-16 10:27:14 +03:00
parent 46256ca650
commit 0ebd6c2394
1 changed files with 26 additions and 0 deletions

View File

@ -10,6 +10,7 @@ class BlockingQueue(T) {
private T[] _buffer;
private int _readPos;
private int _writePos;
private shared bool _closed;
this() {
_mutex = new Mutex();
@ -19,6 +20,15 @@ class BlockingQueue(T) {
}
void close() {
if (_mutex && !_closed) {
synchronized(_mutex) {
_closed = true;
if (_condition !is null)
_condition.notifyAll();
}
} else {
_closed = true;
}
if (_condition) {
destroy(_condition);
_condition = null;
@ -53,14 +63,22 @@ class BlockingQueue(T) {
}
void put(T item) {
if (_closed)
return;
synchronized(_mutex) {
if (_closed)
return;
append(item);
_condition.notifyAll();
}
}
void put(T[] items) {
if (_closed)
return;
synchronized(_mutex) {
if (_closed)
return;
foreach(ref item; items) {
append(item);
}
@ -70,7 +88,11 @@ class BlockingQueue(T) {
}
bool get(ref T value, int timeoutMillis) {
if (_closed)
return false;
synchronized(_mutex) {
if (_closed)
return false;
if (_readPos < _writePos) {
value = _buffer[_readPos++];
return true;
@ -88,7 +110,11 @@ class BlockingQueue(T) {
}
bool getAll(ref T[] values, int timeoutMillis) {
if (_closed)
return false;
synchronized(_mutex) {
if (_closed)
return false;
values.length = 0;
while (_readPos < _writePos)
values ~= _buffer[_readPos++];