/**
	OBSOLETE: This provides a kind of real time updates that can be consumed
	by javascript (and probably other things eventually). Superseded by
	new functionality built into [arsd.cgi].

	First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon

	Run it. It's pretty generic; probably don't have to modify it
	but you always can. It's useful to have a long running process
	anyway.

	But then you'll want an intermediary between this and the javascript.
	Use the handleListenerGateway() function in your cgi app for that.
	You can pass it a channel prefix for things like user ids.

	In your javascript, use new EventSource("path to gateway");
	And addEventListener(type, function, false);

	Note: this javascript does not work in all browsers, but
	real time updates should really be optional anyway.

	I might add a traditional ajax fallback but still, it's all
	js so be sure it's non-essential if possible.

	And in your app, as events happen, use in D:
		auto stream = new UpdateStream(channel);
		stream.sendMessage(type, message);

	and the helper app will push it all out. You might want to wrap
	some of this in try/catch since if the helper app dies, this will
	throw since it can't connect.

	I found using user names as channels is good stuff. Then your JS
	doesn't provide a channel at all - your helper app gives it through
	the channel prefix argument.
*/
module arsd.rtud;

import std.string;
import std.array : replace;
import std.conv;
import std.date;

/// Client-side handle for publishing to the daemon: connects to the data
/// port (localhost:7071) on construction and sends one url-encoded message
/// per line.
class UpdateStream {
	File net;
	string channel;

	/// Opens the connection to the daemon's data port and remembers the
	/// channel that sendMessage() will publish to.
	this(string channel) {
		net = openNetwork("localhost", 7071);
		this.channel = channel;
	}

	~this() {
		net.close();
	}

	/// Asks the daemon to delete the message with the given id.
	void deleteMessage(string messageId) {
		import arsd.cgi; // : encodeVariables;
		string message = encodeVariables([
			"id" : messageId,
			"operation" : "delete"
		]);

		net.writeln(message);
		net.flush();
	}

	/// Publishes an event of type `eventType` carrying `messageText` on this
	/// stream's channel. `ttl` is passed through to the daemon, which compares
	/// timestamp + ttl against getUtcTime() to decide when a message is too old.
	void sendMessage(string eventType, string messageText, long ttl = 2500) {
		import arsd.cgi; // : encodeVariables;
		string message = encodeVariables([
			//"operation" : "post",
			//"id" : ????,
			"channel" : channel,
			"type" : eventType,
			"data" : messageText,
			"ttl" : to!string(ttl)
		]);

		net.writeln(message);
		net.flush();
	}
}

/+
	if("channels" in message) {
	if("last-message-id" in message)
	if("minimum-time" in message)
	if("close-time" in message)
+/

version(D_Version2) {
	static import linux = core.sys.posix.unistd;
	static import sock = core.sys.posix.sys.socket;
} else {
	static import linux = std.c.linux.linux;
	static import sock = std.c.linux.socket;
}

/// Resolves `host`, opens a TCP socket and connects it to `port`.
/// Returns the raw file descriptor; the caller is responsible for closing it.
/// Throws StdioException if the lookup, socket creation or connect fails.
int openNetworkFd(string host, ushort port) {
	import std.exception;
	auto h = enforce(sock.gethostbyname(std.string.toStringz(host)),
		new StdioException("gethostbyname"));

	int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0);
	enforce(s != -1, new StdioException("socket"));

	// don't leak the descriptor if anything below throws
	scope(failure) {
		linux.close(s);
	}

	sock.sockaddr_in addr;

	addr.sin_family = sock.AF_INET;
	addr.sin_port = sock.htons(port);
	std.c.string.memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length);

	enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1,
		new StdioException("Connect failed"));

	return s;
}

/// Writes all of `s` to `fd`, retrying after partial writes.
/// Throws if write() reports an error; gives up silently if it writes 0 bytes.
void writeToFd(int fd, string s) {
	// write() is attempted at least once, even for an empty string
	do {
		auto num = linux.write(fd, s.ptr, s.length);
		if(num < 0)
			throw new Exception("couldn't write");
		if(num == 0)
			return;
		s = s[num .. $];
	} while(s.length);
}

__gshared bool deathRequested = false;
/// Signal handler: only records that termination was requested.
extern(C)
void requestDeath(int sig) {
	deathRequested = true;
}

import arsd.cgi;

/// Relays messages from the daemon's notification port (localhost:7070) to a
/// cgi client. The channel asked for is `channelPrefix` followed by the
/// optional "channel" GET variable.
///
/// If the client accepts text/event-stream, data is forwarded as it arrives;
/// otherwise the response is collected for the (unfinished) ajax fallback.
///
/// The throttledConnection param is useful for helping to get
/// around browser connection limitations.
///
/// If the user opens a bunch of tabs, these long standing
/// connections can hit the per-host connection limit, breaking
/// navigation until the connection times out.
///
/// The throttle option sets a long retry period and polls
/// instead of waits. This sucks, but sucks less than your whole
/// site hanging because the browser is queuing your connections!
///
/// Returns: 1 if the non-event-stream path ran to completion, 0 otherwise.
int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) {
	cgi.setCache(false);

	import core.sys.posix.signal;
	sigaction_t act;
	// I want all zero everywhere else; the read() must not automatically restart for this to work.
	act.sa_handler = &requestDeath;

	if(linux.sigaction(linux.SIGTERM, &act, null) != 0)
		throw new Exception("sig err");

	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : "");
	if("minimum-time" in cgi.get)
		variables["minimum-time"] = cgi.get["minimum-time"];
	if("last-message-id" in cgi.get)
		variables["last-message-id"] = cgi.get["last-message-id"];

	bool isSse;

	if(cgi.accept == "text/event-stream") {
		cgi.setResponseContentType("text/event-stream");
		isSse = true;
		if(cgi.lastEventId.length)
			variables["last-message-id"] = cgi.lastEventId;

		if(throttledConnection) {
			cgi.write("retry: 15000\n");
		} else {
			cgi.write(":\n"); // the comment ensures apache doesn't skip us
		}

		cgi.flush(); // sending the headers along
	} else {
		// gotta handle it as ajax polling
		variables["close-time"] = "0"; // ask for long polling
	}

	if(throttledConnection)
		variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// raw byte buffer for read(); this must be a char array, not an array
	// of strings, so that a slice of `num` elements is exactly `num` bytes
	char[4096] buffer;

	for(; !deathRequested ;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0)
			throw new Exception("read error");
		if(num == 0)
			break;

		auto chunk = buffer[0 .. num];
		if(isSse) {
			cgi.write(chunk);
			cgi.flush();
		} else {
			// copy: the buffer is overwritten by the next read()
			wegot ~= chunk.idup;
		}
	}

	// this is to support older browsers
	if(!isSse && !deathRequested) {
		// we have to parse it out and reformat for plain cgi...
		auto lol = parseMessages(wegot);
		//cgi.setResponseContentType("text/json");
		// FIXME gotta reorganize my json stuff
		//cgi.write(toJson(lol));
		return 1;
	}

	return 0;
}

/// One message, both as stored by the daemon and as returned by parseMessages().
struct Message {
	string type;
	string id;
	string data;
	long timestamp;
	long ttl;

	string operation;
}

/// Polls the daemon for the messages currently held on `channel` and returns
/// them parsed. If `maxAge` is nonzero, only messages with a timestamp of at
/// least getUtcTime() - maxAge are requested. If `eventTypeFilter` is non-null,
/// only messages of exactly that type are returned.
Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) {
	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channel;
	if(maxAge)
		variables["minimum-time"] = to!string(getUtcTime() - maxAge);

	variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// raw byte buffer for read(); see handleListenerGateway
	char[4096] buffer;

	for(;;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0)
			throw new Exception("read error");
		if(num == 0)
			break;

		auto chunk = buffer[0 .. num];
		// copy: the buffer is overwritten by the next read()
		wegot ~= chunk.idup;
	}

	return parseMessages(wegot, eventTypeFilter);
}

/// Parses the event-stream text produced by the daemon (see
/// RealTimeUpdateDaemon.writeMessagesTo) back into Message structs.
///
/// Messages are separated by a blank line. A leading ':' on a line is
/// stripped, which is how the daemon's ":timestamp:", ":ttl:" and
/// ":operation:" lines are read. Repeated "data" lines are joined with a
/// newline, undoing the split the daemon performs when it writes them.
///
/// If `eventTypeFilter` is non-null, only messages of exactly that type are kept.
/// Throws if a message contains an empty line.
Message[] parseMessages(string wegot, string eventTypeFilter = null) {
	// gotta parse this since rtud writes out the format for browsers
	Message[] ret;
	foreach(message; wegot.split("\n\n")) {
		// every message ends with a blank line, so the split leaves an empty
		// chunk behind; it is not a message
		if(message.length == 0)
			continue;

		Message m;
		bool sawData = false;
		foreach(line; message.split("\n")) {
			if(line.length == 0)
				throw new Exception("wtf");
			if(line[0] == ':')
				line = line[1 .. $];

			if(line.length == 0)
				continue; // just an empty comment

			auto idx = line.indexOf(":");
			if(idx == -1)
				continue; // probably just a comment

			if(idx + 2 > line.length)
				continue; // probably just a comment too

			// fields are written as "name: value", hence the + 2
			auto name = line[0 .. idx];
			auto data = line[idx + 2 .. $];

			switch(name) {
				default: break; // do nothing
				case "timestamp":
					if(data.length)
						m.timestamp = to!long(data);
				break;
				case "ttl":
					if(data.length)
						m.ttl = to!long(data);
				break;
				case "operation":
					m.operation = data;
				break;
				case "id":
					m.id = data;
				break;
				case "event":
					m.type = data;
				break;
				case "data":
					// the daemon writes one "data:" line per line of payload;
					// put the newline back between them
					if(sawData)
						m.data ~= "\n";
					m.data ~= data;
					sawData = true;
				break;
			}
		}
		if(eventTypeFilter is null || eventTypeFilter == m.type)
			ret ~= m;
	}

	return ret;
}


version(rtud_daemon) :

import arsd.netman;

// Real time update daemon
/*
	You push messages out to channels, where they are held for a certain length of time.

	It can also do state with listener updates.

	Clients ask for messages since a time, and if there are none, you hold the connection until something arrives.


	There should be D and Javascript apis for pushing and receiving.


	JS:

	var updateObject = RealTimeUpdate();

	updateObject.someMessage = function(msg) {
		// react to it
	}

	updateObject.listen(channel);

	updateObject.send(message, args); // probably shouldn't need this from JS
*/

/*
	Incoming Packet format is x-www-urlencoded. There must be no new lines
	in there - be sure to url encode them.

	A message is separated by newlines.
*/

/// Common base for both daemon connection types: splits the incoming byte
/// stream into newline-terminated, url-encoded messages and hands each
/// decoded message to handleMessage().
class RtudConnection : Connection {
	RealTimeUpdateDaemon daemon;

	this(RealTimeUpdateDaemon daemon) {
		this.daemon = daemon;
	}

	override void onDataReceived() {
		import arsd.cgi;// : decodeVariables;

		// process every complete line currently available
		for(;;) {
			auto data = cast(string) read();

			auto index = data.indexOf("\n");
			if(index == -1)
				return; // wait for more data

			auto messageRaw = data[0 .. index];
			changeReadPosition(index + 1);

			auto message = decodeVariables(messageRaw);

			handleMessage(message);
		}
	}

	invariant() {
		assert(daemon !is null);
	}

	/// Called once per decoded incoming message.
	abstract void handleMessage(string[][string] message);
}

/// A listener connection (notification port): subscribes to channels and
/// receives messages.
class NotificationConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
		closeTime = long.max;
	}

	long closeTime;

	/// send: what channels you're interested in, a minimum time,
	/// and a close time.
	/// if the close time is negative, you are just polling curiously.
	/// if it is zero, it will close after your next batch. (long polling)
	/// anything else stays open for as long as it can in there.
	override void handleMessage(string[][string] message) {
		Channel*[] channels;

		if("channels" in message) {
			foreach(ch; message["channels"]) {
				auto channel = daemon.getChannel(ch);
				channels ~= channel;
				channel.subscribeTo(this);
			}
		}

		if("channel" in message) {
			auto channel = daemon.getChannel(message["channel"][$-1]);
			channels ~= channel;
			channel.subscribeTo(this);
		}

		import std.algorithm;
		import std.range;

		Message*[] backMessages;

		if("last-message-id" in message) {
			auto lastMessageId = message["last-message-id"][$-1];
			foreach(channel; channels)
				backMessages ~= channel.messages;

			auto bm = sort!"a.timestamp < b.timestamp"(backMessages);

			backMessages = array(find!("a.id == b")(bm, lastMessageId));
			while(backMessages.length && backMessages[0].id == lastMessageId)
				backMessages = backMessages[1 .. $]; // the last message is the one they got

			//writeln("backed up from ", lastMessageId, " is");
			//foreach(msg; backMessages)
				//writeln(*msg);
		} else if("minimum-time" in message) {
			foreach(channel; channels)
				backMessages ~= channel.messages;

			auto bm = sort!"a.timestamp < b.timestamp"(backMessages);

			backMessages = array(find!("a.timestamp >= b")(bm, to!long(message["minimum-time"][$-1])));
		}

		if("close-time" in message)
			closeTime = to!long(message["close-time"][$-1]);

		// send the back messages immediately
		daemon.writeMessagesTo(backMessages, this, "backed-up");

//		if(closeTime > 0 && closeTime != long.max)
//			closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this?
	}

	override void onDisconnect() {
		daemon.removeConnection(this);
	}
}

/// A publisher connection (data port): creates, edits or deletes messages
/// and pushes them to the requested channels.
class DataConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
	}

	override void handleMessage(string[][string] message) {
		// last value given for `key`, or `def` if it is missing or empty
		string getStr(string key, string def) {
			if(key in message) {
				auto s = message[key][$ - 1];
				if(s.length)
					return s;
			}
			return def;
		}

		string operation = getStr("operation", "post");

		// looks up the message by id, creating it if it doesn't exist yet
		Message* m = daemon.getMessage(getStr("id", null));
		switch(operation) {
			default:
				throw new Exception("unknown operation " ~ operation);
			case "delete":
				daemon.deleteMessage(m);
			break;
			case "edit":
			case "post":
				// we have to create the message and send it out
				m.type = getStr("type", "message");
				m.data = getStr("data", "");
				m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime())));
				m.ttl = to!long(getStr("ttl", "1000"));
		}

		assert(m !is null);

		if("channels" in message)
			foreach(ch; message["channels"]) {
				auto channel = daemon.getChannel(ch);
				assert(channel !is null);
				channel.writeMessage(m, operation);
			}

		if("channel" in message) {
			auto channel = daemon.getChannel(message["channel"][$-1]);
			channel.writeMessage(m, operation);
		}
	}
}

/// A named channel: the messages written to it plus the connections
/// currently listening to it.
struct Channel {
	string id;
	Message*[] messages;

	// a poor man's set...
	NotificationConnection[NotificationConnection] listeningConnections;

	RealTimeUpdateDaemon daemon;

	/// Records `message` on this channel and sends it to every listener.
	void writeMessage(Message* message, string operation) {
		messages ~= message;
		foreach(k, v; listeningConnections)
			daemon.writeMessagesTo([message], v, operation);
	}

	/// Registers `c` as a listener on this channel.
	void subscribeTo(NotificationConnection c) {
		listeningConnections[c] = c;
	}
}

/// The daemon itself: listens on port 7070 for listeners
/// (NotificationConnection) and on port 7071 for publishers (DataConnection).
class RealTimeUpdateDaemon : NetworkManager {
	this() {
		super();
		setConnectionSpawner(7070, &createNotificationConnection);
		listen(7070);
		setConnectionSpawner(7071, &createDataConnection);
		listen(7071);
	}

	private Channel*[string] channels;
	private Message*[string] messages;

	/// Returns the message registered under `id`. If there is none, a new one
	/// is created and registered; an empty `id` is replaced by the current
	/// getUtcTime() value, with '-' appended until the id is unused.
	Message* getMessage(string id) {
		if(id.length && id in messages)
			return messages[id];

		if(id.length == 0)
			id = to!string(getUtcTime());

		// make the generated id unique
		while(id in messages)
			id ~= "-";

		auto message = new Message;
		message.id = id;
		messages[id] = message;

		//writeln("NEW MESSAGE: ", *message);

		return message;
	}

	/// Unregisters `m` and removes its first occurrence from every channel.
	void deleteMessage(Message* m) {
		messages.remove(m.id);
		foreach(k, v; channels)
			foreach(i, msg; v.messages) {
				if(msg is m) {
					v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $];
					break;
				}
			}
	}

	/// Returns the channel named `id`, creating it on first use.
	Channel* getChannel(string id) {
		if(id in channels)
			return channels[id];

		auto c = new Channel;
		c.daemon = this;
		c.id = id;
		channels[id] = c;
		return c;
	}

	/// Writes `messages` to `connection` in event-stream format, tagging each
	/// with `operation`, then disconnects the connection if its closeTime
	/// asks for that (negative: always; zero: once something was sent).
	void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) {
		foreach(messageMain; messages) {
			// NOTE(review): an expired message is deleted here but is still
			// written out below one last time - confirm that is intended
			if(messageMain.timestamp + messageMain.ttl < getUtcTime)
				deleteMessage(messageMain); // too old, kill it

			// work on a copy so the stored message keeps its own operation
			Message message = *messageMain;
			message.operation = operation;

			// this should never happen, but just in case
			// (replace() returns a new string, so the result has to be kept)
			message.type = replace(message.type, "\n", "");

			connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n");
			connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n");
			connection.write(":operation: " ~ message.operation ~ "\n");
			if(message.id.length)
				connection.write("id: " ~ message.id ~ "\n");
			connection.write("event: " ~ message.type ~ "\n");
			connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n");
			connection.write("\n");
		}

		if(connection.closeTime <= 0) // FIXME: other times?
			if(connection.closeTime != 0 || messages.length)
				connection.disconnect(); // note this actually queues a disconnect, so we cool
	}

	/// Removes `connection` from the listener set of every channel.
	void removeConnection(NotificationConnection connection) {
		foreach(channel; channels)
			channel.listeningConnections.remove(connection);
	}

	Connection createNotificationConnection() {
		return new NotificationConnection(this);
	}

	Connection createDataConnection() {
		return new DataConnection(this);
	}
}

/// Runs the daemon's event loop until proceed() returns false. A
/// ConnectionException is logged and its connection dropped; any other
/// Throwable is logged and the loop carries on.
void rtudMain() {
	auto netman = new RealTimeUpdateDaemon;

	bool proceed = true;

	while(proceed) {
		try {
			proceed = netman.proceed();
		} catch(ConnectionException e) {
			writeln(e.toString());
			e.c.disconnectNow();
		} catch(Throwable e) {
			writeln(e.toString());
		}
	}
}

version(standalone_rtud)
void main() {
	rtudMain();
}