diff --git a/cgi.d b/cgi.d index 681fae4..caee996 100644 --- a/cgi.d +++ b/cgi.d @@ -4664,6 +4664,37 @@ version(cgi_with_websocket) { // returns true if data available, false if it timed out bool recvAvailable(Duration timeout = dur!"msecs"(0)) { + if(!waitForNextMessageWouldBlock()) + return true; + if(isDataPending(timeout)) + return true; // this is kinda a lie. + + return false; + } + + public bool lowLevelReceive() { + auto bfr = cgi.idlol; + top: + auto got = bfr.front; + if(got.length) { + if(receiveBuffer.length < receiveBufferUsedLength + got.length) + receiveBuffer.length += receiveBufferUsedLength + got.length; + + receiveBuffer[receiveBufferUsedLength .. receiveBufferUsedLength + got.length] = got[]; + receiveBufferUsedLength += got.length; + bfr.consume(got.length); + + return true; + } + + bfr.popFront(0); + if(bfr.sourceClosed) + return false; + goto top; + } + + + bool isDataPending(Duration timeout = 0.seconds) { Socket socket = cgi.idlol.source; auto check = new SocketSet(); @@ -4676,47 +4707,297 @@ version(cgi_with_websocket) { } // note: this blocks - WebSocketMessage recv() { - // FIXME: should we automatically handle pings and pongs? - if(cgi.idlol.empty()) - throw new Exception("remote side disconnected"); - cgi.idlol.popFront(0); - - WebSocketMessage message; - - message = WebSocketMessage.read(cgi.idlol); - - return message; + WebSocketFrame recv() { + return waitForNextMessage(); } - void send(in char[] text) { - // I cast away const here because I know this msg is private and it doesn't write - // to that buffer unless masking is set... which it isn't, so we're ok. - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.text, cast(void[]) text); - msg.send(cgi); + + + + private void llclose() { + cgi.close(); } - void send(in ubyte[] binary) { - // I cast away const here because I know this msg is private and it doesn't write - // to that buffer unless masking is set... which it isn't, so we're ok. - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.binary, cast(void[]) binary); - msg.send(cgi); + private void llsend(ubyte[] data) { + cgi.write(data); + cgi.flush(); } - void close() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.close, null); - msg.send(cgi); + void unregisterActiveSocket(WebSocket) {} + + /* copy/paste section { */ + + private int readyState_; + private ubyte[] receiveBuffer; + private size_t receiveBufferUsedLength; + + private Config config; + + enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. + enum OPEN = 1; /// The connection is open and ready to communicate. + enum CLOSING = 2; /// The connection is in the process of closing. + enum CLOSED = 3; /// The connection is closed or couldn't be opened. + + /++ + + +/ + /// Group: foundational + static struct Config { + /++ + These control the size of the receive buffer. + + It starts at the initial size, will temporarily + balloon up to the maximum size, and will reuse + a buffer up to the likely size. + + Anything larger than the maximum size will cause + the connection to be aborted and an exception thrown. + This is to protect you against a peer trying to + exhaust your memory, while keeping the user-level + processing simple. + +/ + size_t initialReceiveBufferSize = 4096; + size_t likelyReceiveBufferSize = 4096; /// ditto + size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + + /++ + Maximum combined size of a message. + +/ + size_t maximumMessageSize = 10 * 1024 * 1024; + + string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; + string origin; /// Origin URL to send with the handshake, if desired. + string protocol; /// the protocol header, if desired. + + int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping } + /++ + Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. + +/ + int readyState() { + return readyState_; + } + + /++ + Closes the connection, sending a graceful teardown message to the other side. + +/ + /// Group: foundational + void close(int code = 0, string reason = null) + //in (reason.length < 123) + in { assert(reason.length < 123); } do + { + if(readyState_ != OPEN) + return; // it cool, we done + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.close; + wss.data = cast(ubyte[]) reason; + wss.send(&llsend); + + readyState_ = CLOSING; + + llclose(); + } + + /++ + Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function. + +/ + /// Group: foundational void ping() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.ping, null); - msg.send(cgi); + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.ping; + wss.send(&llsend); } + // automatically handled.... void pong() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.pong, null); - msg.send(cgi); + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.pong; + wss.send(&llsend); } + + /++ + Sends a text message through the websocket. + +/ + /// Group: foundational + void send(in char[] textData) { + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.text; + wss.data = cast(ubyte[]) textData; + wss.send(&llsend); + } + + /++ + Sends a binary message through the websocket. + +/ + /// Group: foundational + void send(in ubyte[] binaryData) { + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.binary; + wss.data = cast(ubyte[]) binaryData; + wss.send(&llsend); + } + + /++ + Waits for and returns the next complete message on the socket. + + Note that the onmessage function is still called, right before + this returns. + +/ + /// Group: blocking_api + public WebSocketFrame waitForNextMessage() { + do { + auto m = processOnce(); + if(m.populated) + return m; + } while(lowLevelReceive()); + + return WebSocketFrame.init; // FIXME? maybe. + } + + /++ + Tells if [waitForNextMessage] would block. + +/ + /// Group: blocking_api + public bool waitForNextMessageWouldBlock() { + checkAgain: + if(isMessageBuffered()) + return false; + if(!isDataPending()) + return true; + while(isDataPending()) + lowLevelReceive(); + goto checkAgain; + } + + /++ + Is there a message in the buffer already? + If `true`, [waitForNextMessage] is guaranteed to return immediately. + If `false`, check [isDataPending] as the next step. + +/ + /// Group: blocking_api + public bool isMessageBuffered() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + if(d.length) { + auto orig = d; + auto m = WebSocketFrame.read(d); + // that's how it indicates that it needs more data + if(d !is orig) + return true; + } + + return false; + } + + private ubyte continuingType; + private ubyte[] continuingData; + //private size_t continuingDataLength; + + private WebSocketFrame processOnce() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + // FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer. + WebSocketFrame m; + if(d.length) { + auto orig = d; + m = WebSocketFrame.read(d); + // that's how it indicates that it needs more data + if(d is orig) + return WebSocketFrame.init; + switch(m.opcode) { + case WebSocketOpcode.continuation: + if(continuingData.length + m.data.length > config.maximumMessageSize) + throw new Exception("message size exceeded"); + + continuingData ~= m.data; + if(m.fin) { + if(ontextmessage) + ontextmessage(cast(char[]) continuingData); + if(onbinarymessage) + onbinarymessage(continuingData); + + continuingData = null; + } + break; + case WebSocketOpcode.text: + if(m.fin) { + if(ontextmessage) + ontextmessage(m.textData); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.binary: + if(m.fin) { + if(onbinarymessage) + onbinarymessage(m.data); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.close: + readyState_ = CLOSED; + if(onclose) + onclose(); + + unregisterActiveSocket(this); + break; + case WebSocketOpcode.ping: + pong(); + break; + case WebSocketOpcode.pong: + // just really references it is still alive, nbd. + break; + default: // ignore though i could and perhaps should throw too + } + } + receiveBufferUsedLength -= s.length - d.length; + + return m; + } + + private void autoprocess() { + // FIXME + do { + processOnce(); + } while(lowLevelReceive()); + } + + + void delegate() onclose; /// + void delegate() onerror; /// + void delegate(in char[]) ontextmessage; /// + void delegate(in ubyte[]) onbinarymessage; /// + void delegate() onopen; /// + + /++ + + +/ + /// Group: browser_api + void onmessage(void delegate(in char[]) dg) { + ontextmessage = dg; + } + + /// ditto + void onmessage(void delegate(in ubyte[]) dg) { + onbinarymessage = dg; + } + + /* } end copy/paste */ + + } bool websocketRequested(Cgi cgi) { @@ -4755,10 +5036,11 @@ version(cgi_with_websocket) { return new WebSocket(cgi); } - // FIXME: implement websocket extension frames - // get websocket to work on other modes, not just embedded_httpd + // FIXME get websocket to work on other modes, not just embedded_httpd + /* copy/paste in http2.d { */ enum WebSocketOpcode : ubyte { + continuation = 0, text = 1, binary = 2, // 3, 4, 5, 6, 7 RESERVED @@ -4768,7 +5050,7 @@ version(cgi_with_websocket) { // 11,12,13,14,15 RESERVED } - struct WebSocketMessage { + public struct WebSocketFrame { private bool populated; bool fin; bool rsv1; @@ -4781,8 +5063,8 @@ version(cgi_with_websocket) { ubyte[4] maskingKey; // don't set this when sending ubyte[] data; - static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { - WebSocketMessage msg; + static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) { + WebSocketFrame msg; msg.fin = true; msg.opcode = opcode; msg.data = cast(ubyte[]) data; @@ -4790,7 +5072,7 @@ version(cgi_with_websocket) { return msg; } - private void send(Cgi cgi) { + private void send(scope void delegate(ubyte[]) llsend) { ubyte[64] headerScratch; int headerScratchPos = 0; @@ -4846,7 +5128,7 @@ version(cgi_with_websocket) { headerScratch[1] = b2; } - assert(!masked, "masking key not properly implemented"); + //assert(!masked, "masking key not properly implemented"); if(masked) { // FIXME: randomize this headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[]; @@ -4864,19 +5146,18 @@ version(cgi_with_websocket) { } //writeln("SENDING ", headerScratch[0 .. headerScratchPos], data); - cgi.write(headerScratch[0 .. headerScratchPos]); - cgi.write(data); - cgi.flush(); + llsend(headerScratch[0 .. headerScratchPos]); + llsend(data); } - static WebSocketMessage read(ref ubyte[] d) { - WebSocketMessage msg; + static WebSocketFrame read(ref ubyte[] d) { + WebSocketFrame msg; auto orig = d; - WebSocketMessage needsMoreData() { + WebSocketFrame needsMoreData() { d = orig; - return WebSocketMessage.init; + return WebSocketFrame.init; } if(d.length < 2) @@ -4957,22 +5238,11 @@ version(cgi_with_websocket) { return msg; } - static WebSocketMessage read(BufferedInputRange ir) { - readmore: - auto d = ir.front(); - auto m = read(d); - if(m is WebSocketMessage.init) { - ir.popFront(); - goto readmore; - } - return m; - } - char[] textData() { return cast(char[]) data; } } - + /* } */ } diff --git a/http2.d b/http2.d index 5107e87..4b69a1f 100644 --- a/http2.d +++ b/http2.d @@ -2192,62 +2192,8 @@ class WebSocket { private ushort port; private bool ssl; - private int readyState_; - - private Socket socket; - private ubyte[] receiveBuffer; - private size_t receiveBufferUsedLength; - - private Config config; - - enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. - enum OPEN = 1; /// The connection is open and ready to communicate. - enum CLOSING = 2; /// The connection is in the process of closing. - enum CLOSED = 3; /// The connection is closed or couldn't be opened. - /++ - - +/ - /// Group: foundational - static struct Config { - /++ - These control the size of the receive buffer. - - It starts at the initial size, will temporarily - balloon up to the maximum size, and will reuse - a buffer up to the likely size. - - Anything larger than the maximum size will cause - the connection to be aborted and an exception thrown. - This is to protect you against a peer trying to - exhaust your memory, while keeping the user-level - processing simple. - +/ - size_t initialReceiveBufferSize = 4096; - size_t likelyReceiveBufferSize = 4096; /// ditto - size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto - - /++ - Maximum combined size of a message. - +/ - size_t maximumMessageSize = 10 * 1024 * 1024; - - string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; - string origin; /// Origin URL to send with the handshake, if desired. - string protocol; /// the protocol header, if desired. - - int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping - } - - /++ - Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. - +/ - int readyState() { - return readyState_; - } - - /++ -wss://echo.websocket.org + wss://echo.websocket.org +/ /// Group: foundational this(Uri uri, Config config = Config.init) @@ -2441,6 +2387,130 @@ wss://echo.websocket.org registerActiveSocket(this); } + /++ + Is data pending on the socket? Also check [isMessageBuffered] to see if there + is already a message in memory too. + + If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered] + again. + +/ + /// Group: blocking_api + public bool isDataPending(Duration timeout = 0.seconds) { + static SocketSet readSet; + if(readSet is null) + readSet = new SocketSet(); + + version(with_openssl) + if(auto s = cast(SslClientSocket) socket) { + // select doesn't handle the case with stuff + // left in the ssl buffer so i'm checking it separately + if(s.dataPending()) { + return true; + } + } + + readSet.add(socket); + + //tryAgain: + auto selectGot = Socket.select(readSet, null, null, timeout); + if(selectGot == 0) { /* timeout */ + // timeout + return false; + } else if(selectGot == -1) { /* interrupted */ + return false; + } else { /* ready */ + if(readSet.isSet(socket)) { + return true; + } + } + + return false; + } + + private void llsend(ubyte[] d) { + while(d.length) { + auto r = socket.send(d); + if(r <= 0) throw new Exception("wtf"); + d = d[r .. $]; + } + } + + private void llclose() { + socket.shutdown(SocketShutdown.SEND); + } + + /++ + Waits for more data off the low-level socket and adds it to the pending buffer. + + Returns `true` if the connection is still active. + +/ + /// Group: blocking_api + public bool lowLevelReceive() { + auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]); + if(r == 0) + return false; + if(r <= 0) + throw new Exception("wtf"); + receiveBufferUsedLength += r; + return true; + } + + private Socket socket; + + /* copy/paste section { */ + + private int readyState_; + private ubyte[] receiveBuffer; + private size_t receiveBufferUsedLength; + + private Config config; + + enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. + enum OPEN = 1; /// The connection is open and ready to communicate. + enum CLOSING = 2; /// The connection is in the process of closing. + enum CLOSED = 3; /// The connection is closed or couldn't be opened. + + /++ + + +/ + /// Group: foundational + static struct Config { + /++ + These control the size of the receive buffer. + + It starts at the initial size, will temporarily + balloon up to the maximum size, and will reuse + a buffer up to the likely size. + + Anything larger than the maximum size will cause + the connection to be aborted and an exception thrown. + This is to protect you against a peer trying to + exhaust your memory, while keeping the user-level + processing simple. + +/ + size_t initialReceiveBufferSize = 4096; + size_t likelyReceiveBufferSize = 4096; /// ditto + size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + + /++ + Maximum combined size of a message. + +/ + size_t maximumMessageSize = 10 * 1024 * 1024; + + string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; + string origin; /// Origin URL to send with the handshake, if desired. + string protocol; /// the protocol header, if desired. + + int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping + } + + /++ + Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. + +/ + int readyState() { + return readyState_; + } + /++ Closes the connection, sending a graceful teardown message to the other side. +/ @@ -2459,7 +2529,7 @@ wss://echo.websocket.org readyState_ = CLOSING; - socket.shutdown(SocketShutdown.SEND); + llclose(); } /++ @@ -2505,31 +2575,6 @@ wss://echo.websocket.org wss.send(&llsend); } - - private void llsend(ubyte[] d) { - while(d.length) { - auto r = socket.send(d); - if(r <= 0) throw new Exception("wtf"); - d = d[r .. $]; - } - } - - /++ - Waits for more data off the low-level socket and adds it to the pending buffer. - - Returns `true` if the connection is still active. - +/ - /// Group: blocking_api - public bool lowLevelReceive() { - auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]); - if(r == 0) - return false; - if(r <= 0) - throw new Exception("wtf"); - receiveBufferUsedLength += r; - return true; - } - /++ Waits for and returns the next complete message on the socket. @@ -2582,46 +2627,6 @@ wss://echo.websocket.org return false; } - /++ - Is data pending on the socket? Also check [isMessageBuffered] to see if there - is already a message in memory too. - - If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered] - again. - +/ - /// Group: blocking_api - public bool isDataPending() { - static SocketSet readSet; - if(readSet is null) - readSet = new SocketSet(); - - version(with_openssl) - if(auto s = cast(SslClientSocket) socket) { - // select doesn't handle the case with stuff - // left in the ssl buffer so i'm checking it separately - if(s.dataPending()) { - return true; - } - } - - readSet.add(socket); - - //tryAgain: - auto selectGot = Socket.select(readSet, null, null, 0.seconds /* timeout */); - if(selectGot == 0) { /* timeout */ - // timeout - return false; - } else if(selectGot == -1) { /* interrupted */ - return false; - } else { /* ready */ - if(readSet.isSet(socket)) { - return true; - } - } - - return false; - } - private ubyte continuingType; private ubyte[] continuingData; //private size_t continuingDataLength; @@ -2722,6 +2727,8 @@ wss://echo.websocket.org onbinarymessage = dg; } + /* } end copy/paste */ + /* const int bufferedAmount // amount pending const string extensions @@ -2802,7 +2809,7 @@ wss://echo.websocket.org } /* copy/paste from cgi.d */ -private { +public { enum WebSocketOpcode : ubyte { continuation = 0, text = 1, @@ -3007,3 +3014,36 @@ private { } } } + +/+ + so the url params are arguments. it knows the request + internally. other params are properties on the req + + names may have different paths... those will just add ForSomething i think. + + auto req = api.listMergeRequests + req.page = 10; + + or + req.page(1) + .bar("foo") + + req.execute(); + + + everything in the response is nullable access through the + dynamic object, just with property getters there. need to make + it static generated tho + + other messages may be: isPresent and getDynamic + + + AND/OR what about doing it like the rails objects + + BroadcastMessage.get(4) + // various properties + + // it lists what you updated + + BroadcastMessage.foo().bar().put(5) ++/