From b116c0e5a39d1a119015ff6fcd1a2fa80bb327c3 Mon Sep 17 00:00:00 2001 From: "Adam D. Ruppe" Date: Sat, 4 Jan 2020 21:49:20 -0500 Subject: [PATCH] initial websocket client code, wip --- http2.d | 581 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 575 insertions(+), 6 deletions(-) diff --git a/http2.d b/http2.d index e141d1a..64120d5 100644 --- a/http2.d +++ b/http2.d @@ -1658,8 +1658,8 @@ version(use_openssl) { if(SSL_connect(ssl) == -1) { ERR_print_errors_fp(core.stdc.stdio.stderr); int i; - printf("wtf\n"); - scanf("%d\n", i); + //printf("wtf\n"); + //scanf("%d\n", i); throw new Exception("ssl connect"); } } @@ -1671,8 +1671,8 @@ version(use_openssl) { if(retval == -1) { ERR_print_errors_fp(core.stdc.stdio.stderr); int i; - printf("wtf\n"); - scanf("%d\n", i); + //printf("wtf\n"); + //scanf("%d\n", i); throw new Exception("ssl send"); } return retval; @@ -1687,8 +1687,8 @@ version(use_openssl) { if(retval == -1) { ERR_print_errors_fp(core.stdc.stdio.stderr); int i; - printf("wtf\n"); - scanf("%d\n", i); + //printf("wtf\n"); + //scanf("%d\n", i); throw new Exception("ssl send"); } return retval; @@ -2034,3 +2034,572 @@ class FormData { } } +private bool bicmp(in ubyte[] item, in char[] search) { + if(item.length != search.length) return false; + + foreach(i; 0 .. item.length) { + ubyte a = item[i]; + ubyte b = search[i]; + if(a >= 'A' && a <= 'Z') + a += 32; + //if(b >= 'A' && b <= 'Z') + //b += 32; + if(a != b) + return false; + } + + return true; +} + +/++ + WebSocket client, based on the browser api. + + on receive + on frame + on message ++/ +class WebSocket { + private Uri uri; + private string[string] cookies; + private string origin; + + private string host; + private ushort port; + private bool ssl; + + private int readyState_; + + private Socket socket; + private ubyte[] receiveBuffer; + private size_t receiveBufferUsedLength; + + private Config config; + + enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. + enum OPEN = 1; /// The connection is open and ready to communicate. + enum CLOSING = 2; /// The connection is in the process of closing. + enum CLOSED = 3; /// The connection is closed or couldn't be opened. + + /++ + + +/ + static struct Config { + /++ + These control the size of the receive buffer. + + It starts at the initial size, will temporarily + balloon up to the maximum size, and will reuse + a buffer up to the likely size. + + Anything larger than the maximum size will cause + the connection to be aborted and an exception thrown. + This is to protect you against a peer trying to + exhaust your memory, while keeping the user-level + processing simple. + +/ + size_t initialReceiveBufferSize = 4096; + size_t likelyReceiveBufferSize = 4096; /// ditto + size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + + string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; + string origin; /// Origin URL to send with the handshake, if desired. + string protocol; /// the protocol header, if desired. + } + + /++ + Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. + +/ + int readyState() { + return readyState_; + } + + /++ +wss://echo.websocket.org + +/ + this(Uri uri, Config config = Config.init) + in (uri.scheme == "ws" || uri.scheme == "wss") + { + this.uri = uri; + this.config = config; + + this.receiveBuffer = new ubyte[](config.initialReceiveBufferSize); + + host = uri.host; + ssl = uri.scheme == "wss"; + port = cast(ushort) (uri.port ? uri.port : ssl ? 443 : 80); + + if(ssl) { + version(with_openssl) + socket = new SslClientSocket(AddressFamily.INET, SocketType.STREAM); + else + throw new Exception("SSL not compiled in"); + } else + socket = new Socket(AddressFamily.INET, SocketType.STREAM); + + } + + /++ + + +/ + void connect() { + socket.connect(new InternetAddress(host, port)); // FIXME: ipv6 support... + // FIXME: websocket handshake could and really should be async too. + + auto uri = this.uri.path.length ? this.uri.path : "/"; + if(this.uri.query.length) { + uri ~= "?"; + uri ~= this.uri.query; + } + + // the headers really shouldn't be bigger than this, at least + // the chunks i need to process + ubyte[4096] buffer; + size_t pos; + + void append(in char[][] items...) { + foreach(what; items) { + buffer[pos .. pos + what.length] = cast(ubyte[]) what[]; + pos += what.length; + } + } + + append("GET ", uri, " HTTP/1.1\r\n"); + append("Host: ", this.uri.host, "\r\n"); + + append("Upgrade: websocket\r\n"); + append("Connection: Upgrade\r\n"); + append("Sec-WebSocket-Version: 13\r\n"); + + // FIXME: randomize this + append("Sec-WebSocket-Key: x3JEHMbDL1EzLkh9GBhXDw==\r\n"); + + if(config.protocol.length) + append("Sec-WebSocket-Protocol: ", config.protocol, "\r\n"); + if(config.origin.length) + append("Origin: ", origin, "\r\n"); + + append("\r\n"); + + auto remaining = buffer[0 .. pos]; + //import std.stdio; writeln(host, " " , port, " ", cast(string) remaining); + while(remaining.length) { + auto r = socket.send(remaining); + if(r < 0) + throw new Exception(lastSocketError()); + if(r == 0) + throw new Exception("unexpected connection termination"); + remaining = remaining[r .. $]; + } + + // the response shouldn't be especially large at this point, just + // headers for the most part. gonna try to get it in the stack buffer. + // then copy stuff after headers, if any, to the frame buffer. + ubyte[] used; + + void more() { + auto r = socket.receive(buffer[used.length .. $]); + + if(r < 0) + throw new Exception(lastSocketError()); + if(r == 0) + throw new Exception("unexpected connection termination"); + //import std.stdio;writef("%s", cast(string) buffer[used.length .. used.length + r]); + + used = buffer[0 .. used.length + r]; + } + + more(); + + import std.algorithm; + if(!used.startsWith(cast(ubyte[]) "HTTP/1.1 101")) + throw new Exception("didn't get a websocket answer"); + // skip the status line + while(used.length && used[0] != '\n') + used = used[1 .. $]; + + if(used.length == 0) + throw new Exception("wtf"); + + if(used.length < 1) + more(); + + used = used[1 .. $]; // skip the \n + + if(used.length == 0) + more(); + + // checks on the protocol from ehaders + bool isWebsocket; + bool isUpgrade; + const(ubyte)[] protocol; + const(ubyte)[] accept; + + while(used.length) { + if(used.length >= 2 && used[0] == '\r' && used[1] == '\n') { + used = used[2 .. $]; + break; // all done + } + int idxColon; + while(idxColon < used.length && used[idxColon] != ':') + idxColon++; + if(idxColon == used.length) + more(); + auto idxStart = idxColon + 1; + while(idxStart < used.length && used[idxStart] == ' ') + idxStart++; + if(idxStart == used.length) + more(); + auto idxEnd = idxStart; + while(idxEnd < used.length && used[idxEnd] != '\r') + idxEnd++; + if(idxEnd == used.length) + more(); + + auto headerName = used[0 .. idxColon]; + auto headerValue = used[idxStart .. idxEnd]; + + // move past this header + used = used[idxEnd .. $]; + // and the \r\n + if(2 <= used.length) + used = used[2 .. $]; + + if(headerName.bicmp("upgrade")) { + if(headerValue.bicmp("websocket")) + isWebsocket = true; + } else if(headerName.bicmp("connection")) { + if(headerValue.bicmp("upgrade")) + isUpgrade = true; + } else if(headerName.bicmp("sec-websocket-accept")) { + accept = headerValue; + } else if(headerName.bicmp("sec-websocket-protocol")) { + protocol = headerValue; + } + + if(!used.length) { + more(); + } + } + + + if(!isWebsocket) + throw new Exception("didn't answer as websocket"); + if(!isUpgrade) + throw new Exception("didn't answer as upgrade"); + + + // FIXME: check protocol if config requested one + // FIXME: check accept + + receiveBuffer[0 .. used.length] = used[]; + receiveBufferUsedLength = used.length; + + readyState_ = OPEN; + + if(onopen) + onopen(); + } + + /++ + + +/ + void close(int code = 0, string reason = null) + in (reason.length < 123) + { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.close; + wss.data = cast(ubyte[]) reason; + wss.send(&llsend); + + readyState_ = CLOSING; + + socket.shutdown(SocketShutdown.SEND); + } + + /++ + + +/ + void ping() { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.ping; + wss.send(&llsend); + } + + // automatically handled.... + void pong() { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.pong; + wss.send(&llsend); + } + + /++ + + +/ + void send(in char[] textData) { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.text; + wss.data = cast(ubyte[]) textData; + wss.send(&llsend); + } + + private void llsend(ubyte[] d) { + while(d.length) { + auto r = socket.send(d); + if(r <= 0) throw new Exception("wtf"); + d = d[r .. $]; + } + } + + /*private*/ bool llreceive() { + auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]); + if(r == 0) + return false; + if(r <= 0) + throw new Exception("wtf"); + receiveBufferUsedLength += r; + return true; + } + + public void autoprocess() { + do { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + while(d.length) { + auto orig = d; + auto m = WebSocketMessage.read(d); + if(d is orig) + break; + if(m.opcode == WebSocketOpcode.close) { + readyState_ = CLOSED; + if(onclose) + onclose(); + } else if(m.opcode == WebSocketOpcode.text) { + if(onmessage) + onmessage(m.textData); + } else { + // FIXME + } + } + receiveBufferUsedLength -= s.length - d.length; + } while(llreceive()); + } + + + /++ + + +/ + void send(in ubyte[] binaryData) { + WebSocketMessage wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.binary; + wss.data = cast(ubyte[]) binaryData; + wss.send(&llsend); + } + + void delegate() onclose; /// + void delegate() onerror; /// + void delegate(in char[]) onmessage; /// + void delegate() onopen; /// + + /* + const int bufferedAmount // amount pending + const string extensions + + const string protocol + const string url + */ +} + +/* copy/paste from cgi.d */ +private { + enum WebSocketOpcode : ubyte { + text = 1, + binary = 2, + // 3, 4, 5, 6, 7 RESERVED + close = 8, + ping = 9, + pong = 10, + // 11,12,13,14,15 RESERVED + } + + struct WebSocketMessage { + bool fin; + bool rsv1; + bool rsv2; + bool rsv3; + WebSocketOpcode opcode; // 4 bits + bool masked; + ubyte lengthIndicator; // don't set this when building one to send + ulong realLength; // don't use when sending + ubyte[4] maskingKey; // don't set this when sending + ubyte[] data; + + static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { + WebSocketMessage msg; + msg.fin = true; + msg.opcode = opcode; + msg.data = cast(ubyte[]) data; + + return msg; + } + + private void send(scope void delegate(ubyte[]) llsend) { + ubyte[64] headerScratch; + int headerScratchPos = 0; + + realLength = data.length; + + { + ubyte b1; + b1 |= cast(ubyte) opcode; + b1 |= rsv3 ? (1 << 4) : 0; + b1 |= rsv2 ? (1 << 5) : 0; + b1 |= rsv1 ? (1 << 6) : 0; + b1 |= fin ? (1 << 7) : 0; + + headerScratch[0] = b1; + headerScratchPos++; + } + + { + headerScratchPos++; // we'll set header[1] at the end of this + auto rlc = realLength; + ubyte b2; + b2 |= masked ? (1 << 7) : 0; + + assert(headerScratchPos == 2); + + if(realLength > 65535) { + // use 64 bit length + b2 |= 0x7f; + + // FIXME: double check endinaness + foreach(i; 0 .. 8) { + headerScratch[2 + 7 - i] = rlc & 0x0ff; + rlc >>>= 8; + } + + headerScratchPos += 8; + } else if(realLength > 127) { + // use 16 bit length + b2 |= 0x7e; + + // FIXME: double check endinaness + foreach(i; 0 .. 2) { + headerScratch[2 + 1 - i] = rlc & 0x0ff; + rlc >>>= 8; + } + + headerScratchPos += 2; + } else { + // use 7 bit length + b2 |= realLength & 0b_0111_1111; + } + + headerScratch[1] = b2; + } + + //assert(!masked, "masking key not properly implemented"); + if(masked) { + // FIXME: randomize this + headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[]; + headerScratchPos += 4; + + // we'll just mask it in place... + int keyIdx = 0; + foreach(i; 0 .. data.length) { + data[i] = data[i] ^ maskingKey[keyIdx]; + if(keyIdx == 3) + keyIdx = 0; + else + keyIdx++; + } + } + + //writeln("SENDING ", headerScratch[0 .. headerScratchPos], data); + llsend(headerScratch[0 .. headerScratchPos]); + llsend(data); + } + + static WebSocketMessage read(ref ubyte[] d) { + WebSocketMessage msg; + assert(d.length >= 2); + + auto orig = d; + + ubyte b = d[0]; + + msg.opcode = cast(WebSocketOpcode) (b & 0x0f); + b >>= 4; + msg.rsv3 = b & 0x01; + b >>= 1; + msg.rsv2 = b & 0x01; + b >>= 1; + msg.rsv1 = b & 0x01; + b >>= 1; + msg.fin = b & 0x01; + + b = d[1]; + msg.masked = (b & 0b1000_0000) ? true : false; + msg.lengthIndicator = b & 0b0111_1111; + + d = d[2 .. $]; + + if(msg.lengthIndicator == 0x7e) { + // 16 bit length + msg.realLength = 0; + + foreach(i; 0 .. 2) { + msg.realLength |= d[0] << ((1-i) * 8); + d = d[1 .. $]; + } + } else if(msg.lengthIndicator == 0x7f) { + // 64 bit length + msg.realLength = 0; + + foreach(i; 0 .. 8) { + msg.realLength |= d[0] << ((7-i) * 8); + d = d[1 .. $]; + } + } else { + // 7 bit length + msg.realLength = msg.lengthIndicator; + } + + if(msg.masked) { + msg.maskingKey = d[0 .. 4]; + d = d[4 .. $]; + } + + if(msg.realLength > d.length) { + d = orig; + return WebSocketMessage.init; + } + + msg.data = d[0 .. msg.realLength]; + d = d[msg.realLength .. $]; + + if(msg.masked) { + // let's just unmask it now + int keyIdx = 0; + foreach(i; 0 .. msg.data.length) { + msg.data[i] = msg.data[i] ^ msg.maskingKey[keyIdx]; + if(keyIdx == 3) + keyIdx = 0; + else + keyIdx++; + } + } + + return msg; + } + + char[] textData() { + return cast(char[]) data; + } + } +}