mirror of https://github.com/adamdruppe/arsd.git
666 lines
16 KiB
D
666 lines
16 KiB
D
/**
|
|
OBSOLETE: This provides a kind of real time updates that can be consumed
|
|
by javascript (and probably other things eventually). Superseded by
|
|
new functionality built into [arsd.cgi].
|
|
|
|
First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon
|
|
|
|
Run it. It's pretty generic; probably don't have to modify it
|
|
but you always can. It's useful to have a long running process
|
|
anyway.
|
|
|
|
But then you'll want an intermediary between this and the javascript.
|
|
Use the handleListenerGateway() function in your cgi app for that.
|
|
You can pass it a channel prefix for things like user ids.
|
|
|
|
In your javascript, use EventListener("path to gateway");
|
|
And addEventListener(type, function, false);
|
|
|
|
Note: this javascript does not work in all browsers, but
|
|
real time updates should really be optional anyway.
|
|
|
|
I might add a traditional ajax fallback but still, it's all
|
|
js so be sure it's non-essential if possible.
|
|
|
|
|
|
And in your app, as events happen, use in D:
|
|
auto stream = new UpdateStream(channel);
|
|
stream.sendMessage(type, message);
|
|
|
|
and the helper app will push it all out. You might want to wrap
|
|
some of this in try/catch since if the helper app dies, this will
|
|
throw since it can't connect.
|
|
|
|
|
|
I found using user names as channels is good stuff. Then your JS
|
|
doesn't provide a channel at all - your helper app gives it through
|
|
the channel prefix argument.
|
|
*/
|
|
module arsd.rtud;
|
|
|
|
import std.string;
|
|
import std.array : replace;
|
|
import std.conv;
|
|
import std.date;
|
|
|
|
|
|
class UpdateStream {
|
|
File net;
|
|
string channel;
|
|
|
|
this(string channel) {
|
|
net = openNetwork("localhost", 7071);
|
|
this.channel = channel;
|
|
}
|
|
|
|
~this() {
|
|
net.close();
|
|
}
|
|
|
|
void deleteMessage(string messageId) {
|
|
import arsd.cgi; // : encodeVariables;
|
|
string message = encodeVariables([
|
|
"id" : messageId,
|
|
"operation" : "delete"
|
|
]);
|
|
|
|
net.writeln(message);
|
|
net.flush();
|
|
}
|
|
|
|
void sendMessage(string eventType, string messageText, long ttl = 2500) {
|
|
import arsd.cgi; // : encodeVariables;
|
|
string message = encodeVariables([
|
|
//"operation" : "post",
|
|
//"id" : ????,
|
|
"channel" : channel,
|
|
"type" : eventType,
|
|
"data" : messageText,
|
|
"ttl" : to!string(ttl)
|
|
]);
|
|
|
|
net.writeln(message);
|
|
net.flush();
|
|
}
|
|
}
|
|
|
|
/+
|
|
if("channels" in message) {
|
|
if("last-message-id" in message)
|
|
if("minimum-time" in message)
|
|
if("close-time" in message)
|
|
+/
|
|
|
|
version(D_Version2) {
|
|
static import linux = core.sys.posix.unistd;
|
|
static import sock = core.sys.posix.sys.socket;
|
|
} else {
|
|
static import linux = std.c.linux.linux;
|
|
static import sock = std.c.linux.socket;
|
|
}
|
|
|
|
int openNetworkFd(string host, ushort port) {
|
|
import std.exception;
|
|
auto h = enforce( sock.gethostbyname(std.string.toStringz(host)),
|
|
new StdioException("gethostbyname"));
|
|
|
|
int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0);
|
|
enforce(s != -1, new StdioException("socket"));
|
|
|
|
scope(failure) {
|
|
linux.close(s);
|
|
}
|
|
|
|
sock.sockaddr_in addr;
|
|
|
|
addr.sin_family = sock.AF_INET;
|
|
addr.sin_port = sock.htons(port);
|
|
std.c.string.memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length);
|
|
|
|
enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1,
|
|
new StdioException("Connect failed"));
|
|
|
|
return s;
|
|
}
|
|
|
|
void writeToFd(int fd, string s) {
|
|
again:
|
|
auto num = linux.write(fd, s.ptr, s.length);
|
|
if(num < 0)
|
|
throw new Exception("couldn't write");
|
|
if(num == 0)
|
|
return;
|
|
s = s[num .. $];
|
|
if(s.length)
|
|
goto again;
|
|
}
|
|
|
|
__gshared bool deathRequested = false;
|
|
extern(C)
|
|
void requestDeath(int sig) {
|
|
deathRequested = true;
|
|
}
|
|
|
|
import arsd.cgi;
|
|
/// The throttledConnection param is useful for helping to get
|
|
/// around browser connection limitations.
|
|
|
|
/// If the user opens a bunch of tabs, these long standing
|
|
/// connections can hit the per-host connection limit, breaking
|
|
/// navigation until the connection times out.
|
|
|
|
/// The throttle option sets a long retry period and polls
|
|
/// instead of waits. This sucks, but sucks less than your whole
|
|
/// site hanging because the browser is queuing your connections!
|
|
int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) {
|
|
cgi.setCache(false);
|
|
|
|
import core.sys.posix.signal;
|
|
sigaction_t act;
|
|
// I want all zero everywhere else; the read() must not automatically restart for this to work.
|
|
act.sa_handler = &requestDeath;
|
|
|
|
if(linux.sigaction(linux.SIGTERM, &act, null) != 0)
|
|
throw new Exception("sig err");
|
|
|
|
auto f = openNetworkFd("localhost", 7070);
|
|
scope(exit) linux.close(f);
|
|
|
|
string[string] variables;
|
|
|
|
variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : "");
|
|
if("minimum-time" in cgi.get)
|
|
variables["minimum-time"] = cgi.get["minimum-time"];
|
|
if("last-message-id" in cgi.get)
|
|
variables["last-message-id"] = cgi.get["last-message-id"];
|
|
|
|
bool isSse;
|
|
|
|
if(cgi.accept == "text/event-stream") {
|
|
cgi.setResponseContentType("text/event-stream");
|
|
isSse = true;
|
|
if(cgi.lastEventId.length)
|
|
variables["last-message-id"] = cgi.lastEventId;
|
|
|
|
if(throttledConnection) {
|
|
cgi.write("retry: 15000\n");
|
|
} else {
|
|
cgi.write(":\n"); // the comment ensures apache doesn't skip us
|
|
}
|
|
|
|
cgi.flush(); // sending the headers along
|
|
} else {
|
|
// gotta handle it as ajax polling
|
|
variables["close-time"] = "0"; // ask for long polling
|
|
}
|
|
|
|
|
|
if(throttledConnection)
|
|
variables["close-time"] = "-1"; // close immediately
|
|
|
|
writeToFd(f, encodeVariables(variables) ~ "\n");
|
|
|
|
string wegot;
|
|
|
|
string[4096] buffer;
|
|
|
|
for(; !deathRequested ;) {
|
|
auto num = linux.read(f, buffer.ptr, buffer.length);
|
|
if(num < 0)
|
|
throw new Exception("read error");
|
|
if(num == 0)
|
|
break;
|
|
|
|
auto chunk = buffer[0 .. num];
|
|
if(isSse) {
|
|
cgi.write(chunk);
|
|
cgi.flush();
|
|
} else {
|
|
wegot ~= cast(string) chunk;
|
|
}
|
|
}
|
|
|
|
// this is to support older browsers
|
|
if(!isSse && !deathRequested) {
|
|
// we have to parse it out and reformat for plain cgi...
|
|
auto lol = parseMessages(wegot);
|
|
//cgi.setResponseContentType("text/json");
|
|
// FIXME gotta reorganize my json stuff
|
|
//cgi.write(toJson(lol));
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
struct Message {
|
|
string type;
|
|
string id;
|
|
string data;
|
|
long timestamp;
|
|
long ttl;
|
|
|
|
string operation;
|
|
}
|
|
|
|
|
|
Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) {
|
|
auto f = openNetworkFd("localhost", 7070);
|
|
scope(exit) linux.close(f);
|
|
|
|
string[string] variables;
|
|
|
|
variables["channel"] = channel;
|
|
if(maxAge)
|
|
variables["minimum-time"] = to!string(getUtcTime() - maxAge);
|
|
|
|
variables["close-time"] = "-1"; // close immediately
|
|
|
|
writeToFd(f, encodeVariables(variables) ~ "\n");
|
|
|
|
string wegot;
|
|
|
|
string[4096] buffer;
|
|
|
|
for(;;) {
|
|
auto num = linux.read(f, buffer.ptr, buffer.length);
|
|
if(num < 0)
|
|
throw new Exception("read error");
|
|
if(num == 0)
|
|
break;
|
|
|
|
auto chunk = buffer[0 .. num];
|
|
wegot ~= cast(string) chunk;
|
|
}
|
|
|
|
return parseMessages(wegot, eventTypeFilter);
|
|
}
|
|
|
|
Message[] parseMessages(string wegot, string eventTypeFilter = null) {
|
|
// gotta parse this since rtud writes out the format for browsers
|
|
Message[] ret;
|
|
foreach(message; wegot.split("\n\n")) {
|
|
Message m;
|
|
foreach(line; message.split("\n")) {
|
|
if(line.length == 0)
|
|
throw new Exception("wtf");
|
|
if(line[0] == ':')
|
|
line = line[1 .. $];
|
|
|
|
if(line.length == 0)
|
|
continue; // just an empty comment
|
|
|
|
auto idx = line.indexOf(":");
|
|
if(idx == -1)
|
|
continue; // probably just a comment
|
|
|
|
if(idx + 2 > line.length)
|
|
continue; // probably just a comment too
|
|
|
|
auto name = line[0 .. idx];
|
|
auto data = line[idx + 2 .. $];
|
|
|
|
switch(name) {
|
|
default: break; // do nothing
|
|
case "timestamp":
|
|
if(data.length)
|
|
m.timestamp = to!long(data);
|
|
break;
|
|
case "ttl":
|
|
if(data.length)
|
|
m.ttl = to!long(data);
|
|
break;
|
|
case "operation":
|
|
m.operation = data;
|
|
break;
|
|
case "id":
|
|
m.id = data;
|
|
break;
|
|
case "event":
|
|
m.type = data;
|
|
break;
|
|
case "data":
|
|
m.data ~= data;
|
|
break;
|
|
}
|
|
}
|
|
if(eventTypeFilter is null || eventTypeFilter == m.type)
|
|
ret ~= m;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
version(rtud_daemon) :
|
|
|
|
import arsd.netman;
|
|
|
|
// Real time update daemon
|
|
/*
|
|
You push messages out to channels, where they are held for a certain length of time.
|
|
|
|
It can also do state with listener updates.
|
|
|
|
Clients ask for messages since a time, and if there are none, you hold the connection until something arrives.
|
|
|
|
|
|
There should be D and Javascript apis for pushing and receiving.
|
|
|
|
|
|
JS:
|
|
|
|
var updateObject = RealTimeUpdate();
|
|
|
|
updateObject.someMessage = function(msg) {
|
|
// react to it
|
|
}
|
|
|
|
updateObject.listen(channel);
|
|
|
|
updateObject.send(message, args); // probably shouldn't need this from JS
|
|
*/
|
|
|
|
/*
|
|
Incoming Packet format is x-www-urlencoded. There must be no new lines
|
|
in there - be sure to url encode them.
|
|
|
|
A message is separated by newlines.
|
|
*/
|
|
|
|
class RtudConnection : Connection {
|
|
RealTimeUpdateDaemon daemon;
|
|
|
|
this(RealTimeUpdateDaemon daemon) {
|
|
this.daemon = daemon;
|
|
}
|
|
|
|
override void onDataReceived() {
|
|
import arsd.cgi;// : decodeVariables;
|
|
try_again:
|
|
auto data = cast(string) read();
|
|
|
|
auto index = data.indexOf("\n");
|
|
if(index == -1)
|
|
return; // wait for more data
|
|
|
|
auto messageRaw = data[0 .. index];
|
|
changeReadPosition(index + 1);
|
|
|
|
auto message = decodeVariables(messageRaw);
|
|
|
|
handleMessage(message);
|
|
goto try_again;
|
|
}
|
|
|
|
invariant() {
|
|
assert(daemon !is null);
|
|
}
|
|
|
|
abstract void handleMessage(string[][string] message);
|
|
}
|
|
|
|
class NotificationConnection : RtudConnection {
|
|
this(RealTimeUpdateDaemon daemon) {
|
|
super(daemon);
|
|
closeTime = long.max;
|
|
}
|
|
|
|
long closeTime;
|
|
|
|
/// send: what channels you're interested in, a minimum time,
|
|
/// and a close time.
|
|
/// if the close time is negative, you are just polling curiously.
|
|
/// if it is zero, it will close after your next batch. (long polling)
|
|
/// anything else stays open for as long as it can in there.
|
|
|
|
override void handleMessage(string[][string] message) {
|
|
Channel*[] channels;
|
|
|
|
if("channels" in message) {
|
|
foreach(ch; message["channels"]) {
|
|
auto channel = daemon.getChannel(ch);
|
|
channels ~= channel;
|
|
channel.subscribeTo(this);
|
|
}
|
|
}
|
|
|
|
if("channel" in message) {
|
|
auto channel = daemon.getChannel(message["channel"][$-1]);
|
|
channels ~= channel;
|
|
channel.subscribeTo(this);
|
|
}
|
|
|
|
import std.algorithm;
|
|
import std.range;
|
|
|
|
Message*[] backMessages;
|
|
|
|
if("last-message-id" in message) {
|
|
auto lastMessageId = message["last-message-id"][$-1];
|
|
foreach(channel; channels)
|
|
backMessages ~= channel.messages;
|
|
|
|
auto bm = sort!"a.timestamp < b.timestamp"(backMessages);
|
|
|
|
backMessages = array(find!("a.id == b")(bm, lastMessageId));
|
|
while(backMessages.length && backMessages[0].id == lastMessageId)
|
|
backMessages = backMessages[1 .. $]; // the last message is the one they got
|
|
|
|
//writeln("backed up from ", lastMessageId, " is");
|
|
//foreach(msg; backMessages)
|
|
//writeln(*msg);
|
|
} else if("minimum-time" in message) {
|
|
foreach(channel; channels)
|
|
backMessages ~= channel.messages;
|
|
|
|
auto bm = sort!"a.timestamp < b.timestamp"(backMessages);
|
|
|
|
backMessages = array(find!("a.timestamp >= b")(bm, to!long(message["minimum-time"][$-1])));
|
|
}
|
|
|
|
if("close-time" in message)
|
|
closeTime = to!long(message["close-time"][$-1]);
|
|
|
|
// send the back messages immediately
|
|
daemon.writeMessagesTo(backMessages, this, "backed-up");
|
|
|
|
// if(closeTime > 0 && closeTime != long.max)
|
|
// closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this?
|
|
}
|
|
|
|
override void onDisconnect() {
|
|
daemon.removeConnection(this);
|
|
}
|
|
|
|
}
|
|
|
|
class DataConnection : RtudConnection {
|
|
this(RealTimeUpdateDaemon daemon) {
|
|
super(daemon);
|
|
}
|
|
|
|
override void handleMessage(string[][string] message) {
|
|
string getStr(string key, string def) {
|
|
if(key in message) {
|
|
auto s = message[key][$ - 1];
|
|
if(s.length)
|
|
return s;
|
|
}
|
|
return def;
|
|
}
|
|
|
|
string operation = getStr("operation", "post");
|
|
|
|
Message* m = daemon.getMessage(getStr("id", null));
|
|
switch(operation) {
|
|
default: throw new Exception("unknown operation " ~ operation); break;
|
|
case "delete":
|
|
daemon.deleteMessage(m);
|
|
break;
|
|
case "edit":
|
|
case "post":
|
|
// we have to create the message and send it out
|
|
m.type = getStr("type", "message");
|
|
m.data = getStr("data", "");
|
|
m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime())));
|
|
m.ttl = to!long(getStr("ttl", "1000"));
|
|
}
|
|
|
|
assert(m !is null);
|
|
|
|
if("channels" in message)
|
|
foreach(ch; message["channels"]) {
|
|
auto channel = daemon.getChannel(ch);
|
|
assert(channel !is null);
|
|
channel.writeMessage(m, operation);
|
|
}
|
|
|
|
if("channel" in message) {
|
|
auto channel = daemon.getChannel(message["channel"][$-1]);
|
|
channel.writeMessage(m, operation);
|
|
}
|
|
}
|
|
}
|
|
|
|
struct Channel {
|
|
string id;
|
|
Message*[] messages;
|
|
|
|
// a poor man's set...
|
|
NotificationConnection[NotificationConnection] listeningConnections;
|
|
|
|
|
|
RealTimeUpdateDaemon daemon;
|
|
|
|
void writeMessage(Message* message, string operation) {
|
|
messages ~= message;
|
|
foreach(k, v; listeningConnections)
|
|
daemon.writeMessagesTo([message], v, operation);
|
|
}
|
|
|
|
void subscribeTo(NotificationConnection c) {
|
|
listeningConnections[c] = c;
|
|
}
|
|
}
|
|
|
|
|
|
class RealTimeUpdateDaemon : NetworkManager {
|
|
this() {
|
|
super();
|
|
setConnectionSpawner(7070, &createNotificationConnection);
|
|
listen(7070);
|
|
setConnectionSpawner(7071, &createDataConnection);
|
|
listen(7071);
|
|
}
|
|
|
|
private Channel*[string] channels;
|
|
private Message*[string] messages;
|
|
|
|
Message* getMessage(string id) {
|
|
if(id.length && id in messages)
|
|
return messages[id];
|
|
|
|
if(id.length == 0)
|
|
id = to!string(getUtcTime());
|
|
|
|
longerId:
|
|
if(id in messages) {
|
|
id ~= "-";
|
|
goto longerId;
|
|
}
|
|
|
|
|
|
auto message = new Message;
|
|
message.id = id;
|
|
messages[id] = message;
|
|
|
|
//writeln("NEW MESSAGE: ", *message);
|
|
|
|
return message;
|
|
}
|
|
|
|
void deleteMessage(Message* m) {
|
|
messages.remove(m.id);
|
|
foreach(k, v; channels)
|
|
foreach(i, msg; v.messages) {
|
|
if(msg is m) {
|
|
v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
Channel* getChannel(string id) {
|
|
if(id in channels)
|
|
return channels[id];
|
|
|
|
auto c = new Channel;
|
|
c.daemon = this;
|
|
c.id = id;
|
|
channels[id] = c;
|
|
return c;
|
|
}
|
|
|
|
void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) {
|
|
foreach(messageMain; messages) {
|
|
if(messageMain.timestamp + messageMain.ttl < getUtcTime)
|
|
deleteMessage(messageMain); // too old, kill it
|
|
Message message = *messageMain;
|
|
message.operation = operation;
|
|
|
|
// this should never happen, but just in case
|
|
replace(message.type, "\n", "");
|
|
connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n");
|
|
connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n");
|
|
connection.write(":operation: " ~ message.operation ~ "\n");
|
|
if(message.id.length)
|
|
connection.write("id: " ~ message.id ~ "\n");
|
|
connection.write("event: " ~ message.type ~ "\n");
|
|
connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n");
|
|
connection.write("\n");
|
|
}
|
|
|
|
if(connection.closeTime <= 0) // FIXME: other times?
|
|
if(connection.closeTime != 0 || messages.length)
|
|
connection.disconnect(); // note this actually queues a disconnect, so we cool
|
|
}
|
|
|
|
void removeConnection(NotificationConnection connection) {
|
|
foreach(channel; channels)
|
|
channel.listeningConnections.remove(connection);
|
|
}
|
|
|
|
Connection createNotificationConnection() {
|
|
return new NotificationConnection(this);
|
|
}
|
|
|
|
Connection createDataConnection() {
|
|
return new DataConnection(this);
|
|
}
|
|
}
|
|
|
|
void rtudMain() {
|
|
auto netman = new RealTimeUpdateDaemon;
|
|
|
|
bool proceed = true;
|
|
|
|
while(proceed)
|
|
try
|
|
proceed = netman.proceed();
|
|
catch(ConnectionException e) {
|
|
writeln(e.toString());
|
|
e.c.disconnectNow();
|
|
}
|
|
catch(Throwable e) {
|
|
|
|
|
|
writeln(e.toString());
|
|
}
|
|
}
|
|
|
|
version(standalone_rtud)
|
|
void main() {
|
|
rtudMain();
|
|
}
|