diff --git a/rtud.d b/rtud.d new file mode 100644 index 0000000..1886033 --- /dev/null +++ b/rtud.d @@ -0,0 +1,506 @@ +/** + This provides a kind of real time updates that can be consumed + by javascript (and probably other things eventually). + + First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon + + Run it. It's pretty generic; probably don't have to modify it + but you always can. It's useful to have a long running process + anyway. + + But then you'll want an intermediary between this and the javascript. + Use the handleListenerGateway() function in your cgi app for that. + You can pass it a channel prefix for things like user ids. + + In your javascript, use EventListener("path to gateway"); + And addEventListener(type, function, false); + + + And in your app, as events happen, use in D: + auto stream = new UpdateStream(channel); + stream.sendMessage(type, message); + + and the helper app will push it all out. You might want to wrap + some of this in try/catch since if the helper app dies, this will + throw since it can't connect. + + + I found using user names as channels is good stuff. Then your JS + doesn't provide a channel at all - your helper app gives it through + the channel prefix argument. +*/ +module arsd.rtud; + +import std.string; +import std.array : replace; +import std.conv; +import std.date; + + +class UpdateStream { + File net; + string channel; + + this(string channel) { + net = openNetwork("localhost", 7071); + this.channel = channel; + } + + ~this() { + net.close(); + } + + void sendMessage(string eventType, string messageText, long ttl = 2500) { + import arsd.cgi; // : encodeVariables; + string message = encodeVariables([ + //"operation" : "post", + //"id" : ????, + "channel" : channel, + "type" : eventType, + "data" : messageText, + "ttl" : to!string(ttl) + ]); + + net.writeln(message); + net.flush(); + } +} + +/+ + if("channels" in message) { + if("last-message-id" in message) + if("minimum-time" in message) + if("close-time" in message) ++/ + +static import linux = std.c.linux.linux; +static import sock = std.c.linux.socket; + +int openNetworkFd(string host, ushort port) { + import std.exception; + auto h = enforce( sock.gethostbyname(std.string.toStringz(host)), + new StdioException("gethostbyname")); + + int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0); + enforce(s != -1, new StdioException("socket")); + + scope(failure) { + linux.close(s); + } + + sock.sockaddr_in addr; + + addr.sin_family = sock.AF_INET; + addr.sin_port = sock.htons(port); + std.c.string.memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length); + + enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1, + new StdioException("Connect failed")); + + return s; +} + +void writeToFd(int fd, string s) { + again: + int num = linux.write(fd, s.ptr, s.length); + if(num < 0) + throw new Exception("couldn't write"); + if(num == 0) + return; + s = s[num .. $]; + if(s.length) + goto again; +} + + +import arsd.cgi; +int handleListenerGateway(Cgi cgi, string channelPrefix) { + cgi.setCache(false); + + auto f = openNetworkFd("localhost", 7070); + scope(exit) linux.close(f); + + string[string] variables; + + variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : ""); + if("minimum-time" in cgi.get) + variables["minimum-time"] = cgi.get["minimum-time"]; + if("last-message-id" in cgi.get) + variables["last-message-id"] = cgi.get["last-message-id"]; + + bool isSse; + + if(cgi.accept == "text/event-stream") { + cgi.setResponseContentType("text/event-stream"); + isSse = true; + if(cgi.lastEventId.length) + variables["last-message-id"] = cgi.lastEventId; + + cgi.write(""); + cgi.flush(); // sending the headers along + } else { + // gotta handle it as ajax polling + variables["close-time"] = "0"; // ask for long polling + } + + writeToFd(f, encodeVariables(variables) ~ "\n"); + + string wegot; + + string[4096] buffer; + + for(;;) { + int num = linux.read(f, buffer.ptr, buffer.length); + if(num < 0) + throw new Exception("read error"); + if(num == 0) + break; + + auto chunk = buffer[0 .. num]; + if(isSse) { + cgi.write(chunk); + cgi.flush(); + } else { + wegot ~= cast(string) chunk; + } + } + + if(!isSse) { + // FIXME + // we have to parse it out and reformat for plain cgi... + cgi.write("LAME LAME LAME\n"); + cgi.write(wegot); + return 1; + } + + return 0; +} + +version(rtud_daemon) : + +import arsd.netman; + +struct Message { + string type; + string id; + string data; + long timestamp; + long ttl; + + string operation; +} + + +// Real time update daemon +/* + You push messages out to channels, where they are held for a certain length of time. + + It can also do state with listener updates. + + Clients ask for messages since a time, and if there are none, you hold the connection until something arrives. + + + There should be D and Javascript apis for pushing and receiving. + + + JS: + + var updateObject = RealTimeUpdate(); + + updateObject.someMessage = function(msg) { + // react to it + } + + updateObject.listen(channel); + + updateObject.send(message, args); // probably shouldn't need this from JS +*/ + +/* + Incoming Packet format is x-www-urlencoded. There must be no new lines + in there - be sure to url encode them. + + A message is separated by newlines. +*/ + +class RtudConnection : Connection { + RealTimeUpdateDaemon daemon; + + this(RealTimeUpdateDaemon daemon) { + this.daemon = daemon; + } + + override void onDataReceived() { + import arsd.cgi;// : decodeVariables; + try_again: + auto data = cast(string) read(); + + auto index = data.indexOf("\n"); + if(index == -1) + return; // wait for more data + + auto messageRaw = data[0 .. index]; + changeReadPosition(index + 1); + + auto message = decodeVariables(messageRaw); + + handleMessage(message); + goto try_again; + } + + invariant() { + assert(daemon !is null); + } + + abstract void handleMessage(string[][string] message); +} + +class NotificationConnection : RtudConnection { + this(RealTimeUpdateDaemon daemon) { + super(daemon); + closeTime = long.max; + } + + long closeTime; + + /// send: what channels you're interested in, a minimum time, + /// and a close time. + /// if the close time is negative, you are just polling curiously. + /// if it is zero, it will close after your next batch. (long polling) + /// anything else stays open for as long as it can in there. + + override void handleMessage(string[][string] message) { + Channel*[] channels; + + if("channels" in message) { + foreach(ch; message["channels"]) { + auto channel = daemon.getChannel(ch); + channels ~= channel; + channel.subscribeTo(this); + } + } + + if("channel" in message) { + auto channel = daemon.getChannel(message["channel"][$-1]); + channels ~= channel; + channel.subscribeTo(this); + } + + import std.algorithm; + import std.range; + + Message*[] backMessages; + + if("last-message-id" in message) { + foreach(channel; channels) + backMessages ~= channel.messages; + + auto bm = sort!"a.timestamp < b.timestamp"(backMessages); + + backMessages = array(find!("a.id == b")(bm, message["last-message-id"][$-1])); + if(backMessages.length) + backMessages = backMessages[1 .. $]; // the last message is the one they got + } else if("minimum-time" in message) { + foreach(channel; channels) + backMessages ~= channel.messages; + + auto bm = sort!"a.timestamp < b.timestamp"(backMessages); + + backMessages = array(find!("a.timestamp > b")(bm, to!long(message["minimum-time"][$-1]))); + } + + if("close-time" in message) + closeTime = to!long(message["close-time"][$-1]); + + // send the back messages immediately + daemon.writeMessagesTo(backMessages, this, "backed-up"); + +// if(closeTime > 0 && closeTime != long.max) +// closeTime = getUTCtime() + closeTime; // FIXME: do i use this? Should I use this? + } + + override void onDisconnect() { + daemon.removeConnection(this); + } + +} + +class DataConnection : RtudConnection { + this(RealTimeUpdateDaemon daemon) { + super(daemon); + } + + override void handleMessage(string[][string] message) { + string getStr(string key, string def) { + if(key in message) + return message[key][$ - 1]; + return def; + } + + string operation = getStr("operation", "post"); + + Message* m = daemon.getMessage(getStr("id", null)); + switch(operation) { + default: throw new Exception("unknown operation " ~ operation); break; + case "delete": + daemon.deleteMessage(m); + break; + case "edit": + case "post": + // we have to create the message and send it out + m.type = getStr("type", "message"); + m.data = getStr("data", ""); + m.timestamp = to!long(getStr("timestamp", to!string(getUTCtime()))); + m.ttl = to!long(getStr("ttl", "1000")); + } + + assert(m !is null); + + if("channels" in message) + foreach(ch; message["channels"]) { + auto channel = daemon.getChannel(ch); + assert(channel !is null); + channel.writeMessage(m, operation); + } + + if("channel" in message) { + auto channel = daemon.getChannel(message["channel"][$-1]); + channel.writeMessage(m, operation); + } + } +} + +struct Channel { + string id; + Message*[] messages; + + // a poor man's set... + NotificationConnection[NotificationConnection] listeningConnections; + + + RealTimeUpdateDaemon daemon; + + void writeMessage(Message* message, string operation) { + messages ~= message; + foreach(k, v; listeningConnections) + daemon.writeMessagesTo([message], v, operation); + } + + void subscribeTo(NotificationConnection c) { + listeningConnections[c] = c; + } +} + + +class RealTimeUpdateDaemon : NetworkManager { + this() { + super(); + setConnectionSpawner(7070, &createNotificationConnection); + listen(7070); + setConnectionSpawner(7071, &createDataConnection); + listen(7071); + } + + private Channel*[string] channels; + private Message*[string] messages; + + Message* getMessage(string id) { + if(id.length && id in messages) + return messages[id]; + + if(id.length == 0) + id = to!string(getUTCtime()); + + longerId: + if(id in messages) { + id ~= "-"; + goto longerId; + } + + + auto message = new Message; + message.id = id; + messages[id] = message; + + return message; + } + + void deleteMessage(Message* m) { + messages.remove(m.id); + foreach(k, v; channels) + foreach(i, msg; v.messages) { + if(msg is m) { + v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $]; + break; + } + } + } + + Channel* getChannel(string id) { + if(id in channels) + return channels[id]; + + auto c = new Channel; + c.daemon = this; + c.id = id; + channels[id] = c; + return c; + } + + void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) { + foreach(messageMain; messages) { + if(messageMain.timestamp + messageMain.ttl < getUTCtime) + deleteMessage(messageMain); // too old, kill it + Message message = *messageMain; + message.operation = operation; + connection.write("id: " ~ message.id ~ "\n"); + connection.write("event: " ~ message.type ~ "\n"); + connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n"); + connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n"); + connection.write(":operation: " ~ message.operation ~ "\n"); + connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n"); + connection.write("\n"); + } + + if(connection.closeTime <= 0) // FIXME: other times? + if(connection.closeTime != 0 || messages.length) + connection.disconnect(); // note this actually queues a disconnect, so we cool + } + + void removeConnection(NotificationConnection connection) { + foreach(channel; channels) + channel.listeningConnections.remove(connection); + } + + Connection createNotificationConnection() { + return new NotificationConnection(this); + } + + Connection createDataConnection() { + return new DataConnection(this); + } +} + +void rtudMain() { + auto netman = new RealTimeUpdateDaemon; + + bool proceed = true; + + while(proceed) + try + proceed = netman.proceed(); + catch(ConnectionException e) { + writeln(e.toString()); + e.c.disconnectNow(); + } + catch(Throwable e) { + + + writeln(e.toString()); + } +} + +version(standalone_rtud) +void main() { + rtudMain(); +}