From 46256ca65068dd5a61564c9ebe8c28b859fde916 Mon Sep 17 00:00:00 2001 From: Vadim Lopatin Date: Mon, 16 Nov 2015 09:31:15 +0300 Subject: [PATCH] blocking queue --- src/ddebug/common/queue.d | 82 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/src/ddebug/common/queue.d b/src/ddebug/common/queue.d index bac99ec..1a5ebf0 100644 --- a/src/ddebug/common/queue.d +++ b/src/ddebug/common/queue.d @@ -7,20 +7,102 @@ class BlockingQueue(T) { private Mutex _mutex; private Condition _condition; + private T[] _buffer; + private int _readPos; + private int _writePos; this() { _mutex = new Mutex(); _condition = new Condition(_mutex); + _readPos = 0; + _writePos = 0; + } + + void close() { + if (_condition) { + destroy(_condition); + _condition = null; + } + if (_mutex) { + destroy(_mutex); + _mutex = null; + } } ~this() { // TODO: destroy mutex? + close(); + } + + private void move() { + if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) { + // move buffer data + for (int i = 0; _readPos + i < _writePos; i++) + _buffer[i] = _buffer[_readPos + i]; + _writePos -= _readPos; + _readPos = 0; + } + } + + private void append(ref T item) { + if (_writePos >= _buffer.length) { + move(); + _buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2; + } + _buffer[_writePos++] = item; } void put(T item) { + synchronized(_mutex) { + append(item); + _condition.notifyAll(); + } } + void put(T[] items) { + synchronized(_mutex) { + foreach(ref item; items) { + append(item); + } + append(item); + _condition.notifyAll(); + } + } + bool get(ref T value, int timeoutMillis) { + synchronized(_mutex) { + if (_readPos < _writePos) { + value = _buffer[_readPos++]; + return true; + } + if (timeoutMillis <= 0) + _condition.wait(); // no timeout + else if (!_condition.wait(dur!msecs(timeoutMillis))) + return false; // timeout + if (_readPos < _writePos) { + value = _buffer[_readPos++]; + return true; + } + } + return false; + } + + bool getAll(ref T[] values, int timeoutMillis) { + synchronized(_mutex) { + values.length = 0; + while (_readPos < _writePos) + values ~= _buffer[_readPos++]; + if (values.length > 0) + return true; + if (timeoutMillis <= 0) + _condition.wait(); // no timeout + else if (!_condition.wait(dur!msecs(timeoutMillis))) + return false; // timeout + while (_readPos < _writePos) + values ~= _buffer[_readPos++]; + if (values.length > 0) + return true; + } return false; } }