diff --git a/cgi.d b/cgi.d index 697049f..d653110 100644 --- a/cgi.d +++ b/cgi.d @@ -1586,6 +1586,7 @@ class Cgi { immutable(ubyte)[] data; void rdo(const(ubyte)[] d) { + //import std.stdio; writeln(d); sendAll(ir.source, d); } @@ -3370,6 +3371,8 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC try { fun(cgi); cgi.close(); + if(cgi.websocketMode) + closeConnection = true; } catch(ConnectionException ce) { closeConnection = true; } catch(Throwable t) { @@ -3665,6 +3668,8 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { try { fun(cgi); cgi.close(); + if(cgi.websocketMode) + closeConnection = true; } catch(ConnectionException ce) { // broken pipe or something, just abort the connection closeConnection = true; @@ -3947,6 +3952,7 @@ class BufferedInputRange { // we might have to grow the buffer if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) { if(allowGrowth) { + import std.stdio; writeln("growth"); auto viewStart = view.ptr - underlyingBuffer.ptr; size_t growth = 4096; // make sure we have enough for what we're being asked for @@ -3959,7 +3965,7 @@ class BufferedInputRange { } do { - auto freeSpace = underlyingBuffer[underlyingBuffer.ptr - view.ptr + view.length .. $]; + auto freeSpace = underlyingBuffer[view.ptr - underlyingBuffer.ptr + view.length .. $]; try_again: auto ret = source.receive(freeSpace); if(ret == Socket.ERROR) { @@ -3981,6 +3987,7 @@ class BufferedInputRange { return; } + //import std.stdio; writeln(view.ptr); writeln(underlyingBuffer.ptr); writeln(view.length, " ", ret, " = ", view.length + ret); view = underlyingBuffer[view.ptr - underlyingBuffer.ptr .. view.length + ret]; } while(view.length < minBytesToSettleFor); } @@ -3992,6 +3999,7 @@ class BufferedInputRange { /// You do not need to call this if you always want to wait for more data when you /// consume some. ubyte[] consume(size_t bytes) { + //import std.stdio; writeln("consuime ", bytes, "/", view.length); view = view[bytes > $ ? $ : bytes .. $]; if(view.length == 0) { view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning @@ -4639,18 +4647,13 @@ version(cgi_with_websocket) { // note: this blocks WebSocketMessage recv() { // FIXME: should we automatically handle pings and pongs? - assert(!cgi.idlol.empty()); + if(cgi.idlol.empty()) + throw new Exception("remote side disconnected"); cgi.idlol.popFront(0); WebSocketMessage message; - auto info = cgi.idlol.front(); - - // FIXME: read should prolly take the whole range so it can request more if needed - // read should also go ahead and consume the range - message = WebSocketMessage.read(info); - - cgi.idlol.consume(info.length); + message = WebSocketMessage.read(cgi.idlol); return message; } @@ -4700,7 +4703,7 @@ version(cgi_with_websocket) { WebSocket acceptWebsocket(Cgi cgi) { assert(!cgi.closed); assert(!cgi.outputtedResponseData); - cgi.setResponseStatus("101 Web Socket Protocol Handshake"); + cgi.setResponseStatus("101 Switching Protocols"); cgi.header("Upgrade: WebSocket"); cgi.header("Connection: upgrade"); @@ -4834,7 +4837,15 @@ version(cgi_with_websocket) { cgi.flush(); } - static WebSocketMessage read(ubyte[] d) { + static WebSocketMessage read(BufferedInputRange ir) { + + auto d = ir.front(); + while(d.length < 2) { + ir.popFront(); + d = ir.front(); + } + auto start = d; + WebSocketMessage msg; assert(d.length >= 2); @@ -4882,7 +4893,11 @@ version(cgi_with_websocket) { d = d[4 .. $]; } - msg.data = d[0 .. $]; + //if(d.length < msg.realLength) { + + //} + msg.data = d[0 .. msg.realLength]; + d = d[msg.realLength .. $]; if(msg.masked) { // let's just unmask it now @@ -4896,6 +4911,8 @@ version(cgi_with_websocket) { } } + ir.consume(start.length - d.length); + return msg; }