better websocket stuff

This commit is contained in:
Adam D. Ruppe 2020-01-11 19:47:41 -05:00
parent f6189fe119
commit 4702c672ca
1 changed files with 165 additions and 29 deletions

194
http2.d
View File

@ -14,6 +14,8 @@
+/ +/
module arsd.http2; module arsd.http2;
// FIXME: I think I want to disable sigpipe here too.
import std.uri : encodeComponent; import std.uri : encodeComponent;
debug(arsd_http2_verbose) debug=arsd_http2; debug(arsd_http2_verbose) debug=arsd_http2;
@ -1094,6 +1096,9 @@ class HttpRequest {
if(colon == -1) if(colon == -1)
return; return;
auto name = header[0 .. colon]; auto name = header[0 .. colon];
if(colon + 1 == header.length)
return; // empty header, idk
assert(colon + 2 < header.length, header);
auto value = header[colon + 2 .. $]; // skipping the colon itself and the following space auto value = header[colon + 2 .. $]; // skipping the colon itself and the following space
switch(name) { switch(name) {
@ -2094,7 +2099,7 @@ private bool bicmp(in ubyte[] item, in char[] search) {
The blocking API is best used when you only need basic functionality with a single connection. The blocking API is best used when you only need basic functionality with a single connection.
--- ---
WebSocketMessage msg; WebSocketFrame msg;
do { do {
// FIXME good demo // FIXME good demo
} while(msg); } while(msg);
@ -2167,6 +2172,11 @@ class WebSocket {
size_t likelyReceiveBufferSize = 4096; /// ditto size_t likelyReceiveBufferSize = 4096; /// ditto
size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
/++
Maximum combined size of a message.
+/
size_t maximumMessageSize = 10 * 1024 * 1024;
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired. string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired. string protocol; /// the protocol header, if desired.
@ -2368,6 +2378,8 @@ wss://echo.websocket.org
if(onopen) if(onopen)
onopen(); onopen();
registerActiveSocket(this);
} }
/++ /++
@ -2377,7 +2389,9 @@ wss://echo.websocket.org
void close(int code = 0, string reason = null) void close(int code = 0, string reason = null)
in (reason.length < 123) in (reason.length < 123)
{ {
WebSocketMessage wss; if(readyState_ != OPEN)
return; // it cool, we done
WebSocketFrame wss;
wss.fin = true; wss.fin = true;
wss.opcode = WebSocketOpcode.close; wss.opcode = WebSocketOpcode.close;
wss.data = cast(ubyte[]) reason; wss.data = cast(ubyte[]) reason;
@ -2393,7 +2407,7 @@ wss://echo.websocket.org
+/ +/
/// Group: foundational /// Group: foundational
void ping() { void ping() {
WebSocketMessage wss; WebSocketFrame wss;
wss.fin = true; wss.fin = true;
wss.opcode = WebSocketOpcode.ping; wss.opcode = WebSocketOpcode.ping;
wss.send(&llsend); wss.send(&llsend);
@ -2401,7 +2415,7 @@ wss://echo.websocket.org
// automatically handled.... // automatically handled....
void pong() { void pong() {
WebSocketMessage wss; WebSocketFrame wss;
wss.fin = true; wss.fin = true;
wss.opcode = WebSocketOpcode.pong; wss.opcode = WebSocketOpcode.pong;
wss.send(&llsend); wss.send(&llsend);
@ -2412,7 +2426,7 @@ wss://echo.websocket.org
+/ +/
/// Group: foundational /// Group: foundational
void send(in char[] textData) { void send(in char[] textData) {
WebSocketMessage wss; WebSocketFrame wss;
wss.fin = true; wss.fin = true;
wss.opcode = WebSocketOpcode.text; wss.opcode = WebSocketOpcode.text;
wss.data = cast(ubyte[]) textData; wss.data = cast(ubyte[]) textData;
@ -2424,7 +2438,7 @@ wss://echo.websocket.org
+/ +/
/// Group: foundational /// Group: foundational
void send(in ubyte[] binaryData) { void send(in ubyte[] binaryData) {
WebSocketMessage wss; WebSocketFrame wss;
wss.fin = true; wss.fin = true;
wss.opcode = WebSocketOpcode.binary; wss.opcode = WebSocketOpcode.binary;
wss.data = cast(ubyte[]) binaryData; wss.data = cast(ubyte[]) binaryData;
@ -2463,14 +2477,14 @@ wss://echo.websocket.org
this returns. this returns.
+/ +/
/// Group: blocking_api /// Group: blocking_api
public WebSocketMessage waitForNextMessage() { public WebSocketFrame waitForNextMessage() {
do { do {
auto m = processOnce(); auto m = processOnce();
if(m.populated) if(m.populated)
return m; return m;
} while(lowLevelReceive()); } while(lowLevelReceive());
return WebSocketMessage.init; // FIXME? maybe. return WebSocketFrame.init; // FIXME? maybe.
} }
/++ /++
@ -2499,7 +2513,7 @@ wss://echo.websocket.org
auto s = d; auto s = d;
if(d.length) { if(d.length) {
auto orig = d; auto orig = d;
auto m = WebSocketMessage.read(d); auto m = WebSocketFrame.read(d);
// that's how it indicates that it needs more data // that's how it indicates that it needs more data
if(d !is orig) if(d !is orig)
return true; return true;
@ -2548,26 +2562,72 @@ wss://echo.websocket.org
return false; return false;
} }
private WebSocketMessage processOnce() { private ubyte continuingType;
private ubyte[] continuingData;
//private size_t continuingDataLength;
private WebSocketFrame processOnce() {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d; auto s = d;
// FIXME: handle continuation frames // FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer.
WebSocketMessage m; WebSocketFrame m;
if(d.length) { if(d.length) {
auto orig = d; auto orig = d;
m = WebSocketMessage.read(d); m = WebSocketFrame.read(d);
// that's how it indicates that it needs more data // that's how it indicates that it needs more data
if(d is orig) if(d is orig)
return WebSocketMessage.init; return WebSocketFrame.init;
if(m.opcode == WebSocketOpcode.close) { switch(m.opcode) {
readyState_ = CLOSED; case WebSocketOpcode.continuation:
if(onclose) if(continuingData.length + m.data.length > config.maximumMessageSize)
onclose(); throw new Exception("message size exceeded");
} else if(m.opcode == WebSocketOpcode.text) {
if(ontextmessage) continuingData ~= m.data;
ontextmessage(m.textData); if(fin) {
} else { if(ontextmessage)
// FIXME ontextmessage(cast(char[]) continuingData);
if(onbinarymessage)
onbinarymessage(continuingData);
continuingData = null;
}
break;
case WebSocketOpcode.text:
if(m.fin) {
if(ontextmessage)
ontextmessage(m.textData);
} else {
continuingType = m.opcode;
//continuingDataLength = 0;
continuingData = null;
continuingData ~= m.data;
}
break;
case WebSocketOpcode.binary:
if(m.fin) {
if(onbinarymessage)
onbinarymessage(m.data);
} else {
continuingType = m.opcode;
//continuingDataLength = 0;
continuingData = null;
continuingData ~= m.data;
}
break;
case WebSocketOpcode.close:
readyState_ = CLOSED;
if(onclose)
onclose();
unregisterActiveSocket(this);
break;
case WebSocketOpcode.ping:
pong();
break;
case WebSocketOpcode.pong:
// just really references it is still alive, nbd.
break;
default: // ignore though i could and perhaps should throw too
} }
} }
receiveBufferUsedLength -= s.length - d.length; receiveBufferUsedLength -= s.length - d.length;
@ -2586,6 +2646,7 @@ wss://echo.websocket.org
void delegate() onclose; /// void delegate() onclose; ///
void delegate() onerror; /// void delegate() onerror; ///
void delegate(in char[]) ontextmessage; /// void delegate(in char[]) ontextmessage; ///
void delegate(in ubyte[]) onbinarymessage; ///
void delegate() onopen; /// void delegate() onopen; ///
/++ /++
@ -2596,6 +2657,11 @@ wss://echo.websocket.org
ontextmessage = dg; ontextmessage = dg;
} }
/// ditto
void onmessage(void delegate(in ubyte[]) dg) {
onbinarymessage = dg;
}
/* /*
const int bufferedAmount // amount pending const int bufferedAmount // amount pending
const string extensions const string extensions
@ -2603,6 +2669,76 @@ wss://echo.websocket.org
const string protocol const string protocol
const string url const string url
*/ */
static {
/++
+/
void eventLoop() {
static SocketSet readSet;
if(readSet is null)
readSet = new SocketSet();
outermost: while(!loopExited) {
readSet.reset();
bool hadAny;
foreach(sock; activeSockets) {
readSet.add(sock.socket);
hadAny = true;
}
if(!hadAny)
return;
tryAgain:
auto selectGot = Socket.select(readSet, null, null, 10.seconds /* timeout */);
if(selectGot == 0) { /* timeout */
// timeout
goto tryAgain;
} else if(selectGot == -1) { /* interrupted */
goto tryAgain;
} else {
foreach(sock; activeSockets) {
if(readSet.isSet(sock.socket)) {
if(!sock.lowLevelReceive()) {
sock.readyState_ = CLOSED;
unregisterActiveSocket(sock);
continue outermost;
}
while(sock.processOnce().populated) {}
selectGot--;
if(selectGot <= 0)
break;
}
}
}
}
}
private bool loopExited;
/++
+/
void exitEventLoop() {
loopExited = true;
}
WebSocket[] activeSockets;
void registerActiveSocket(WebSocket s) {
activeSockets ~= s;
}
void unregisterActiveSocket(WebSocket s) {
foreach(i, a; activeSockets)
if(s is a) {
activeSockets[i] = activeSockets[$-1];
activeSockets = activeSockets[0 .. $-1];
break;
}
}
}
} }
/* copy/paste from cgi.d */ /* copy/paste from cgi.d */
@ -2618,7 +2754,7 @@ private {
// 11,12,13,14,15 RESERVED // 11,12,13,14,15 RESERVED
} }
public struct WebSocketMessage { public struct WebSocketFrame {
private bool populated; private bool populated;
bool fin; bool fin;
bool rsv1; bool rsv1;
@ -2631,8 +2767,8 @@ private {
ubyte[4] maskingKey; // don't set this when sending ubyte[4] maskingKey; // don't set this when sending
ubyte[] data; ubyte[] data;
static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) {
WebSocketMessage msg; WebSocketFrame msg;
msg.fin = true; msg.fin = true;
msg.opcode = opcode; msg.opcode = opcode;
msg.data = cast(ubyte[]) data; msg.data = cast(ubyte[]) data;
@ -2718,8 +2854,8 @@ private {
llsend(data); llsend(data);
} }
static WebSocketMessage read(ref ubyte[] d) { static WebSocketFrame read(ref ubyte[] d) {
WebSocketMessage msg; WebSocketFrame msg;
assert(d.length >= 2); assert(d.length >= 2);
auto orig = d; auto orig = d;
@ -2772,7 +2908,7 @@ private {
if(msg.realLength > d.length) { if(msg.realLength > d.length) {
d = orig; d = orig;
return WebSocketMessage.init; return WebSocketFrame.init;
} }
msg.data = d[0 .. msg.realLength]; msg.data = d[0 .. msg.realLength];