diff --git a/http2.d b/http2.d index 64120d5..1d78fe5 100644 --- a/http2.d +++ b/http2.d @@ -2052,11 +2052,70 @@ private bool bicmp(in ubyte[] item, in char[] search) { } /++ - WebSocket client, based on the browser api. + WebSocket client, based on the browser api, though also with other api options. - on receive - on frame - on message + --- + auto ws = new WebSocket(URI("ws://....")); + + ws.onmessage = (in char[] msg) { + ws.send("a reply"); + }; + + ws.connect(); + + WebSocket.eventLoop(); + --- + + Symbol_groups: + foundational = + Used with all API styles. + + browser_api = + API based on the standard in the browser. + + event_loop_integration = + Integrating with external event loops is done through static functions. You should + call these BEFORE doing anything else with the WebSocket module or class. + + $(PITFALL NOT IMPLEMENTED) + --- + WebSocket.setEventLoopProxy(arsd.simpledisplay.EventLoop.proxy.tupleof); + // or something like that. it is not implemented yet. + --- + $(PITFALL NOT IMPLEMENTED) + + blocking_api = + The blocking API is best used when you only need basic functionality with a single connection. + + --- + WebSocketMessage msg; + do { + // FIXME good demo + } while(msg); + --- + + Or to check for blocks before calling: + + --- + try_to_process_more: + while(ws.isMessageBuffered()) { + auto msg = ws.waitForNextMessage(); + // process msg + } + if(ws.isDataPending()) { + ws.lowLevelReceive(); + goto try_to_process_more; + } else { + // nothing ready, you can do other things + // or at least sleep a while before trying + // to process more. + if(ws.readyState == WebSocket.OPEN) { + Thread.sleep(1.seconds); + goto try_to_process_more; + } + } + --- + +/ class WebSocket { private Uri uri; @@ -2083,6 +2142,7 @@ class WebSocket { /++ +/ + /// Group: foundational static struct Config { /++ These control the size of the receive buffer. @@ -2104,6 +2164,8 @@ class WebSocket { string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; string origin; /// Origin URL to send with the handshake, if desired. string protocol; /// the protocol header, if desired. + + int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping } /++ @@ -2116,6 +2178,7 @@ class WebSocket { /++ wss://echo.websocket.org +/ + /// Group: foundational this(Uri uri, Config config = Config.init) in (uri.scheme == "ws" || uri.scheme == "wss") { @@ -2141,6 +2204,7 @@ wss://echo.websocket.org /++ +/ + /// Group: foundational void connect() { socket.connect(new InternetAddress(host, port)); // FIXME: ipv6 support... // FIXME: websocket handshake could and really should be async too. @@ -2289,7 +2353,7 @@ wss://echo.websocket.org // FIXME: check protocol if config requested one - // FIXME: check accept + // FIXME: check accept for the right hash receiveBuffer[0 .. used.length] = used[]; receiveBufferUsedLength = used.length; @@ -2301,8 +2365,9 @@ wss://echo.websocket.org } /++ - + Closes the connection, sending a graceful teardown message to the other side. +/ + /// Group: foundational void close(int code = 0, string reason = null) in (reason.length < 123) { @@ -2318,8 +2383,9 @@ wss://echo.websocket.org } /++ - + Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function. +/ + /// Group: foundational void ping() { WebSocketMessage wss; wss.fin = true; @@ -2336,8 +2402,9 @@ wss://echo.websocket.org } /++ - + Sends a text message through the websocket. +/ + /// Group: foundational void send(in char[] textData) { WebSocketMessage wss; wss.fin = true; @@ -2346,6 +2413,19 @@ wss://echo.websocket.org wss.send(&llsend); } + /++ + Sends a binary message through the websocket. + +/ + /// Group: foundational + void send(in ubyte[] binaryData) { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.binary; + wss.data = cast(ubyte[]) binaryData; + wss.send(&llsend); + } + + private void llsend(ubyte[] d) { while(d.length) { auto r = socket.send(d); @@ -2354,7 +2434,13 @@ wss://echo.websocket.org } } - /*private*/ bool llreceive() { + /++ + Waits for more data off the low-level socket and adds it to the pending buffer. + + Returns `true` if the connection is still active. + +/ + /// Group: blocking_api + public bool lowLevelReceive() { auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]); if(r == 0) return false; @@ -2364,47 +2450,146 @@ wss://echo.websocket.org return true; } - public void autoprocess() { + /++ + Waits for and returns the next complete message on the socket. + + Note that the onmessage function is still called, right before + this returns. + +/ + /// Group: blocking_api + public WebSocketMessage waitForNextMessage() { do { - ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; - auto s = d; - while(d.length) { - auto orig = d; - auto m = WebSocketMessage.read(d); - if(d is orig) - break; - if(m.opcode == WebSocketOpcode.close) { - readyState_ = CLOSED; - if(onclose) - onclose(); - } else if(m.opcode == WebSocketOpcode.text) { - if(onmessage) - onmessage(m.textData); - } else { - // FIXME - } - } - receiveBufferUsedLength -= s.length - d.length; - } while(llreceive()); + auto m = processOnce(); + if(m.populated) + return m; + } while(lowLevelReceive()); + + return WebSocketMessage.init; // FIXME? maybe. } + /++ + Tells if [waitForNextMessage] would block. + +/ + /// Group: blocking_api + public bool waitForNextMessageWouldBlock() { + checkAgain: + if(isMessageBuffered()) + return false; + if(!isDataPending()) + return true; + while(isDataPending()) + lowLevelReceive(); + goto checkAgain; + } + + /++ + Is there a message in the buffer already? + If `true`, [waitForNextMessage] is guaranteed to return immediately. + If `false`, check [isDataPending] as the next step. + +/ + /// Group: blocking_api + public bool isMessageBuffered() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + if(d.length) { + auto orig = d; + auto m = WebSocketMessage.read(d); + // that's how it indicates that it needs more data + if(d !is orig) + return true; + } + + return false; + } + + /++ + Is data pending on the socket? Also check [isMessageBuffered] to see if there + is already a message in memory too. + + If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered] + again. + +/ + /// Group: blocking_api + public bool isDataPending() { + static SocketSet readSet; + if(readSet is null) + readSet = new SocketSet(); + + version(with_openssl) + if(auto s = cast(SslClientSocket) socket) { + // select doesn't handle the case with stuff + // left in the ssl buffer so i'm checking it separately + if(s.dataPending()) { + return true; + } + } + + readSet.add(socket); + + //tryAgain: + auto selectGot = Socket.select(readSet, null, null, 0.seconds /* timeout */); + if(selectGot == 0) { /* timeout */ + // timeout + return false; + } else if(selectGot == -1) { /* interrupted */ + return false; + } else { /* ready */ + if(readSet.isSet(socket)) { + return true; + } + } + + return false; + } + + private WebSocketMessage processOnce() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + // FIXME: handle continuation frames + WebSocketMessage m; + if(d.length) { + auto orig = d; + m = WebSocketMessage.read(d); + // that's how it indicates that it needs more data + if(d is orig) + return WebSocketMessage.init; + if(m.opcode == WebSocketOpcode.close) { + readyState_ = CLOSED; + if(onclose) + onclose(); + } else if(m.opcode == WebSocketOpcode.text) { + if(ontextmessage) + ontextmessage(m.textData); + } else { + // FIXME + } + } + receiveBufferUsedLength -= s.length - d.length; + + return m; + } + + private void autoprocess() { + // FIXME + do { + processOnce(); + } while(lowLevelReceive()); + } + + + void delegate() onclose; /// + void delegate() onerror; /// + void delegate(in char[]) ontextmessage; /// + void delegate() onopen; /// /++ +/ - void send(in ubyte[] binaryData) { - WebSocketMessage wss; - wss.fin = true; - wss.opcode = WebSocketOpcode.binary; - wss.data = cast(ubyte[]) binaryData; - wss.send(&llsend); + /// Group: browser_api + void onmessage(void delegate(in char[]) dg) { + ontextmessage = dg; } - void delegate() onclose; /// - void delegate() onerror; /// - void delegate(in char[]) onmessage; /// - void delegate() onopen; /// - /* const int bufferedAmount // amount pending const string extensions @@ -2417,6 +2602,7 @@ wss://echo.websocket.org /* copy/paste from cgi.d */ private { enum WebSocketOpcode : ubyte { + continuation = 0, text = 1, binary = 2, // 3, 4, 5, 6, 7 RESERVED @@ -2426,7 +2612,8 @@ private { // 11,12,13,14,15 RESERVED } - struct WebSocketMessage { + public struct WebSocketMessage { + private bool populated; bool fin; bool rsv1; bool rsv2; @@ -2533,6 +2720,8 @@ private { ubyte b = d[0]; + msg.populated = true; + msg.opcode = cast(WebSocketOpcode) (b & 0x0f); b >>= 4; msg.rsv3 = b & 0x01;