the old websocket server was pure trash. and even the core one was iffy in certain cases

This commit is contained in:
Adam D. Ruppe 2020-01-04 21:59:18 -05:00
parent b116c0e5a3
commit a0236624f2
1 changed files with 29 additions and 12 deletions

41
cgi.d
View File

@ -1586,6 +1586,7 @@ class Cgi {
immutable(ubyte)[] data; immutable(ubyte)[] data;
void rdo(const(ubyte)[] d) { void rdo(const(ubyte)[] d) {
//import std.stdio; writeln(d);
sendAll(ir.source, d); sendAll(ir.source, d);
} }
@ -3370,6 +3371,8 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC
try { try {
fun(cgi); fun(cgi);
cgi.close(); cgi.close();
if(cgi.websocketMode)
closeConnection = true;
} catch(ConnectionException ce) { } catch(ConnectionException ce) {
closeConnection = true; closeConnection = true;
} catch(Throwable t) { } catch(Throwable t) {
@ -3665,6 +3668,8 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) {
try { try {
fun(cgi); fun(cgi);
cgi.close(); cgi.close();
if(cgi.websocketMode)
closeConnection = true;
} catch(ConnectionException ce) { } catch(ConnectionException ce) {
// broken pipe or something, just abort the connection // broken pipe or something, just abort the connection
closeConnection = true; closeConnection = true;
@ -3947,6 +3952,7 @@ class BufferedInputRange {
// we might have to grow the buffer // we might have to grow the buffer
if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) { if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) {
if(allowGrowth) { if(allowGrowth) {
import std.stdio; writeln("growth");
auto viewStart = view.ptr - underlyingBuffer.ptr; auto viewStart = view.ptr - underlyingBuffer.ptr;
size_t growth = 4096; size_t growth = 4096;
// make sure we have enough for what we're being asked for // make sure we have enough for what we're being asked for
@ -3959,7 +3965,7 @@ class BufferedInputRange {
} }
do { do {
auto freeSpace = underlyingBuffer[underlyingBuffer.ptr - view.ptr + view.length .. $]; auto freeSpace = underlyingBuffer[view.ptr - underlyingBuffer.ptr + view.length .. $];
try_again: try_again:
auto ret = source.receive(freeSpace); auto ret = source.receive(freeSpace);
if(ret == Socket.ERROR) { if(ret == Socket.ERROR) {
@ -3981,6 +3987,7 @@ class BufferedInputRange {
return; return;
} }
//import std.stdio; writeln(view.ptr); writeln(underlyingBuffer.ptr); writeln(view.length, " ", ret, " = ", view.length + ret);
view = underlyingBuffer[view.ptr - underlyingBuffer.ptr .. view.length + ret]; view = underlyingBuffer[view.ptr - underlyingBuffer.ptr .. view.length + ret];
} while(view.length < minBytesToSettleFor); } while(view.length < minBytesToSettleFor);
} }
@ -3992,6 +3999,7 @@ class BufferedInputRange {
/// You do not need to call this if you always want to wait for more data when you /// You do not need to call this if you always want to wait for more data when you
/// consume some. /// consume some.
ubyte[] consume(size_t bytes) { ubyte[] consume(size_t bytes) {
//import std.stdio; writeln("consuime ", bytes, "/", view.length);
view = view[bytes > $ ? $ : bytes .. $]; view = view[bytes > $ ? $ : bytes .. $];
if(view.length == 0) { if(view.length == 0) {
view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning
@ -4639,18 +4647,13 @@ version(cgi_with_websocket) {
// note: this blocks // note: this blocks
WebSocketMessage recv() { WebSocketMessage recv() {
// FIXME: should we automatically handle pings and pongs? // FIXME: should we automatically handle pings and pongs?
assert(!cgi.idlol.empty()); if(cgi.idlol.empty())
throw new Exception("remote side disconnected");
cgi.idlol.popFront(0); cgi.idlol.popFront(0);
WebSocketMessage message; WebSocketMessage message;
auto info = cgi.idlol.front(); message = WebSocketMessage.read(cgi.idlol);
// FIXME: read should prolly take the whole range so it can request more if needed
// read should also go ahead and consume the range
message = WebSocketMessage.read(info);
cgi.idlol.consume(info.length);
return message; return message;
} }
@ -4700,7 +4703,7 @@ version(cgi_with_websocket) {
WebSocket acceptWebsocket(Cgi cgi) { WebSocket acceptWebsocket(Cgi cgi) {
assert(!cgi.closed); assert(!cgi.closed);
assert(!cgi.outputtedResponseData); assert(!cgi.outputtedResponseData);
cgi.setResponseStatus("101 Web Socket Protocol Handshake"); cgi.setResponseStatus("101 Switching Protocols");
cgi.header("Upgrade: WebSocket"); cgi.header("Upgrade: WebSocket");
cgi.header("Connection: upgrade"); cgi.header("Connection: upgrade");
@ -4834,7 +4837,15 @@ version(cgi_with_websocket) {
cgi.flush(); cgi.flush();
} }
static WebSocketMessage read(ubyte[] d) { static WebSocketMessage read(BufferedInputRange ir) {
auto d = ir.front();
while(d.length < 2) {
ir.popFront();
d = ir.front();
}
auto start = d;
WebSocketMessage msg; WebSocketMessage msg;
assert(d.length >= 2); assert(d.length >= 2);
@ -4882,7 +4893,11 @@ version(cgi_with_websocket) {
d = d[4 .. $]; d = d[4 .. $];
} }
msg.data = d[0 .. $]; //if(d.length < msg.realLength) {
//}
msg.data = d[0 .. msg.realLength];
d = d[msg.realLength .. $];
if(msg.masked) { if(msg.masked) {
// let's just unmask it now // let's just unmask it now
@ -4896,6 +4911,8 @@ version(cgi_with_websocket) {
} }
} }
ir.consume(start.length - d.length);
return msg; return msg;
} }