more websocket code sharing

This commit is contained in:
Adam D. Ruppe 2020-03-23 13:15:47 -04:00
parent 4a08ece9e4
commit a54dc55d22
2 changed files with 486 additions and 176 deletions

378
cgi.d
View File

@ -4664,6 +4664,37 @@ version(cgi_with_websocket) {
// returns true if data available, false if it timed out
bool recvAvailable(Duration timeout = dur!"msecs"(0)) {
if(!waitForNextMessageWouldBlock())
return true;
if(isDataPending(timeout))
return true; // this is kinda a lie.
return false;
}
public bool lowLevelReceive() {
auto bfr = cgi.idlol;
top:
auto got = bfr.front;
if(got.length) {
if(receiveBuffer.length < receiveBufferUsedLength + got.length)
receiveBuffer.length += receiveBufferUsedLength + got.length;
receiveBuffer[receiveBufferUsedLength .. receiveBufferUsedLength + got.length] = got[];
receiveBufferUsedLength += got.length;
bfr.consume(got.length);
return true;
}
bfr.popFront(0);
if(bfr.sourceClosed)
return false;
goto top;
}
bool isDataPending(Duration timeout = 0.seconds) {
Socket socket = cgi.idlol.source;
auto check = new SocketSet();
@ -4676,47 +4707,297 @@ version(cgi_with_websocket) {
}
// note: this blocks
WebSocketMessage recv() {
// FIXME: should we automatically handle pings and pongs?
if(cgi.idlol.empty())
throw new Exception("remote side disconnected");
cgi.idlol.popFront(0);
WebSocketMessage message;
message = WebSocketMessage.read(cgi.idlol);
return message;
WebSocketFrame recv() {
return waitForNextMessage();
}
void send(in char[] text) {
// I cast away const here because I know this msg is private and it doesn't write
// to that buffer unless masking is set... which it isn't, so we're ok.
auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.text, cast(void[]) text);
msg.send(cgi);
private void llclose() {
cgi.close();
}
void send(in ubyte[] binary) {
// I cast away const here because I know this msg is private and it doesn't write
// to that buffer unless masking is set... which it isn't, so we're ok.
auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.binary, cast(void[]) binary);
msg.send(cgi);
private void llsend(ubyte[] data) {
cgi.write(data);
cgi.flush();
}
void close() {
auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.close, null);
msg.send(cgi);
void unregisterActiveSocket(WebSocket) {}
/* copy/paste section { */
private int readyState_;
private ubyte[] receiveBuffer;
private size_t receiveBufferUsedLength;
private Config config;
enum CONNECTING = 0; /// Socket has been created. The connection is not yet open.
enum OPEN = 1; /// The connection is open and ready to communicate.
enum CLOSING = 2; /// The connection is in the process of closing.
enum CLOSED = 3; /// The connection is closed or couldn't be opened.
/++
+/
/// Group: foundational
static struct Config {
/++
These control the size of the receive buffer.
It starts at the initial size, will temporarily
balloon up to the maximum size, and will reuse
a buffer up to the likely size.
Anything larger than the maximum size will cause
the connection to be aborted and an exception thrown.
This is to protect you against a peer trying to
exhaust your memory, while keeping the user-level
processing simple.
+/
size_t initialReceiveBufferSize = 4096;
size_t likelyReceiveBufferSize = 4096; /// ditto
size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
/++
Maximum combined size of a message.
+/
size_t maximumMessageSize = 10 * 1024 * 1024;
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired.
int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping
}
/++
Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED].
+/
int readyState() {
return readyState_;
}
/++
Closes the connection, sending a graceful teardown message to the other side.
+/
/// Group: foundational
void close(int code = 0, string reason = null)
//in (reason.length < 123)
in { assert(reason.length < 123); } do
{
if(readyState_ != OPEN)
return; // it cool, we done
WebSocketFrame wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.close;
wss.data = cast(ubyte[]) reason;
wss.send(&llsend);
readyState_ = CLOSING;
llclose();
}
/++
Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function.
+/
/// Group: foundational
void ping() {
auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.ping, null);
msg.send(cgi);
WebSocketFrame wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.ping;
wss.send(&llsend);
}
// automatically handled....
void pong() {
auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.pong, null);
msg.send(cgi);
WebSocketFrame wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.pong;
wss.send(&llsend);
}
/++
Sends a text message through the websocket.
+/
/// Group: foundational
void send(in char[] textData) {
WebSocketFrame wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.text;
wss.data = cast(ubyte[]) textData;
wss.send(&llsend);
}
/++
Sends a binary message through the websocket.
+/
/// Group: foundational
void send(in ubyte[] binaryData) {
WebSocketFrame wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.binary;
wss.data = cast(ubyte[]) binaryData;
wss.send(&llsend);
}
/++
Waits for and returns the next complete message on the socket.
Note that the onmessage function is still called, right before
this returns.
+/
/// Group: blocking_api
public WebSocketFrame waitForNextMessage() {
do {
auto m = processOnce();
if(m.populated)
return m;
} while(lowLevelReceive());
return WebSocketFrame.init; // FIXME? maybe.
}
/++
Tells if [waitForNextMessage] would block.
+/
/// Group: blocking_api
public bool waitForNextMessageWouldBlock() {
checkAgain:
if(isMessageBuffered())
return false;
if(!isDataPending())
return true;
while(isDataPending())
lowLevelReceive();
goto checkAgain;
}
/++
Is there a message in the buffer already?
If `true`, [waitForNextMessage] is guaranteed to return immediately.
If `false`, check [isDataPending] as the next step.
+/
/// Group: blocking_api
public bool isMessageBuffered() {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
if(d.length) {
auto orig = d;
auto m = WebSocketFrame.read(d);
// that's how it indicates that it needs more data
if(d !is orig)
return true;
}
return false;
}
private ubyte continuingType;
private ubyte[] continuingData;
//private size_t continuingDataLength;
private WebSocketFrame processOnce() {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
// FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer.
WebSocketFrame m;
if(d.length) {
auto orig = d;
m = WebSocketFrame.read(d);
// that's how it indicates that it needs more data
if(d is orig)
return WebSocketFrame.init;
switch(m.opcode) {
case WebSocketOpcode.continuation:
if(continuingData.length + m.data.length > config.maximumMessageSize)
throw new Exception("message size exceeded");
continuingData ~= m.data;
if(m.fin) {
if(ontextmessage)
ontextmessage(cast(char[]) continuingData);
if(onbinarymessage)
onbinarymessage(continuingData);
continuingData = null;
}
break;
case WebSocketOpcode.text:
if(m.fin) {
if(ontextmessage)
ontextmessage(m.textData);
} else {
continuingType = m.opcode;
//continuingDataLength = 0;
continuingData = null;
continuingData ~= m.data;
}
break;
case WebSocketOpcode.binary:
if(m.fin) {
if(onbinarymessage)
onbinarymessage(m.data);
} else {
continuingType = m.opcode;
//continuingDataLength = 0;
continuingData = null;
continuingData ~= m.data;
}
break;
case WebSocketOpcode.close:
readyState_ = CLOSED;
if(onclose)
onclose();
unregisterActiveSocket(this);
break;
case WebSocketOpcode.ping:
pong();
break;
case WebSocketOpcode.pong:
// just really references it is still alive, nbd.
break;
default: // ignore though i could and perhaps should throw too
}
}
receiveBufferUsedLength -= s.length - d.length;
return m;
}
private void autoprocess() {
// FIXME
do {
processOnce();
} while(lowLevelReceive());
}
void delegate() onclose; ///
void delegate() onerror; ///
void delegate(in char[]) ontextmessage; ///
void delegate(in ubyte[]) onbinarymessage; ///
void delegate() onopen; ///
/++
+/
/// Group: browser_api
void onmessage(void delegate(in char[]) dg) {
ontextmessage = dg;
}
/// ditto
void onmessage(void delegate(in ubyte[]) dg) {
onbinarymessage = dg;
}
/* } end copy/paste */
}
bool websocketRequested(Cgi cgi) {
@ -4755,10 +5036,11 @@ version(cgi_with_websocket) {
return new WebSocket(cgi);
}
// FIXME: implement websocket extension frames
// get websocket to work on other modes, not just embedded_httpd
// FIXME get websocket to work on other modes, not just embedded_httpd
/* copy/paste in http2.d { */
enum WebSocketOpcode : ubyte {
continuation = 0,
text = 1,
binary = 2,
// 3, 4, 5, 6, 7 RESERVED
@ -4768,7 +5050,7 @@ version(cgi_with_websocket) {
// 11,12,13,14,15 RESERVED
}
struct WebSocketMessage {
public struct WebSocketFrame {
private bool populated;
bool fin;
bool rsv1;
@ -4781,8 +5063,8 @@ version(cgi_with_websocket) {
ubyte[4] maskingKey; // don't set this when sending
ubyte[] data;
static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) {
WebSocketMessage msg;
static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) {
WebSocketFrame msg;
msg.fin = true;
msg.opcode = opcode;
msg.data = cast(ubyte[]) data;
@ -4790,7 +5072,7 @@ version(cgi_with_websocket) {
return msg;
}
private void send(Cgi cgi) {
private void send(scope void delegate(ubyte[]) llsend) {
ubyte[64] headerScratch;
int headerScratchPos = 0;
@ -4846,7 +5128,7 @@ version(cgi_with_websocket) {
headerScratch[1] = b2;
}
assert(!masked, "masking key not properly implemented");
//assert(!masked, "masking key not properly implemented");
if(masked) {
// FIXME: randomize this
headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[];
@ -4864,19 +5146,18 @@ version(cgi_with_websocket) {
}
//writeln("SENDING ", headerScratch[0 .. headerScratchPos], data);
cgi.write(headerScratch[0 .. headerScratchPos]);
cgi.write(data);
cgi.flush();
llsend(headerScratch[0 .. headerScratchPos]);
llsend(data);
}
static WebSocketMessage read(ref ubyte[] d) {
WebSocketMessage msg;
static WebSocketFrame read(ref ubyte[] d) {
WebSocketFrame msg;
auto orig = d;
WebSocketMessage needsMoreData() {
WebSocketFrame needsMoreData() {
d = orig;
return WebSocketMessage.init;
return WebSocketFrame.init;
}
if(d.length < 2)
@ -4957,22 +5238,11 @@ version(cgi_with_websocket) {
return msg;
}
static WebSocketMessage read(BufferedInputRange ir) {
readmore:
auto d = ir.front();
auto m = read(d);
if(m is WebSocketMessage.init) {
ir.popFront();
goto readmore;
}
return m;
}
char[] textData() {
return cast(char[]) data;
}
}
/* } */
}

284
http2.d
View File

@ -2192,62 +2192,8 @@ class WebSocket {
private ushort port;
private bool ssl;
private int readyState_;
private Socket socket;
private ubyte[] receiveBuffer;
private size_t receiveBufferUsedLength;
private Config config;
enum CONNECTING = 0; /// Socket has been created. The connection is not yet open.
enum OPEN = 1; /// The connection is open and ready to communicate.
enum CLOSING = 2; /// The connection is in the process of closing.
enum CLOSED = 3; /// The connection is closed or couldn't be opened.
/++
+/
/// Group: foundational
static struct Config {
/++
These control the size of the receive buffer.
It starts at the initial size, will temporarily
balloon up to the maximum size, and will reuse
a buffer up to the likely size.
Anything larger than the maximum size will cause
the connection to be aborted and an exception thrown.
This is to protect you against a peer trying to
exhaust your memory, while keeping the user-level
processing simple.
+/
size_t initialReceiveBufferSize = 4096;
size_t likelyReceiveBufferSize = 4096; /// ditto
size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
/++
Maximum combined size of a message.
+/
size_t maximumMessageSize = 10 * 1024 * 1024;
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired.
int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping
}
/++
Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED].
+/
int readyState() {
return readyState_;
}
/++
wss://echo.websocket.org
wss://echo.websocket.org
+/
/// Group: foundational
this(Uri uri, Config config = Config.init)
@ -2441,6 +2387,130 @@ wss://echo.websocket.org
registerActiveSocket(this);
}
/++
Is data pending on the socket? Also check [isMessageBuffered] to see if there
is already a message in memory too.
If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered]
again.
+/
/// Group: blocking_api
public bool isDataPending(Duration timeout = 0.seconds) {
static SocketSet readSet;
if(readSet is null)
readSet = new SocketSet();
version(with_openssl)
if(auto s = cast(SslClientSocket) socket) {
// select doesn't handle the case with stuff
// left in the ssl buffer so i'm checking it separately
if(s.dataPending()) {
return true;
}
}
readSet.add(socket);
//tryAgain:
auto selectGot = Socket.select(readSet, null, null, timeout);
if(selectGot == 0) { /* timeout */
// timeout
return false;
} else if(selectGot == -1) { /* interrupted */
return false;
} else { /* ready */
if(readSet.isSet(socket)) {
return true;
}
}
return false;
}
private void llsend(ubyte[] d) {
while(d.length) {
auto r = socket.send(d);
if(r <= 0) throw new Exception("wtf");
d = d[r .. $];
}
}
private void llclose() {
socket.shutdown(SocketShutdown.SEND);
}
/++
Waits for more data off the low-level socket and adds it to the pending buffer.
Returns `true` if the connection is still active.
+/
/// Group: blocking_api
public bool lowLevelReceive() {
auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]);
if(r == 0)
return false;
if(r <= 0)
throw new Exception("wtf");
receiveBufferUsedLength += r;
return true;
}
private Socket socket;
/* copy/paste section { */
private int readyState_;
private ubyte[] receiveBuffer;
private size_t receiveBufferUsedLength;
private Config config;
enum CONNECTING = 0; /// Socket has been created. The connection is not yet open.
enum OPEN = 1; /// The connection is open and ready to communicate.
enum CLOSING = 2; /// The connection is in the process of closing.
enum CLOSED = 3; /// The connection is closed or couldn't be opened.
/++
+/
/// Group: foundational
static struct Config {
/++
These control the size of the receive buffer.
It starts at the initial size, will temporarily
balloon up to the maximum size, and will reuse
a buffer up to the likely size.
Anything larger than the maximum size will cause
the connection to be aborted and an exception thrown.
This is to protect you against a peer trying to
exhaust your memory, while keeping the user-level
processing simple.
+/
size_t initialReceiveBufferSize = 4096;
size_t likelyReceiveBufferSize = 4096; /// ditto
size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
/++
Maximum combined size of a message.
+/
size_t maximumMessageSize = 10 * 1024 * 1024;
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired.
int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping
}
/++
Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED].
+/
int readyState() {
return readyState_;
}
/++
Closes the connection, sending a graceful teardown message to the other side.
+/
@ -2459,7 +2529,7 @@ wss://echo.websocket.org
readyState_ = CLOSING;
socket.shutdown(SocketShutdown.SEND);
llclose();
}
/++
@ -2505,31 +2575,6 @@ wss://echo.websocket.org
wss.send(&llsend);
}
private void llsend(ubyte[] d) {
while(d.length) {
auto r = socket.send(d);
if(r <= 0) throw new Exception("wtf");
d = d[r .. $];
}
}
/++
Waits for more data off the low-level socket and adds it to the pending buffer.
Returns `true` if the connection is still active.
+/
/// Group: blocking_api
public bool lowLevelReceive() {
auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]);
if(r == 0)
return false;
if(r <= 0)
throw new Exception("wtf");
receiveBufferUsedLength += r;
return true;
}
/++
Waits for and returns the next complete message on the socket.
@ -2582,46 +2627,6 @@ wss://echo.websocket.org
return false;
}
/++
Is data pending on the socket? Also check [isMessageBuffered] to see if there
is already a message in memory too.
If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered]
again.
+/
/// Group: blocking_api
public bool isDataPending() {
static SocketSet readSet;
if(readSet is null)
readSet = new SocketSet();
version(with_openssl)
if(auto s = cast(SslClientSocket) socket) {
// select doesn't handle the case with stuff
// left in the ssl buffer so i'm checking it separately
if(s.dataPending()) {
return true;
}
}
readSet.add(socket);
//tryAgain:
auto selectGot = Socket.select(readSet, null, null, 0.seconds /* timeout */);
if(selectGot == 0) { /* timeout */
// timeout
return false;
} else if(selectGot == -1) { /* interrupted */
return false;
} else { /* ready */
if(readSet.isSet(socket)) {
return true;
}
}
return false;
}
private ubyte continuingType;
private ubyte[] continuingData;
//private size_t continuingDataLength;
@ -2722,6 +2727,8 @@ wss://echo.websocket.org
onbinarymessage = dg;
}
/* } end copy/paste */
/*
const int bufferedAmount // amount pending
const string extensions
@ -2802,7 +2809,7 @@ wss://echo.websocket.org
}
/* copy/paste from cgi.d */
private {
public {
enum WebSocketOpcode : ubyte {
continuation = 0,
text = 1,
@ -3007,3 +3014,36 @@ private {
}
}
}
/+
so the url params are arguments. it knows the request
internally. other params are properties on the req
names may have different paths... those will just add ForSomething i think.
auto req = api.listMergeRequests
req.page = 10;
or
req.page(1)
.bar("foo")
req.execute();
everything in the response is nullable access through the
dynamic object, just with property getters there. need to make
it static generated tho
other messages may be: isPresent and getDynamic
AND/OR what about doing it like the rails objects
BroadcastMessage.get(4)
// various properties
// it lists what you updated
BroadcastMessage.foo().bar().put(5)
+/