http websocket integrated event loop compat on linux

This commit is contained in:
Adam D. Ruppe 2024-05-09 10:28:13 -04:00
parent f406065507
commit bab31bab6c
1 changed files with 124 additions and 80 deletions

204
http2.d
View File

@ -4094,6 +4094,7 @@ class HttpApiClient() {
string urlBase; string urlBase;
string oauth2Token; string oauth2Token;
string submittedContentType; string submittedContentType;
string authType = "Bearer";
/++ /++
Params: Params:
@ -4161,7 +4162,7 @@ class HttpApiClient() {
auto req = httpClient.navigateTo(u, requestMethod); auto req = httpClient.navigateTo(u, requestMethod);
if(oauth2Token.length) if(oauth2Token.length)
req.requestParameters.headers ~= "Authorization: Bearer " ~ oauth2Token; req.requestParameters.headers ~= "Authorization: "~ authType ~" " ~ oauth2Token;
req.requestParameters.contentType = submittedContentType; req.requestParameters.contentType = submittedContentType;
req.requestParameters.bodyData = bodyBytes; req.requestParameters.bodyData = bodyBytes;
@ -4911,6 +4912,8 @@ class WebSocket {
while(d.length) { while(d.length) {
auto r = socket.send(d); auto r = socket.send(d);
if(r < 0 && wouldHaveBlocked()) { if(r < 0 && wouldHaveBlocked()) {
// FIXME: i should register for a write wakeup
version(use_arsd_core) assert(0);
import core.thread; import core.thread;
Thread.sleep(1.msecs); Thread.sleep(1.msecs);
continue; continue;
@ -5365,6 +5368,59 @@ class WebSocket {
/* } end copy/paste */ /* } end copy/paste */
// returns true if still active
private static bool readyToRead(WebSocket sock) {
sock.timeoutFromInactivity = MonoTime.currTime + sock.config.timeoutFromInactivity;
if(!sock.lowLevelReceive()) {
sock.readyState_ = CLOSED;
if(sock.onerror)
sock.onerror();
if(sock.onclose)
sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection lost", false, lastSocketError()));
unregisterActiveSocket(sock);
return false;
}
while(sock.processOnce().populated) {}
return true;
}
// returns true if still active, false if not
private static bool timeoutAndPingCheck(WebSocket sock, MonoTime now, Duration* minimumTimeoutForSelect) {
auto diff = sock.timeoutFromInactivity - now;
if(diff <= 0.msecs) {
// it timed out
if(sock.onerror)
sock.onerror();
if(sock.onclose)
sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection timed out", false, null));
sock.socket.close();
sock.readyState_ = CLOSED;
unregisterActiveSocket(sock);
return false;
}
if(minimumTimeoutForSelect && diff < *minimumTimeoutForSelect)
*minimumTimeoutForSelect = diff;
diff = sock.nextPing - now;
if(diff <= 0.msecs) {
//sock.send(`{"action": "ping"}`);
sock.ping();
sock.nextPing = now + sock.config.pingFrequency.msecs;
} else {
if(minimumTimeoutForSelect && diff < *minimumTimeoutForSelect)
*minimumTimeoutForSelect = diff;
}
return true;
}
/* /*
const int bufferedAmount // amount pending const int bufferedAmount // amount pending
const string extensions const string extensions
@ -5394,87 +5450,56 @@ class WebSocket {
loopExited = false; // reset it so we can reenter loopExited = false; // reset it so we can reenter
} }
static SocketSet readSet; version(use_arsd_core) {
loopExited = false;
if(readSet is null) import arsd.core;
readSet = new SocketSet(); getThisThreadEventLoop().run(() => WebSocket.activeSockets.length == 0 || loopExited || (localLoopExited !is null && *localLoopExited == true));
} else {
static SocketSet readSet;
loopExited = false; if(readSet is null)
readSet = new SocketSet();
outermost: while(!loopExited && (localLoopExited is null || (*localLoopExited == false))) { loopExited = false;
readSet.reset();
Duration timeout = 3.seconds; outermost: while(!loopExited && (localLoopExited is null || (*localLoopExited == false))) {
readSet.reset();
auto now = MonoTime.currTime; Duration timeout = 3.seconds;
bool hadAny;
foreach(sock; activeSockets) {
auto diff = sock.timeoutFromInactivity - now;
if(diff <= 0.msecs) {
// timeout
if(sock.onerror)
sock.onerror();
if(sock.onclose) auto now = MonoTime.currTime;
sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection timed out", false, null)); bool hadAny;
sock.socket.close();
sock.readyState_ = CLOSED;
unregisterActiveSocket(sock);
continue outermost;
}
if(diff < timeout)
timeout = diff;
diff = sock.nextPing - now;
if(diff <= 0.msecs) {
//sock.send(`{"action": "ping"}`);
sock.ping();
sock.nextPing = now + sock.config.pingFrequency.msecs;
} else {
if(diff < timeout)
timeout = diff;
}
readSet.add(sock.socket);
hadAny = true;
}
if(!hadAny) {
// import std.stdio; writeln("had none");
return;
}
tryAgain:
// import std.stdio; writeln(timeout);
auto selectGot = Socket.select(readSet, null, null, timeout);
if(selectGot == 0) { /* timeout */
// timeout
continue; // it will be handled at the top of the loop
} else if(selectGot == -1) { /* interrupted */
goto tryAgain;
} else {
foreach(sock; activeSockets) { foreach(sock; activeSockets) {
if(readSet.isSet(sock.socket)) { if(!timeoutAndPingCheck(sock, now, &timeout))
sock.timeoutFromInactivity = MonoTime.currTime + sock.config.timeoutFromInactivity; continue outermost;
if(!sock.lowLevelReceive()) {
sock.readyState_ = CLOSED;
if(sock.onerror) readSet.add(sock.socket);
sock.onerror(); hadAny = true;
}
if(sock.onclose) if(!hadAny) {
sock.onclose(CloseEvent(CloseEvent.StandardCloseCodes.abnormalClosure, "Connection lost", false, lastSocketError())); // import std.stdio; writeln("had none");
return;
}
unregisterActiveSocket(sock); tryAgain:
continue outermost; // import std.stdio; writeln(timeout);
auto selectGot = Socket.select(readSet, null, null, timeout);
if(selectGot == 0) { /* timeout */
// timeout
continue; // it will be handled at the top of the loop
} else if(selectGot == -1) { /* interrupted */
goto tryAgain;
} else {
foreach(sock; activeSockets) {
if(readSet.isSet(sock.socket)) {
if(!readyToRead(sock))
continue outermost;
selectGot--;
if(selectGot <= 0)
break;
} }
while(sock.processOnce().populated) {}
selectGot--;
if(selectGot <= 0)
break;
} }
} }
} }
@ -5501,20 +5526,37 @@ class WebSocket {
void registerActiveSocket(WebSocket s) { void registerActiveSocket(WebSocket s) {
// ensure it isn't already there... // ensure it isn't already there...
assert(s !is null); assert(s !is null);
foreach(i, a; activeSockets) if(s.registered)
if(a is s) return;
return; s.activeSocketArrayIndex = activeSockets.length;
activeSockets ~= s; activeSockets ~= s;
s.registered = true;
version(use_arsd_core) {
s.unregisterToken = arsd.core.getThisThreadEventLoop().addCallbackOnFdReadable(s.socket.handle, new arsd.core.CallbackHelper(() { s.readyToRead(s); }));
}
} }
void unregisterActiveSocket(WebSocket s) { void unregisterActiveSocket(WebSocket s) {
foreach(i, a; activeSockets) version(use_arsd_core) {
if(s is a) { s.unregisterToken.unregister();
activeSockets[i] = activeSockets[$-1]; }
activeSockets = activeSockets[0 .. $-1];
break; auto i = s.activeSocketArrayIndex;
} assert(activeSockets[i] is s);
activeSockets[i] = activeSockets[$-1];
activeSockets[i].activeSocketArrayIndex = i;
activeSockets = activeSockets[0 .. $-1];
activeSockets.assumeSafeAppend();
s.registered = false;
} }
} }
private bool registered;
private size_t activeSocketArrayIndex;
version(use_arsd_core) {
static import arsd.core;
arsd.core.ICoreEventLoop.UnregisterToken unregisterToken;
}
} }
private template imported(string mod) { private template imported(string mod) {
@ -5527,6 +5569,8 @@ private template imported(string mod) {
template addToSimpledisplayEventLoop() { template addToSimpledisplayEventLoop() {
import arsd.simpledisplay; import arsd.simpledisplay;
void addToSimpledisplayEventLoop(WebSocket ws, imported!"arsd.simpledisplay".SimpleWindow window) { void addToSimpledisplayEventLoop(WebSocket ws, imported!"arsd.simpledisplay".SimpleWindow window) {
version(use_arsd_core)
return; // already done implicitly
version(Windows) version(Windows)
auto event = WSACreateEvent(); auto event = WSACreateEvent();