diff --git a/http2.d b/http2.d index d6b7123..1b26096 100644 --- a/http2.d +++ b/http2.d @@ -4094,6 +4094,7 @@ class HttpApiClient() { string urlBase; string oauth2Token; string submittedContentType; + string authType = "Bearer"; /++ Params: @@ -4161,7 +4162,7 @@ class HttpApiClient() { auto req = httpClient.navigateTo(u, requestMethod); if(oauth2Token.length) - req.requestParameters.headers ~= "Authorization: Bearer " ~ oauth2Token; + req.requestParameters.headers ~= "Authorization: "~ authType ~" " ~ oauth2Token; req.requestParameters.contentType = submittedContentType; req.requestParameters.bodyData = bodyBytes; @@ -4911,6 +4912,8 @@ class WebSocket { while(d.length) { auto r = socket.send(d); if(r < 0 && wouldHaveBlocked()) { + // FIXME: i should register for a write wakeup + version(use_arsd_core) assert(0); import core.thread; Thread.sleep(1.msecs); continue; @@ -5365,6 +5368,59 @@ class WebSocket { /* } end copy/paste */ + // returns true if still active + private static bool readyToRead(WebSocket sock) { + sock.timeoutFromInactivity = MonoTime.currTime + sock.config.timeoutFromInactivity; + if(!sock.lowLevelReceive()) { + sock.readyState_ = CLOSED; + + if(sock.onerror) + sock.onerror(); + + if(sock.onclose) + sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection lost", false, lastSocketError())); + + unregisterActiveSocket(sock); + return false; + } + while(sock.processOnce().populated) {} + return true; + } + + // returns true if still active, false if not + private static bool timeoutAndPingCheck(WebSocket sock, MonoTime now, Duration* minimumTimeoutForSelect) { + auto diff = sock.timeoutFromInactivity - now; + if(diff <= 0.msecs) { + // it timed out + if(sock.onerror) + sock.onerror(); + + if(sock.onclose) + sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection timed out", false, null)); + + sock.socket.close(); + sock.readyState_ = CLOSED; + unregisterActiveSocket(sock); + return false; + } + + if(minimumTimeoutForSelect && diff < *minimumTimeoutForSelect) + *minimumTimeoutForSelect = diff; + + diff = sock.nextPing - now; + + if(diff <= 0.msecs) { + //sock.send(`{"action": "ping"}`); + sock.ping(); + sock.nextPing = now + sock.config.pingFrequency.msecs; + } else { + if(minimumTimeoutForSelect && diff < *minimumTimeoutForSelect) + *minimumTimeoutForSelect = diff; + } + + return true; + } + /* const int bufferedAmount // amount pending const string extensions @@ -5394,87 +5450,56 @@ class WebSocket { loopExited = false; // reset it so we can reenter } - static SocketSet readSet; + version(use_arsd_core) { + loopExited = false; - if(readSet is null) - readSet = new SocketSet(); + import arsd.core; + getThisThreadEventLoop().run(() => WebSocket.activeSockets.length == 0 || loopExited || (localLoopExited !is null && *localLoopExited == true)); + } else { + static SocketSet readSet; - loopExited = false; + if(readSet is null) + readSet = new SocketSet(); - outermost: while(!loopExited && (localLoopExited is null || (*localLoopExited == false))) { - readSet.reset(); + loopExited = false; - Duration timeout = 3.seconds; + outermost: while(!loopExited && (localLoopExited is null || (*localLoopExited == false))) { + readSet.reset(); - auto now = MonoTime.currTime; - bool hadAny; - foreach(sock; activeSockets) { - auto diff = sock.timeoutFromInactivity - now; - if(diff <= 0.msecs) { - // timeout - if(sock.onerror) - sock.onerror(); + Duration timeout = 3.seconds; - if(sock.onclose) - sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection timed out", false, null)); - - sock.socket.close(); - sock.readyState_ = CLOSED; - unregisterActiveSocket(sock); - continue outermost; - } - - if(diff < timeout) - timeout = diff; - - diff = sock.nextPing - now; - - if(diff <= 0.msecs) { - //sock.send(`{"action": "ping"}`); - sock.ping(); - sock.nextPing = now + sock.config.pingFrequency.msecs; - } else { - if(diff < timeout) - timeout = diff; - } - - readSet.add(sock.socket); - hadAny = true; - } - - if(!hadAny) { - // import std.stdio; writeln("had none"); - return; - } - - tryAgain: - // import std.stdio; writeln(timeout); - auto selectGot = Socket.select(readSet, null, null, timeout); - if(selectGot == 0) { /* timeout */ - // timeout - continue; // it will be handled at the top of the loop - } else if(selectGot == -1) { /* interrupted */ - goto tryAgain; - } else { + auto now = MonoTime.currTime; + bool hadAny; foreach(sock; activeSockets) { - if(readSet.isSet(sock.socket)) { - sock.timeoutFromInactivity = MonoTime.currTime + sock.config.timeoutFromInactivity; - if(!sock.lowLevelReceive()) { - sock.readyState_ = CLOSED; + if(!timeoutAndPingCheck(sock, now, &timeout)) + continue outermost; - if(sock.onerror) - sock.onerror(); + readSet.add(sock.socket); + hadAny = true; + } - if(sock.onclose) - sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection lost", false, lastSocketError())); + if(!hadAny) { + // import std.stdio; writeln("had none"); + return; + } - unregisterActiveSocket(sock); - continue outermost; + tryAgain: + // import std.stdio; writeln(timeout); + auto selectGot = Socket.select(readSet, null, null, timeout); + if(selectGot == 0) { /* timeout */ + // timeout + continue; // it will be handled at the top of the loop + } else if(selectGot == -1) { /* interrupted */ + goto tryAgain; + } else { + foreach(sock; activeSockets) { + if(readSet.isSet(sock.socket)) { + if(!readyToRead(sock)) + continue outermost; + selectGot--; + if(selectGot <= 0) + break; } - while(sock.processOnce().populated) {} - selectGot--; - if(selectGot <= 0) - break; } } } @@ -5501,20 +5526,37 @@ class WebSocket { void registerActiveSocket(WebSocket s) { // ensure it isn't already there... assert(s !is null); - foreach(i, a; activeSockets) - if(a is s) - return; + if(s.registered) + return; + s.activeSocketArrayIndex = activeSockets.length; activeSockets ~= s; + s.registered = true; + version(use_arsd_core) { + s.unregisterToken = arsd.core.getThisThreadEventLoop().addCallbackOnFdReadable(s.socket.handle, new arsd.core.CallbackHelper(() { s.readyToRead(s); })); + } } void unregisterActiveSocket(WebSocket s) { - foreach(i, a; activeSockets) - if(s is a) { - activeSockets[i] = activeSockets[$-1]; - activeSockets = activeSockets[0 .. $-1]; - break; - } + version(use_arsd_core) { + s.unregisterToken.unregister(); + } + + auto i = s.activeSocketArrayIndex; + assert(activeSockets[i] is s); + + activeSockets[i] = activeSockets[$-1]; + activeSockets[i].activeSocketArrayIndex = i; + activeSockets = activeSockets[0 .. $-1]; + activeSockets.assumeSafeAppend(); + s.registered = false; } } + + private bool registered; + private size_t activeSocketArrayIndex; + version(use_arsd_core) { + static import arsd.core; + arsd.core.ICoreEventLoop.UnregisterToken unregisterToken; + } } private template imported(string mod) { @@ -5527,6 +5569,8 @@ private template imported(string mod) { template addToSimpledisplayEventLoop() { import arsd.simpledisplay; void addToSimpledisplayEventLoop(WebSocket ws, imported!"arsd.simpledisplay".SimpleWindow window) { + version(use_arsd_core) + return; // already done implicitly version(Windows) auto event = WSACreateEvent();