initial websocket client code, wip

This commit is contained in:
Adam D. Ruppe 2020-01-04 21:49:20 -05:00
parent ee3b087f3e
commit b116c0e5a3
1 changed files with 575 additions and 6 deletions

581
http2.d
View File

@ -1658,8 +1658,8 @@ version(use_openssl) {
if(SSL_connect(ssl) == -1) {
ERR_print_errors_fp(core.stdc.stdio.stderr);
int i;
printf("wtf\n");
scanf("%d\n", i);
//printf("wtf\n");
//scanf("%d\n", i);
throw new Exception("ssl connect");
}
}
@ -1671,8 +1671,8 @@ version(use_openssl) {
if(retval == -1) {
ERR_print_errors_fp(core.stdc.stdio.stderr);
int i;
printf("wtf\n");
scanf("%d\n", i);
//printf("wtf\n");
//scanf("%d\n", i);
throw new Exception("ssl send");
}
return retval;
@ -1687,8 +1687,8 @@ version(use_openssl) {
if(retval == -1) {
ERR_print_errors_fp(core.stdc.stdio.stderr);
int i;
printf("wtf\n");
scanf("%d\n", i);
//printf("wtf\n");
//scanf("%d\n", i);
throw new Exception("ssl send");
}
return retval;
@ -2034,3 +2034,572 @@ class FormData {
}
}
private bool bicmp(in ubyte[] item, in char[] search) {
if(item.length != search.length) return false;
foreach(i; 0 .. item.length) {
ubyte a = item[i];
ubyte b = search[i];
if(a >= 'A' && a <= 'Z')
a += 32;
//if(b >= 'A' && b <= 'Z')
//b += 32;
if(a != b)
return false;
}
return true;
}
/++
WebSocket client, based on the browser api.
on receive
on frame
on message
+/
class WebSocket {
private Uri uri;
private string[string] cookies;
private string origin;
private string host;
private ushort port;
private bool ssl;
private int readyState_;
private Socket socket;
private ubyte[] receiveBuffer;
private size_t receiveBufferUsedLength;
private Config config;
enum CONNECTING = 0; /// Socket has been created. The connection is not yet open.
enum OPEN = 1; /// The connection is open and ready to communicate.
enum CLOSING = 2; /// The connection is in the process of closing.
enum CLOSED = 3; /// The connection is closed or couldn't be opened.
/++
+/
static struct Config {
/++
These control the size of the receive buffer.
It starts at the initial size, will temporarily
balloon up to the maximum size, and will reuse
a buffer up to the likely size.
Anything larger than the maximum size will cause
the connection to be aborted and an exception thrown.
This is to protect you against a peer trying to
exhaust your memory, while keeping the user-level
processing simple.
+/
size_t initialReceiveBufferSize = 4096;
size_t likelyReceiveBufferSize = 4096; /// ditto
size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired.
}
/++
Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED].
+/
int readyState() {
return readyState_;
}
/++
wss://echo.websocket.org
+/
this(Uri uri, Config config = Config.init)
in (uri.scheme == "ws" || uri.scheme == "wss")
{
this.uri = uri;
this.config = config;
this.receiveBuffer = new ubyte[](config.initialReceiveBufferSize);
host = uri.host;
ssl = uri.scheme == "wss";
port = cast(ushort) (uri.port ? uri.port : ssl ? 443 : 80);
if(ssl) {
version(with_openssl)
socket = new SslClientSocket(AddressFamily.INET, SocketType.STREAM);
else
throw new Exception("SSL not compiled in");
} else
socket = new Socket(AddressFamily.INET, SocketType.STREAM);
}
/++
+/
void connect() {
socket.connect(new InternetAddress(host, port)); // FIXME: ipv6 support...
// FIXME: websocket handshake could and really should be async too.
auto uri = this.uri.path.length ? this.uri.path : "/";
if(this.uri.query.length) {
uri ~= "?";
uri ~= this.uri.query;
}
// the headers really shouldn't be bigger than this, at least
// the chunks i need to process
ubyte[4096] buffer;
size_t pos;
void append(in char[][] items...) {
foreach(what; items) {
buffer[pos .. pos + what.length] = cast(ubyte[]) what[];
pos += what.length;
}
}
append("GET ", uri, " HTTP/1.1\r\n");
append("Host: ", this.uri.host, "\r\n");
append("Upgrade: websocket\r\n");
append("Connection: Upgrade\r\n");
append("Sec-WebSocket-Version: 13\r\n");
// FIXME: randomize this
append("Sec-WebSocket-Key: x3JEHMbDL1EzLkh9GBhXDw==\r\n");
if(config.protocol.length)
append("Sec-WebSocket-Protocol: ", config.protocol, "\r\n");
if(config.origin.length)
append("Origin: ", origin, "\r\n");
append("\r\n");
auto remaining = buffer[0 .. pos];
//import std.stdio; writeln(host, " " , port, " ", cast(string) remaining);
while(remaining.length) {
auto r = socket.send(remaining);
if(r < 0)
throw new Exception(lastSocketError());
if(r == 0)
throw new Exception("unexpected connection termination");
remaining = remaining[r .. $];
}
// the response shouldn't be especially large at this point, just
// headers for the most part. gonna try to get it in the stack buffer.
// then copy stuff after headers, if any, to the frame buffer.
ubyte[] used;
void more() {
auto r = socket.receive(buffer[used.length .. $]);
if(r < 0)
throw new Exception(lastSocketError());
if(r == 0)
throw new Exception("unexpected connection termination");
//import std.stdio;writef("%s", cast(string) buffer[used.length .. used.length + r]);
used = buffer[0 .. used.length + r];
}
more();
import std.algorithm;
if(!used.startsWith(cast(ubyte[]) "HTTP/1.1 101"))
throw new Exception("didn't get a websocket answer");
// skip the status line
while(used.length && used[0] != '\n')
used = used[1 .. $];
if(used.length == 0)
throw new Exception("wtf");
if(used.length < 1)
more();
used = used[1 .. $]; // skip the \n
if(used.length == 0)
more();
// checks on the protocol from ehaders
bool isWebsocket;
bool isUpgrade;
const(ubyte)[] protocol;
const(ubyte)[] accept;
while(used.length) {
if(used.length >= 2 && used[0] == '\r' && used[1] == '\n') {
used = used[2 .. $];
break; // all done
}
int idxColon;
while(idxColon < used.length && used[idxColon] != ':')
idxColon++;
if(idxColon == used.length)
more();
auto idxStart = idxColon + 1;
while(idxStart < used.length && used[idxStart] == ' ')
idxStart++;
if(idxStart == used.length)
more();
auto idxEnd = idxStart;
while(idxEnd < used.length && used[idxEnd] != '\r')
idxEnd++;
if(idxEnd == used.length)
more();
auto headerName = used[0 .. idxColon];
auto headerValue = used[idxStart .. idxEnd];
// move past this header
used = used[idxEnd .. $];
// and the \r\n
if(2 <= used.length)
used = used[2 .. $];
if(headerName.bicmp("upgrade")) {
if(headerValue.bicmp("websocket"))
isWebsocket = true;
} else if(headerName.bicmp("connection")) {
if(headerValue.bicmp("upgrade"))
isUpgrade = true;
} else if(headerName.bicmp("sec-websocket-accept")) {
accept = headerValue;
} else if(headerName.bicmp("sec-websocket-protocol")) {
protocol = headerValue;
}
if(!used.length) {
more();
}
}
if(!isWebsocket)
throw new Exception("didn't answer as websocket");
if(!isUpgrade)
throw new Exception("didn't answer as upgrade");
// FIXME: check protocol if config requested one
// FIXME: check accept
receiveBuffer[0 .. used.length] = used[];
receiveBufferUsedLength = used.length;
readyState_ = OPEN;
if(onopen)
onopen();
}
/++
+/
void close(int code = 0, string reason = null)
in (reason.length < 123)
{
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.close;
wss.data = cast(ubyte[]) reason;
wss.send(&llsend);
readyState_ = CLOSING;
socket.shutdown(SocketShutdown.SEND);
}
/++
+/
void ping() {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.ping;
wss.send(&llsend);
}
// automatically handled....
void pong() {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.pong;
wss.send(&llsend);
}
/++
+/
void send(in char[] textData) {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.text;
wss.data = cast(ubyte[]) textData;
wss.send(&llsend);
}
private void llsend(ubyte[] d) {
while(d.length) {
auto r = socket.send(d);
if(r <= 0) throw new Exception("wtf");
d = d[r .. $];
}
}
/*private*/ bool llreceive() {
auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]);
if(r == 0)
return false;
if(r <= 0)
throw new Exception("wtf");
receiveBufferUsedLength += r;
return true;
}
public void autoprocess() {
do {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
while(d.length) {
auto orig = d;
auto m = WebSocketMessage.read(d);
if(d is orig)
break;
if(m.opcode == WebSocketOpcode.close) {
readyState_ = CLOSED;
if(onclose)
onclose();
} else if(m.opcode == WebSocketOpcode.text) {
if(onmessage)
onmessage(m.textData);
} else {
// FIXME
}
}
receiveBufferUsedLength -= s.length - d.length;
} while(llreceive());
}
/++
+/
void send(in ubyte[] binaryData) {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.binary;
wss.data = cast(ubyte[]) binaryData;
wss.send(&llsend);
}
void delegate() onclose; ///
void delegate() onerror; ///
void delegate(in char[]) onmessage; ///
void delegate() onopen; ///
/*
const int bufferedAmount // amount pending
const string extensions
const string protocol
const string url
*/
}
/* copy/paste from cgi.d */
private {
enum WebSocketOpcode : ubyte {
text = 1,
binary = 2,
// 3, 4, 5, 6, 7 RESERVED
close = 8,
ping = 9,
pong = 10,
// 11,12,13,14,15 RESERVED
}
struct WebSocketMessage {
bool fin;
bool rsv1;
bool rsv2;
bool rsv3;
WebSocketOpcode opcode; // 4 bits
bool masked;
ubyte lengthIndicator; // don't set this when building one to send
ulong realLength; // don't use when sending
ubyte[4] maskingKey; // don't set this when sending
ubyte[] data;
static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) {
WebSocketMessage msg;
msg.fin = true;
msg.opcode = opcode;
msg.data = cast(ubyte[]) data;
return msg;
}
private void send(scope void delegate(ubyte[]) llsend) {
ubyte[64] headerScratch;
int headerScratchPos = 0;
realLength = data.length;
{
ubyte b1;
b1 |= cast(ubyte) opcode;
b1 |= rsv3 ? (1 << 4) : 0;
b1 |= rsv2 ? (1 << 5) : 0;
b1 |= rsv1 ? (1 << 6) : 0;
b1 |= fin ? (1 << 7) : 0;
headerScratch[0] = b1;
headerScratchPos++;
}
{
headerScratchPos++; // we'll set header[1] at the end of this
auto rlc = realLength;
ubyte b2;
b2 |= masked ? (1 << 7) : 0;
assert(headerScratchPos == 2);
if(realLength > 65535) {
// use 64 bit length
b2 |= 0x7f;
// FIXME: double check endinaness
foreach(i; 0 .. 8) {
headerScratch[2 + 7 - i] = rlc & 0x0ff;
rlc >>>= 8;
}
headerScratchPos += 8;
} else if(realLength > 127) {
// use 16 bit length
b2 |= 0x7e;
// FIXME: double check endinaness
foreach(i; 0 .. 2) {
headerScratch[2 + 1 - i] = rlc & 0x0ff;
rlc >>>= 8;
}
headerScratchPos += 2;
} else {
// use 7 bit length
b2 |= realLength & 0b_0111_1111;
}
headerScratch[1] = b2;
}
//assert(!masked, "masking key not properly implemented");
if(masked) {
// FIXME: randomize this
headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[];
headerScratchPos += 4;
// we'll just mask it in place...
int keyIdx = 0;
foreach(i; 0 .. data.length) {
data[i] = data[i] ^ maskingKey[keyIdx];
if(keyIdx == 3)
keyIdx = 0;
else
keyIdx++;
}
}
//writeln("SENDING ", headerScratch[0 .. headerScratchPos], data);
llsend(headerScratch[0 .. headerScratchPos]);
llsend(data);
}
static WebSocketMessage read(ref ubyte[] d) {
WebSocketMessage msg;
assert(d.length >= 2);
auto orig = d;
ubyte b = d[0];
msg.opcode = cast(WebSocketOpcode) (b & 0x0f);
b >>= 4;
msg.rsv3 = b & 0x01;
b >>= 1;
msg.rsv2 = b & 0x01;
b >>= 1;
msg.rsv1 = b & 0x01;
b >>= 1;
msg.fin = b & 0x01;
b = d[1];
msg.masked = (b & 0b1000_0000) ? true : false;
msg.lengthIndicator = b & 0b0111_1111;
d = d[2 .. $];
if(msg.lengthIndicator == 0x7e) {
// 16 bit length
msg.realLength = 0;
foreach(i; 0 .. 2) {
msg.realLength |= d[0] << ((1-i) * 8);
d = d[1 .. $];
}
} else if(msg.lengthIndicator == 0x7f) {
// 64 bit length
msg.realLength = 0;
foreach(i; 0 .. 8) {
msg.realLength |= d[0] << ((7-i) * 8);
d = d[1 .. $];
}
} else {
// 7 bit length
msg.realLength = msg.lengthIndicator;
}
if(msg.masked) {
msg.maskingKey = d[0 .. 4];
d = d[4 .. $];
}
if(msg.realLength > d.length) {
d = orig;
return WebSocketMessage.init;
}
msg.data = d[0 .. msg.realLength];
d = d[msg.realLength .. $];
if(msg.masked) {
// let's just unmask it now
int keyIdx = 0;
foreach(i; 0 .. msg.data.length) {
msg.data[i] = msg.data[i] ^ msg.maskingKey[keyIdx];
if(keyIdx == 3)
keyIdx = 0;
else
keyIdx++;
}
}
return msg;
}
char[] textData() {
return cast(char[]) data;
}
}
}