websocket polling/blocking api

This commit is contained in:
Adam D. Ruppe 2020-01-05 22:43:47 -05:00
parent 9ad7b08ce0
commit acb5825c7c
1 changed files with 231 additions and 42 deletions

273
http2.d
View File

@ -2052,11 +2052,70 @@ private bool bicmp(in ubyte[] item, in char[] search) {
}
/++
WebSocket client, based on the browser api.
WebSocket client, based on the browser api, though also with other api options.
on receive
on frame
on message
---
auto ws = new WebSocket(URI("ws://...."));
ws.onmessage = (in char[] msg) {
ws.send("a reply");
};
ws.connect();
WebSocket.eventLoop();
---
Symbol_groups:
foundational =
Used with all API styles.
browser_api =
API based on the standard in the browser.
event_loop_integration =
Integrating with external event loops is done through static functions. You should
call these BEFORE doing anything else with the WebSocket module or class.
$(PITFALL NOT IMPLEMENTED)
---
WebSocket.setEventLoopProxy(arsd.simpledisplay.EventLoop.proxy.tupleof);
// or something like that. it is not implemented yet.
---
$(PITFALL NOT IMPLEMENTED)
blocking_api =
The blocking API is best used when you only need basic functionality with a single connection.
---
WebSocketMessage msg;
do {
// FIXME good demo
} while(msg);
---
Or to check for blocks before calling:
---
try_to_process_more:
while(ws.isMessageBuffered()) {
auto msg = ws.waitForNextMessage();
// process msg
}
if(ws.isDataPending()) {
ws.lowLevelReceive();
goto try_to_process_more;
} else {
// nothing ready, you can do other things
// or at least sleep a while before trying
// to process more.
if(ws.readyState == WebSocket.OPEN) {
Thread.sleep(1.seconds);
goto try_to_process_more;
}
}
---
+/
class WebSocket {
private Uri uri;
@ -2083,6 +2142,7 @@ class WebSocket {
/++
+/
/// Group: foundational
static struct Config {
/++
These control the size of the receive buffer.
@ -2104,6 +2164,8 @@ class WebSocket {
string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
string origin; /// Origin URL to send with the handshake, if desired.
string protocol; /// the protocol header, if desired.
int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping
}
/++
@ -2116,6 +2178,7 @@ class WebSocket {
/++
wss://echo.websocket.org
+/
/// Group: foundational
this(Uri uri, Config config = Config.init)
in (uri.scheme == "ws" || uri.scheme == "wss")
{
@ -2141,6 +2204,7 @@ wss://echo.websocket.org
/++
+/
/// Group: foundational
void connect() {
socket.connect(new InternetAddress(host, port)); // FIXME: ipv6 support...
// FIXME: websocket handshake could and really should be async too.
@ -2289,7 +2353,7 @@ wss://echo.websocket.org
// FIXME: check protocol if config requested one
// FIXME: check accept
// FIXME: check accept for the right hash
receiveBuffer[0 .. used.length] = used[];
receiveBufferUsedLength = used.length;
@ -2301,8 +2365,9 @@ wss://echo.websocket.org
}
/++
Closes the connection, sending a graceful teardown message to the other side.
+/
/// Group: foundational
void close(int code = 0, string reason = null)
in (reason.length < 123)
{
@ -2318,8 +2383,9 @@ wss://echo.websocket.org
}
/++
Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function.
+/
/// Group: foundational
void ping() {
WebSocketMessage wss;
wss.fin = true;
@ -2336,8 +2402,9 @@ wss://echo.websocket.org
}
/++
Sends a text message through the websocket.
+/
/// Group: foundational
void send(in char[] textData) {
WebSocketMessage wss;
wss.fin = true;
@ -2346,6 +2413,19 @@ wss://echo.websocket.org
wss.send(&llsend);
}
/++
Sends a binary message through the websocket.
+/
/// Group: foundational
void send(in ubyte[] binaryData) {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.binary;
wss.data = cast(ubyte[]) binaryData;
wss.send(&llsend);
}
private void llsend(ubyte[] d) {
while(d.length) {
auto r = socket.send(d);
@ -2354,7 +2434,13 @@ wss://echo.websocket.org
}
}
/*private*/ bool llreceive() {
/++
Waits for more data off the low-level socket and adds it to the pending buffer.
Returns `true` if the connection is still active.
+/
/// Group: blocking_api
public bool lowLevelReceive() {
auto r = socket.receive(receiveBuffer[receiveBufferUsedLength .. $]);
if(r == 0)
return false;
@ -2364,47 +2450,146 @@ wss://echo.websocket.org
return true;
}
public void autoprocess() {
/++
Waits for and returns the next complete message on the socket.
Note that the onmessage function is still called, right before
this returns.
+/
/// Group: blocking_api
public WebSocketMessage waitForNextMessage() {
do {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
while(d.length) {
auto orig = d;
auto m = WebSocketMessage.read(d);
if(d is orig)
break;
if(m.opcode == WebSocketOpcode.close) {
readyState_ = CLOSED;
if(onclose)
onclose();
} else if(m.opcode == WebSocketOpcode.text) {
if(onmessage)
onmessage(m.textData);
} else {
// FIXME
}
}
receiveBufferUsedLength -= s.length - d.length;
} while(llreceive());
auto m = processOnce();
if(m.populated)
return m;
} while(lowLevelReceive());
return WebSocketMessage.init; // FIXME? maybe.
}
/++
Tells if [waitForNextMessage] would block.
+/
/// Group: blocking_api
public bool waitForNextMessageWouldBlock() {
checkAgain:
if(isMessageBuffered())
return false;
if(!isDataPending())
return true;
while(isDataPending())
lowLevelReceive();
goto checkAgain;
}
/++
Is there a message in the buffer already?
If `true`, [waitForNextMessage] is guaranteed to return immediately.
If `false`, check [isDataPending] as the next step.
+/
/// Group: blocking_api
public bool isMessageBuffered() {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
if(d.length) {
auto orig = d;
auto m = WebSocketMessage.read(d);
// that's how it indicates that it needs more data
if(d !is orig)
return true;
}
return false;
}
/++
Is data pending on the socket? Also check [isMessageBuffered] to see if there
is already a message in memory too.
If this returns `true`, you can call [lowLevelReceive], then try [isMessageBuffered]
again.
+/
/// Group: blocking_api
public bool isDataPending() {
static SocketSet readSet;
if(readSet is null)
readSet = new SocketSet();
version(with_openssl)
if(auto s = cast(SslClientSocket) socket) {
// select doesn't handle the case with stuff
// left in the ssl buffer so i'm checking it separately
if(s.dataPending()) {
return true;
}
}
readSet.add(socket);
//tryAgain:
auto selectGot = Socket.select(readSet, null, null, 0.seconds /* timeout */);
if(selectGot == 0) { /* timeout */
// timeout
return false;
} else if(selectGot == -1) { /* interrupted */
return false;
} else { /* ready */
if(readSet.isSet(socket)) {
return true;
}
}
return false;
}
private WebSocketMessage processOnce() {
ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
auto s = d;
// FIXME: handle continuation frames
WebSocketMessage m;
if(d.length) {
auto orig = d;
m = WebSocketMessage.read(d);
// that's how it indicates that it needs more data
if(d is orig)
return WebSocketMessage.init;
if(m.opcode == WebSocketOpcode.close) {
readyState_ = CLOSED;
if(onclose)
onclose();
} else if(m.opcode == WebSocketOpcode.text) {
if(ontextmessage)
ontextmessage(m.textData);
} else {
// FIXME
}
}
receiveBufferUsedLength -= s.length - d.length;
return m;
}
private void autoprocess() {
// FIXME
do {
processOnce();
} while(lowLevelReceive());
}
void delegate() onclose; ///
void delegate() onerror; ///
void delegate(in char[]) ontextmessage; ///
void delegate() onopen; ///
/++
+/
void send(in ubyte[] binaryData) {
WebSocketMessage wss;
wss.fin = true;
wss.opcode = WebSocketOpcode.binary;
wss.data = cast(ubyte[]) binaryData;
wss.send(&llsend);
/// Group: browser_api
void onmessage(void delegate(in char[]) dg) {
ontextmessage = dg;
}
void delegate() onclose; ///
void delegate() onerror; ///
void delegate(in char[]) onmessage; ///
void delegate() onopen; ///
/*
const int bufferedAmount // amount pending
const string extensions
@ -2417,6 +2602,7 @@ wss://echo.websocket.org
/* copy/paste from cgi.d */
private {
enum WebSocketOpcode : ubyte {
continuation = 0,
text = 1,
binary = 2,
// 3, 4, 5, 6, 7 RESERVED
@ -2426,7 +2612,8 @@ private {
// 11,12,13,14,15 RESERVED
}
struct WebSocketMessage {
public struct WebSocketMessage {
private bool populated;
bool fin;
bool rsv1;
bool rsv2;
@ -2533,6 +2720,8 @@ private {
ubyte b = d[0];
msg.populated = true;
msg.opcode = cast(WebSocketOpcode) (b & 0x0f);
b >>= 4;
msg.rsv3 = b & 0x01;