diff --git a/http2.d b/http2.d index 509d314..7283611 100644 --- a/http2.d +++ b/http2.d @@ -14,6 +14,8 @@ +/ module arsd.http2; +// FIXME: I think I want to disable sigpipe here too. + import std.uri : encodeComponent; debug(arsd_http2_verbose) debug=arsd_http2; @@ -1094,6 +1096,9 @@ class HttpRequest { if(colon == -1) return; auto name = header[0 .. colon]; + if(colon + 1 == header.length) + return; // empty header, idk + assert(colon + 2 < header.length, header); auto value = header[colon + 2 .. $]; // skipping the colon itself and the following space switch(name) { @@ -2094,7 +2099,7 @@ private bool bicmp(in ubyte[] item, in char[] search) { The blocking API is best used when you only need basic functionality with a single connection. --- - WebSocketMessage msg; + WebSocketFrame msg; do { // FIXME good demo } while(msg); @@ -2167,6 +2172,11 @@ class WebSocket { size_t likelyReceiveBufferSize = 4096; /// ditto size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + /++ + Maximum combined size of a message. + +/ + size_t maximumMessageSize = 10 * 1024 * 1024; + string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; string origin; /// Origin URL to send with the handshake, if desired. string protocol; /// the protocol header, if desired. @@ -2368,6 +2378,8 @@ wss://echo.websocket.org if(onopen) onopen(); + + registerActiveSocket(this); } /++ @@ -2377,7 +2389,9 @@ wss://echo.websocket.org void close(int code = 0, string reason = null) in (reason.length < 123) { - WebSocketMessage wss; + if(readyState_ != OPEN) + return; // it cool, we done + WebSocketFrame wss; wss.fin = true; wss.opcode = WebSocketOpcode.close; wss.data = cast(ubyte[]) reason; @@ -2393,7 +2407,7 @@ wss://echo.websocket.org +/ /// Group: foundational void ping() { - WebSocketMessage wss; + WebSocketFrame wss; wss.fin = true; wss.opcode = WebSocketOpcode.ping; wss.send(&llsend); @@ -2401,7 +2415,7 @@ wss://echo.websocket.org // automatically handled.... void pong() { - WebSocketMessage wss; + WebSocketFrame wss; wss.fin = true; wss.opcode = WebSocketOpcode.pong; wss.send(&llsend); @@ -2412,7 +2426,7 @@ wss://echo.websocket.org +/ /// Group: foundational void send(in char[] textData) { - WebSocketMessage wss; + WebSocketFrame wss; wss.fin = true; wss.opcode = WebSocketOpcode.text; wss.data = cast(ubyte[]) textData; @@ -2424,7 +2438,7 @@ wss://echo.websocket.org +/ /// Group: foundational void send(in ubyte[] binaryData) { - WebSocketMessage wss; + WebSocketFrame wss; wss.fin = true; wss.opcode = WebSocketOpcode.binary; wss.data = cast(ubyte[]) binaryData; @@ -2463,14 +2477,14 @@ wss://echo.websocket.org this returns. +/ /// Group: blocking_api - public WebSocketMessage waitForNextMessage() { + public WebSocketFrame waitForNextMessage() { do { auto m = processOnce(); if(m.populated) return m; } while(lowLevelReceive()); - return WebSocketMessage.init; // FIXME? maybe. + return WebSocketFrame.init; // FIXME? maybe. } /++ @@ -2499,7 +2513,7 @@ wss://echo.websocket.org auto s = d; if(d.length) { auto orig = d; - auto m = WebSocketMessage.read(d); + auto m = WebSocketFrame.read(d); // that's how it indicates that it needs more data if(d !is orig) return true; @@ -2548,26 +2562,72 @@ wss://echo.websocket.org return false; } - private WebSocketMessage processOnce() { + private ubyte continuingType; + private ubyte[] continuingData; + //private size_t continuingDataLength; + + private WebSocketFrame processOnce() { ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; auto s = d; - // FIXME: handle continuation frames - WebSocketMessage m; + // FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer. + WebSocketFrame m; if(d.length) { auto orig = d; - m = WebSocketMessage.read(d); + m = WebSocketFrame.read(d); // that's how it indicates that it needs more data if(d is orig) - return WebSocketMessage.init; - if(m.opcode == WebSocketOpcode.close) { - readyState_ = CLOSED; - if(onclose) - onclose(); - } else if(m.opcode == WebSocketOpcode.text) { - if(ontextmessage) - ontextmessage(m.textData); - } else { - // FIXME + return WebSocketFrame.init; + switch(m.opcode) { + case WebSocketOpcode.continuation: + if(continuingData.length + m.data.length > config.maximumMessageSize) + throw new Exception("message size exceeded"); + + continuingData ~= m.data; + if(fin) { + if(ontextmessage) + ontextmessage(cast(char[]) continuingData); + if(onbinarymessage) + onbinarymessage(continuingData); + + continuingData = null; + } + break; + case WebSocketOpcode.text: + if(m.fin) { + if(ontextmessage) + ontextmessage(m.textData); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.binary: + if(m.fin) { + if(onbinarymessage) + onbinarymessage(m.data); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.close: + readyState_ = CLOSED; + if(onclose) + onclose(); + + unregisterActiveSocket(this); + break; + case WebSocketOpcode.ping: + pong(); + break; + case WebSocketOpcode.pong: + // just really references it is still alive, nbd. + break; + default: // ignore though i could and perhaps should throw too } } receiveBufferUsedLength -= s.length - d.length; @@ -2586,6 +2646,7 @@ wss://echo.websocket.org void delegate() onclose; /// void delegate() onerror; /// void delegate(in char[]) ontextmessage; /// + void delegate(in ubyte[]) onbinarymessage; /// void delegate() onopen; /// /++ @@ -2596,6 +2657,11 @@ wss://echo.websocket.org ontextmessage = dg; } + /// ditto + void onmessage(void delegate(in ubyte[]) dg) { + onbinarymessage = dg; + } + /* const int bufferedAmount // amount pending const string extensions @@ -2603,6 +2669,76 @@ wss://echo.websocket.org const string protocol const string url */ + + static { + /++ + + +/ + void eventLoop() { + + static SocketSet readSet; + + if(readSet is null) + readSet = new SocketSet(); + + outermost: while(!loopExited) { + readSet.reset(); + + bool hadAny; + foreach(sock; activeSockets) { + readSet.add(sock.socket); + hadAny = true; + } + + if(!hadAny) + return; + + tryAgain: + auto selectGot = Socket.select(readSet, null, null, 10.seconds /* timeout */); + if(selectGot == 0) { /* timeout */ + // timeout + goto tryAgain; + } else if(selectGot == -1) { /* interrupted */ + goto tryAgain; + } else { + foreach(sock; activeSockets) { + if(readSet.isSet(sock.socket)) { + if(!sock.lowLevelReceive()) { + sock.readyState_ = CLOSED; + unregisterActiveSocket(sock); + continue outermost; + } + while(sock.processOnce().populated) {} + selectGot--; + if(selectGot <= 0) + break; + } + } + } + } + } + + private bool loopExited; + /++ + + +/ + void exitEventLoop() { + loopExited = true; + } + + WebSocket[] activeSockets; + void registerActiveSocket(WebSocket s) { + activeSockets ~= s; + } + void unregisterActiveSocket(WebSocket s) { + foreach(i, a; activeSockets) + if(s is a) { + activeSockets[i] = activeSockets[$-1]; + activeSockets = activeSockets[0 .. $-1]; + break; + } + } + } } /* copy/paste from cgi.d */ @@ -2618,7 +2754,7 @@ private { // 11,12,13,14,15 RESERVED } - public struct WebSocketMessage { + public struct WebSocketFrame { private bool populated; bool fin; bool rsv1; @@ -2631,8 +2767,8 @@ private { ubyte[4] maskingKey; // don't set this when sending ubyte[] data; - static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { - WebSocketMessage msg; + static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) { + WebSocketFrame msg; msg.fin = true; msg.opcode = opcode; msg.data = cast(ubyte[]) data; @@ -2718,8 +2854,8 @@ private { llsend(data); } - static WebSocketMessage read(ref ubyte[] d) { - WebSocketMessage msg; + static WebSocketFrame read(ref ubyte[] d) { + WebSocketFrame msg; assert(d.length >= 2); auto orig = d; @@ -2772,7 +2908,7 @@ private { if(msg.realLength > d.length) { d = orig; - return WebSocketMessage.init; + return WebSocketFrame.init; } msg.data = d[0 .. msg.realLength];