multithreading fixes in event posting API

This commit is contained in:
Ketmar Dark 2018-03-17 14:21:08 +02:00 committed by Adam D. Ruppe
parent 5c5f2d932d
commit 819c3fdf90
1 changed files with 97 additions and 27 deletions

View File

@ -2098,13 +2098,20 @@ public:
* ---------------------
*
* Returns: 0 on failure (should never happen, so ignore it)
*
* $(WARNING Don't use this method in object destructors!)
*
* $(WARNING It is better to register all event handlers and don't remove 'em,
* 'cause if event handler id counter will overflow, you won't be able
* to register any more events.)
*/
uint addEventListener(ET:Object) (void delegate (ET) dg) {
if (dg is null) return 0; // ignore empty handlers
synchronized(this) {
//FIXME: abort on overflow?
if (++lastUsentHandlerId == 0) { --lastUsentHandlerId; return 0; } // alas, can't register more events. at all.
eventHandlers[lastUsentHandlerId] = delegate (Object o) {
if (++lastUsedHandlerId == 0) { --lastUsedHandlerId; return 0; } // alas, can't register more events. at all.
EventHandlerEntry e;
e.dg = delegate (Object o) {
if (auto co = cast(ET)o) {
try {
dg(co);
@ -2115,39 +2122,89 @@ public:
}
return false;
};
return lastUsentHandlerId;
e.id = lastUsedHandlerId;
auto optr = eventHandlers.ptr;
eventHandlers ~= e;
if (eventHandlers.ptr !is optr) {
import core.memory : GC;
if (eventHandlers.ptr is GC.addrOf(eventHandlers.ptr)) GC.setAttr(eventHandlers.ptr, GC.BlkAttr.NO_INTERIOR);
}
return lastUsedHandlerId;
}
}
/// Remove event listener. It is safe to pass invalid event id here.
/// $(WARNING Don't use this method in object destructors!)
void removeEventListener() (uint id) {
if (id == 0 || id > lastUsedHandlerId) return;
synchronized(this) {
if (id) eventHandlers.remove(id);
foreach (immutable idx; 0..eventHandlers.length) {
if (eventHandlers[idx].id == id) {
foreach (immutable c; idx+1..eventHandlers.length) eventHandlers[c-1] = eventHandlers[c];
eventHandlers[$-1].dg = null;
eventHandlers.length -= 1;
eventHandlers.assumeSafeAppend;
return;
}
}
}
}
/// Post event to queue. It is safe to call this from non-UI threads.
/// If `timeoutmsecs` is greater than zero, the event will be delayed for at least `timeoutmsecs` milliseconds.
bool postTimeout(ET:Object) (ET evt, uint timeoutmsecs) {
if (evt is null) return false; // ignore empty events, they can't be handled anyway
/// if `replace` is `true`, replace all existing events typed `ET` with the new one (if `evt` is empty, remove 'em all)
/// Returns `true` if event was queued. Always returns `false` if `evt` is null.
bool postTimeout(ET:Object) (ET evt, uint timeoutmsecs, bool replace=false) {
if (this.closed) return false; // closed windows can't handle events
// remove all events of type `ET`
void removeAllET () {
uint eidx = 0;
while (eidx < eventQueueUsed) {
if (cast(ET)eventQueue[eidx].evt !is null) {
// i found her!
foreach (immutable c; eidx+1..eventQueueUsed) eventQueue[c-1] = eventQueue[c];
--eventQueueUsed;
// clear last event (it is already copied)
eventQueue[eventQueueUsed].evt = null;
} else {
++eidx;
}
}
}
if (evt is null) {
if (replace) { synchronized(this) removeAllET(); }
// ignore empty events, they can't be handled anyway
return false;
}
// add events even if no event FD/event object created yet
synchronized(this) {
if (replace) removeAllET();
if (eventQueueUsed == uint.max) return false; // just in case
if (eventQueueUsed < eventQueue.length) {
eventQueue[eventQueueUsed++] = QueuedEvent(evt, timeoutmsecs);
} else {
auto optr = eventQueue.ptr;
eventQueue ~= QueuedEvent(evt, timeoutmsecs);
++eventQueueUsed;
assert(eventQueueUsed == eventQueue.length);
if (eventQueue.ptr !is optr) {
if (eventQueue.capacity == eventQueue.length) {
// need to reallocate; do a trick to ensure that old array is cleared
auto oarr = eventQueue;
eventQueue ~= QueuedEvent(evt, timeoutmsecs);
// just in case, do yet another check
if (oarr.length != 0 && oarr.ptr !is eventQueue.ptr) foreach (ref e; oarr[0..eventQueueUsed]) e.evt = null;
import core.memory : GC;
if (eventQueue.ptr is GC.addrOf(eventQueue.ptr)) GC.setAttr(eventQueue.ptr, GC.BlkAttr.NO_INTERIOR);
} else {
auto optr = eventQueue.ptr;
eventQueue ~= QueuedEvent(evt, timeoutmsecs);
assert(eventQueue.ptr is optr);
}
++eventQueueUsed;
assert(eventQueueUsed == eventQueue.length);
}
if (!eventWakeUp()) {
// can't wake up event processor, so there is no reason to keep the event
assert(eventQueueUsed > 0);
eventQueue[--eventQueueUsed].evt = null;
return false;
}
@ -2156,8 +2213,10 @@ public:
}
/// Post event to queue. It is safe to call this from non-UI threads.
bool postEvent(ET:Object) (ET evt) {
return postTimeout!ET(evt, 0);
/// if `replace` is `true`, replace all existing events typed `ET` with the new one (if `evt` is empty, remove 'em all)
/// Returns `true` if event was queued. Always returns `false` if `evt` is null.
bool postEvent(ET:Object) (ET evt, bool replace=false) {
return postTimeout!ET(evt, 0, replace);
}
private:
@ -2202,17 +2261,22 @@ private:
}
alias CustomEventHandler = bool delegate (Object o) nothrow;
uint lastUsentHandlerId;
CustomEventHandler[uint] eventHandlers;
QueuedEvent[] eventQueue;
uint eventQueueUsed; // to avoid `.assumeSafeAppend` and length changes
static struct EventHandlerEntry {
CustomEventHandler dg;
uint id;
}
uint lastUsedHandlerId;
EventHandlerEntry[] eventHandlers;
QueuedEvent[] eventQueue = null;
uint eventQueueUsed = 0; // to avoid `.assumeSafeAppend` and length changes
// process queued events and call custom event handlers
// this will not process events posted from called handlers (such events are postponed for the next iteration)
void processCustomEvents () {
// don't lock and re-lock on each iteration, or other threads may spam event queue
uint ecount;
synchronized(this) {
uint ecount = eventQueueUsed; // user may want to post new events from an event handler; process 'em on next iteration
ecount = eventQueueUsed; // user may want to post new events from an event handler; process 'em on next iteration
auto ctt = MonoTime.currTime;
// mark events to process (this is required for `eventQueued()`)
foreach (ref qe; eventQueue[0..ecount]) {
@ -2222,10 +2286,13 @@ private:
qe.doProcess = true;
}
}
// process marked events
uint efree = 0; // non-processed events will be put at this index
foreach (immutable eidx; 0..ecount) {
import core.stdc.string : memmove;
}
// process marked events
uint efree = 0; // non-processed events will be put at this index
EventHandlerEntry[] eh;
Object evt;
foreach (immutable eidx; 0..ecount) {
synchronized(this) {
if (!eventQueue[eidx].doProcess) {
// skip this event
assert(efree <= eidx);
@ -2237,14 +2304,17 @@ private:
++efree;
continue;
}
auto evt = eventQueue[eidx].evt;
evt = eventQueue[eidx].evt;
eventQueue[eidx].evt = null; // in case event handler will hit GC
if (evt is null) continue; // just in case
// try all handlers; this can be slow, but meh...
foreach (ref evhan; eventHandlers.byValue) {
if (evhan !is null) evhan(evt);
}
eh = eventHandlers;
}
foreach (ref evhan; eh) if (evhan.dg !is null) evhan.dg(evt);
evt = null;
eh = null;
}
synchronized(this) {
// move all unprocessed events to queue top; efree holds first "free index"
foreach (immutable eidx; ecount..eventQueueUsed) {
assert(efree <= eidx);