diff --git a/src/ddebug/common/queue.d b/src/ddebug/common/queue.d index 1a5ebf0..2e2a643 100644 --- a/src/ddebug/common/queue.d +++ b/src/ddebug/common/queue.d @@ -10,6 +10,7 @@ class BlockingQueue(T) { private T[] _buffer; private int _readPos; private int _writePos; + private shared bool _closed; this() { _mutex = new Mutex(); @@ -19,6 +20,15 @@ class BlockingQueue(T) { } void close() { + if (_mutex && !_closed) { + synchronized(_mutex) { + _closed = true; + if (_condition !is null) + _condition.notifyAll(); + } + } else { + _closed = true; + } if (_condition) { destroy(_condition); _condition = null; @@ -53,14 +63,22 @@ class BlockingQueue(T) { } void put(T item) { + if (_closed) + return; synchronized(_mutex) { + if (_closed) + return; append(item); _condition.notifyAll(); } } void put(T[] items) { + if (_closed) + return; synchronized(_mutex) { + if (_closed) + return; foreach(ref item; items) { append(item); } @@ -70,7 +88,11 @@ class BlockingQueue(T) { } bool get(ref T value, int timeoutMillis) { + if (_closed) + return false; synchronized(_mutex) { + if (_closed) + return false; if (_readPos < _writePos) { value = _buffer[_readPos++]; return true; @@ -88,7 +110,11 @@ class BlockingQueue(T) { } bool getAll(ref T[] values, int timeoutMillis) { + if (_closed) + return false; synchronized(_mutex) { + if (_closed) + return false; values.length = 0; while (_readPos < _writePos) values ~= _buffer[_readPos++];