ketmar postTimeout API patch

This commit is contained in:
Adam D. Ruppe 2017-02-13 23:30:19 -05:00
parent bd1bed0495
commit a1213557d1
1 changed files with 127 additions and 43 deletions

View File

@ -1363,8 +1363,6 @@ class SimpleWindow : CapableOfHandlingNativeEvent {
/// code duplication here (ironically). /// code duplication here (ironically).
mixin NativeSimpleWindowImplementation!() impl; mixin NativeSimpleWindowImplementation!() impl;
//private import std.traits : isCallable;
/** /**
This is in-process one-way (from anything to window) event sending mechanics. This is in-process one-way (from anything to window) event sending mechanics.
It is thread-safe, so it can be used in multi-threaded applications to send, It is thread-safe, so it can be used in multi-threaded applications to send,
@ -1382,8 +1380,9 @@ public:
*/ */
@property bool eventQueueEmpty() () { @property bool eventQueueEmpty() () {
synchronized(this) { synchronized(this) {
return (eventQueueUsed != 0); foreach (const ref o; eventQueue[0..eventQueueUsed]) if (!o.doProcess) return true;
} }
return false;
} }
/** Does our custom event queue contains at least one with the given type? /** Does our custom event queue contains at least one with the given type?
@ -1393,7 +1392,11 @@ public:
*/ */
@property bool eventQueued(ET:Object) () { @property bool eventQueued(ET:Object) () {
synchronized(this) { synchronized(this) {
foreach (Object o; eventQueue[0..eventQueueUsed]) if (cast(ET)o) return true; foreach (const ref o; eventQueue[0..eventQueueUsed]) {
if (!o.doProcess) {
if (cast(ET)(o.evt)) return true;
}
}
} }
return false; return false;
} }
@ -1436,68 +1439,149 @@ public:
} }
/// Post event to queue. It is safe to call this from non-UI threads. /// Post event to queue. It is safe to call this from non-UI threads.
bool postEvent(ET:Object) (ET evt) { /// If `timeoutmsecs` is greater than zero, the event will be delayed for at least `timeoutmsecs` milliseconds.
bool postTimeout(ET:Object) (ET evt, uint timeoutmsecs) {
if (evt is null) return false; // ignore empty events, they can't be handled anyway if (evt is null) return false; // ignore empty events, they can't be handled anyway
// add events even if no event FD/event object created yet // add events even if no event FD/event object created yet
synchronized(this) { synchronized(this) {
if (eventQueueUsed == uint.max) return false; // just in case if (eventQueueUsed == uint.max) return false; // just in case
if (eventQueueUsed < eventQueue.length) { if (eventQueueUsed < eventQueue.length) {
eventQueue[eventQueueUsed++] = evt; eventQueue[eventQueueUsed++] = QueuedEvent(evt, timeoutmsecs);
} else { } else {
eventQueue ~= evt; eventQueue ~= QueuedEvent(evt, timeoutmsecs);
++eventQueueUsed; ++eventQueueUsed;
assert(eventQueueUsed == eventQueue.length);
} }
version(X11) { if (!eventWakeUp()) {
// wake up eventfd // can't wake up event processor, so there is no reason to keep the event
{ eventQueue[--eventQueueUsed].evt = null;
import core.sys.posix.unistd : write;
ulong n = 1;
write(customEventFD, &n, n.sizeof);
}
return true;
} else version(Windows) {
if (customEventH !is null) SetEvent(customEventH);
return true;
} else {
// not implemented for other OSes
--eventQueueUsed;
return false; return false;
} }
return true;
} }
} }
/// Post event to queue. It is safe to call this from non-UI threads.
bool postEvent(ET:Object) (ET evt) {
return postTimeout!ET(evt, 0);
}
private: private:
private import core.time : MonoTime;
version(X11) { version(X11) {
int customEventFD = -1; int customEventFD = -1;
} else version(Windows) { } else version(Windows) {
HANDLE customEventH = null; HANDLE customEventH = null;
} }
// wake up event processor
bool eventWakeUp () {
version(X11) {
import core.sys.posix.unistd : write;
ulong n = 1;
if (customEventFD >= 0) write(customEventFD, &n, n.sizeof);
return true;
} else version(Windows) {
if (customEventH !is null) SetEvent(customEventH);
return true;
} else {
// not implemented for other OSes
return false;
}
}
static struct QueuedEvent {
Object evt;
bool timed = false;
MonoTime hittime = MonoTime.zero;
bool doProcess = false; // process event at the current iteration (internal flag)
this (Object aevt, uint toutmsecs) {
evt = aevt;
if (toutmsecs > 0) {
import core.time : msecs;
timed = true;
hittime = MonoTime.currTime+toutmsecs.msecs;
}
}
}
alias CustomEventHandler = bool delegate (Object o) nothrow; alias CustomEventHandler = bool delegate (Object o) nothrow;
uint lastUsentHandlerId; uint lastUsentHandlerId;
CustomEventHandler[uint] eventHandlers; CustomEventHandler[uint] eventHandlers;
Object[] eventQueue; QueuedEvent[] eventQueue;
uint eventQueueUsed; // to avoid `.assumeSafeAppend` and length changes uint eventQueueUsed; // to avoid `.assumeSafeAppend` and length changes
// call all custom event handlers (if any) // process queued events and call custom event handlers
// this will not process events posted from called handlers (such events are postponed for the next iteration)
void processCustomEvents () { void processCustomEvents () {
// don't lock and re-lock on each iteration, or other threads may spam event queue // don't lock and re-lock on each iteration, or other threads may spam event queue
synchronized(this) { synchronized(this) {
// user may want to post new events from an event handler; process 'em on next iteration uint ecount = eventQueueUsed; // user may want to post new events from an event handler; process 'em on next iteration
for (uint ecount = eventQueueUsed; ecount > 0; --ecount) { auto ctt = MonoTime.currTime;
// mark events to process (this is required for `eventQueued()`)
foreach (ref qe; eventQueue[0..ecount]) {
if (qe.timed) {
qe.doProcess = (qe.hittime <= ctt);
} else {
qe.doProcess = true;
}
}
// process marked events
uint efree = 0; // non-processed events will be put at this index
foreach (immutable eidx; 0..ecount) {
import core.stdc.string : memmove; import core.stdc.string : memmove;
if (eventQueueUsed == 0) break; if (!eventQueue[eidx].doProcess) {
Object evt = eventQueue[0]; // skip this event
// do memmove on each step, it is cheap assert(efree <= eidx);
if (--eventQueueUsed > 0) memmove(eventQueue.ptr, eventQueue.ptr+1, eventQueueUsed*eventQueue[0].sizeof); if (efree != eidx) {
eventQueue[eventQueueUsed] = null; // so GC will eventually collect event object // copy this event to queue start
assert(evt !is null); // just in case eventQueue[efree] = eventQueue[eidx];
eventQueue[eidx].evt = null; // just in case
}
++efree;
continue;
}
auto evt = eventQueue[eidx].evt;
eventQueue[eidx].evt = null; // in case event handler will hit GC
if (evt is null) continue; // just in case
// try all handlers; this can be slow, but meh... // try all handlers; this can be slow, but meh...
foreach (ref evhan; eventHandlers.byValue) { foreach (ref evhan; eventHandlers.byValue) {
assert(evhan !is null); if (evhan !is null) evhan(evt);
evhan(evt);
} }
} }
// move all unprocessed events to queue top; efree holds first "free index"
foreach (immutable eidx; ecount..eventQueueUsed) {
assert(efree <= eidx);
if (efree != eidx) eventQueue[efree] = eventQueue[eidx];
++efree;
}
eventQueueUsed = efree;
// wake up event processor on next event loop iteration if we have more queued events
foreach (const ref qe; eventQueue[0..eventQueueUsed]) {
if (!qe.timed) { eventWakeUp(); break; }
}
}
}
// 0: infinite (i.e. no scheduled events in queue)
uint eventQueueTimeoutMSecs () {
synchronized(this) {
if (eventQueueUsed == 0) return 0;
uint res = int.max;
auto ctt = MonoTime.currTime;
foreach (const ref qe; eventQueue[0..eventQueueUsed]) {
if (qe.evt is null) assert(0, "WUTAFUUUUUUU..."); // the thing that should not be. ABSOLUTELY! (c)
if (qe.doProcess) continue; // just in case
if (!qe.timed) return 1; // minimal
if (qe.hittime <= ctt) return 1; // minimal
auto tms = (qe.hittime-ctt).total!"msecs";
if (tms < 1) tms = 1; // safety net
if (tms >= int.max) tms = int.max-1; // and another safety net
if (res > tms) res = cast(uint)tms;
}
return (res >= int.max ? 0 : res);
} }
} }
} }
@ -4538,19 +4622,17 @@ version(Windows) {
processCustomEvents(); // process events added before event object creation processCustomEvents(); // process events added before event object creation
while(ret != 0) { while(ret != 0) {
auto wto = eventQueueTimeoutMSecs();
auto waitResult = MsgWaitForMultipleObjectsEx( auto waitResult = MsgWaitForMultipleObjectsEx(
cast(int) handles.length, handles.ptr, cast(int) handles.length, handles.ptr,
INFINITE, /* timeout */ (wto == 0 ? INFINITE : wto), /* timeout */
0x04FF, /* QS_ALLINPUT */ 0x04FF, /* QS_ALLINPUT */
0x0002 /* MWMO_ALERTABLE */ | 0x0004 /* MWMO_INPUTAVAILABLE */); 0x0002 /* MWMO_ALERTABLE */ | 0x0004 /* MWMO_INPUTAVAILABLE */);
processCustomEvents(); // anyway
enum WAIT_OBJECT_0 = 0; enum WAIT_OBJECT_0 = 0;
if(waitResult >= WAIT_OBJECT_0 && waitResult < handles.length + WAIT_OBJECT_0) { if(waitResult >= WAIT_OBJECT_0 && waitResult < handles.length + WAIT_OBJECT_0) {
// process handles[waitResult - WAIT_OBJECT_0]; // process handles[waitResult - WAIT_OBJECT_0];
if (handles[waitResult-WAIT_OBJECT_0] is customEventH) {
//{ import core.stdc.stdio; printf("WIN: EVENT SIGNAL!\n"); }
processCustomEvents();
}
} else if(waitResult == handles.length + WAIT_OBJECT_0) { } else if(waitResult == handles.length + WAIT_OBJECT_0) {
// message ready // message ready
if((ret = GetMessage(&message, null, 0, 0)) != 0) { if((ret = GetMessage(&message, null, 0, 0)) != 0) {
@ -5801,7 +5883,8 @@ version(X11) {
processCustomEvents(); // process events added before event FD creation processCustomEvents(); // process events added before event FD creation
while(!done) { while(!done) {
auto nfds = ep.epoll_wait(epollFd, events.ptr, events.length, -1); auto wto = eventQueueTimeoutMSecs();
auto nfds = ep.epoll_wait(epollFd, events.ptr, events.length, (wto == 0 || wto >= int.max ? -1 : cast(int)wto));
if(nfds == -1) { if(nfds == -1) {
if(err.errno == err.EINTR) { if(err.errno == err.EINTR) {
continue; // interrupted by signal, just try again continue; // interrupted by signal, just try again
@ -5809,6 +5892,7 @@ version(X11) {
throw new Exception("epoll wait failure"); throw new Exception("epoll wait failure");
} }
processCustomEvents(); // anyway
//version(sdddd) { import std.stdio; writeln("nfds=", nfds, "; [0]=", events[0].data.fd); } //version(sdddd) { import std.stdio; writeln("nfds=", nfds, "; [0]=", events[0].data.fd); }
foreach(idx; 0 .. nfds) { foreach(idx; 0 .. nfds) {
if(done) break; if(done) break;
@ -5842,7 +5926,7 @@ version(X11) {
ulong n; ulong n;
read(customEventFD, &n, n.sizeof); // reset counter value to zero again read(customEventFD, &n, n.sizeof); // reset counter value to zero again
//{ import core.stdc.stdio; printf("custom event! count=%u\n", eventQueueUsed); } //{ import core.stdc.stdio; printf("custom event! count=%u\n", eventQueueUsed); }
processCustomEvents(); //processCustomEvents();
} else { } else {
// some other timer // some other timer
version(sdddd) { import std.stdio; writeln("unknown fd: ", fd); } version(sdddd) { import std.stdio; writeln("unknown fd: ", fd); }