mirror of https://github.com/adamdruppe/arsd.git
catchup
This commit is contained in:
parent
e2e8a917e5
commit
38c84d7fa2
665
cgi.d
665
cgi.d
|
@ -738,13 +738,11 @@ class Cgi {
|
|||
this.postJson = null;
|
||||
}
|
||||
|
||||
version(Posix)
|
||||
int getOutputFileHandle() {
|
||||
CgiConnectionHandle getOutputFileHandle() {
|
||||
return _outputFileHandle;
|
||||
}
|
||||
|
||||
version(Posix)
|
||||
int _outputFileHandle = -1;
|
||||
CgiConnectionHandle _outputFileHandle = INVALID_CGI_CONNECTION_HANDLE;
|
||||
|
||||
/** Initializes it using a CGI or CGI-like interface */
|
||||
this(long maxContentLength = defaultMaxContentLength,
|
||||
|
@ -1122,6 +1120,12 @@ class Cgi {
|
|||
} else if(pps.contentType == "multipart/form-data") {
|
||||
pps.isMultipart = true;
|
||||
enforce(pps.boundary.length, "no boundary");
|
||||
} else if(pps.contentType == "text/plain") {
|
||||
pps.isMultipart = false;
|
||||
pps.isJson = true; // FIXME: hack, it isn't actually this
|
||||
} else if(pps.contentType == "text/xml") { // FIXME: what if we used this as a fallback?
|
||||
pps.isMultipart = false;
|
||||
pps.isJson = true; // FIXME: hack, it isn't actually this
|
||||
} else if(pps.contentType == "application/json") {
|
||||
pps.isJson = true;
|
||||
pps.isMultipart = false;
|
||||
|
@ -4504,53 +4508,32 @@ void sendToWebsocketServer(string content, string group) {
|
|||
|
||||
|
||||
void runEventServer()() {
|
||||
runAddonServer("/tmp/arsd_cgi_event_server");
|
||||
runAddonServer("/tmp/arsd_cgi_event_server", new EventSourceServer());
|
||||
}
|
||||
|
||||
// sends this cgi request to the event server so it will be fed events. You should not do anything else with the cgi object after this.
|
||||
void sendConnectionToEventServer()(Cgi cgi, in char[] eventUrl) {
|
||||
|
||||
cgi.setResponseContentType("text/event-stream");
|
||||
cgi.write(":\n"); // to initialize the chunking and send headers before keeping the fd for later
|
||||
cgi.flush();
|
||||
|
||||
cgi.closed = true;
|
||||
auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server");
|
||||
scope(exit)
|
||||
closeLocalServerConnection(s);
|
||||
|
||||
version(fastcgi)
|
||||
static assert(0, "sending fcgi connections not supported");
|
||||
|
||||
int fd = cgi.getOutputFileHandle();
|
||||
if(fd == -1)
|
||||
throw new Exception("bad fd from cgi!");
|
||||
|
||||
char[1024] buffer;
|
||||
buffer[0] = cgi.responseChunked ? 1 : 0;
|
||||
|
||||
buffer[1 .. eventUrl.length + 1] = eventUrl[];
|
||||
|
||||
auto res = write_fd(s, buffer.ptr, 1 + eventUrl.length, fd);
|
||||
assert(res == 1 + eventUrl.length);
|
||||
}
|
||||
|
||||
void sendEventToEventServer()(string url, string event, string data, int lifetime) {
|
||||
auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server");
|
||||
scope(exit)
|
||||
closeLocalServerConnection(s);
|
||||
|
||||
SendableEvent sev;
|
||||
sev.populate(url, event, data, lifetime);
|
||||
|
||||
auto ret = send(s, &sev, sev.sizeof, 0);
|
||||
assert(ret == sev.sizeof);
|
||||
}
|
||||
|
||||
version(Posix)
|
||||
version(Posix) {
|
||||
alias LocalServerConnectionHandle = int;
|
||||
else version(Windows)
|
||||
alias CgiConnectionHandle = int;
|
||||
alias SocketConnectionHandle = int;
|
||||
|
||||
enum INVALID_CGI_CONNECTION_HANDLE = -1;
|
||||
} else version(Windows) {
|
||||
alias LocalServerConnectionHandle = HANDLE;
|
||||
version(embedded_httpd) {
|
||||
alias CgiConnectionHandle = SOCKET;
|
||||
enum INVALID_CGI_CONNECTION_HANDLE = INVALID_SOCKET;
|
||||
} else version(fastcgi) {
|
||||
alias CgiConnectionHandle = void*; // Doesn't actually work! But I don't want compile to fail pointlessly at this point.
|
||||
enum INVALID_CGI_CONNECTION_HANDLE = null;
|
||||
} else version(scgi) {
|
||||
alias CgiConnectionHandle = HANDLE;
|
||||
enum INVALID_CGI_CONNECTION_HANDLE = null;
|
||||
} else { /* version(plain_cgi) */
|
||||
alias CgiConnectionHandle = HANDLE;
|
||||
enum INVALID_CGI_CONNECTION_HANDLE = null;
|
||||
}
|
||||
alias SocketConnectionHandle = SOCKET;
|
||||
}
|
||||
|
||||
LocalServerConnectionHandle openLocalServerConnection(string name) {
|
||||
version(Posix) {
|
||||
|
@ -4582,6 +4565,8 @@ LocalServerConnectionHandle openLocalServerConnection(string name) {
|
|||
throw new Exception("connect " ~ to!string(errno));
|
||||
|
||||
return sock;
|
||||
} else version(Windows) {
|
||||
return null; // FIXME
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4594,15 +4579,6 @@ void closeLocalServerConnection(LocalServerConnectionHandle handle) {
|
|||
}
|
||||
|
||||
void runSessionServer()() {
|
||||
/+
|
||||
The session server api should prolly be:
|
||||
|
||||
setSessionValues
|
||||
getSessionValues
|
||||
changeSessionId
|
||||
createSession
|
||||
destroySesson
|
||||
+/
|
||||
assert(0, "not implemented");
|
||||
}
|
||||
|
||||
|
@ -4624,6 +4600,10 @@ struct IoOp {
|
|||
@disable this();
|
||||
@disable this(this);
|
||||
|
||||
/*
|
||||
So we want to be able to eventually handle generic sockets too.
|
||||
*/
|
||||
|
||||
enum Read = 1;
|
||||
enum Write = 2;
|
||||
enum Accept = 3;
|
||||
|
@ -4631,9 +4611,9 @@ struct IoOp {
|
|||
|
||||
// Your handler may be called in a different thread than the one that initiated the IO request!
|
||||
// It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution.
|
||||
private void function(IoOp*, int) handler;
|
||||
private void function(IoOp*) closeHandler;
|
||||
private void function(IoOp*) completeHandler;
|
||||
private void delegate(IoOp*, int) handler;
|
||||
private void delegate(IoOp*) closeHandler;
|
||||
private void delegate(IoOp*) completeHandler;
|
||||
private int internalFd;
|
||||
private int operation;
|
||||
private int bufferLengthAllocated;
|
||||
|
@ -4657,7 +4637,7 @@ struct IoOp {
|
|||
}
|
||||
}
|
||||
|
||||
IoOp* allocateIoOp(int fd, int operation, int bufferSize, void function(IoOp*, int) handler) {
|
||||
IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, int) handler) {
|
||||
import core.stdc.stdlib;
|
||||
|
||||
auto ptr = malloc(IoOp.sizeof + bufferSize);
|
||||
|
@ -4681,156 +4661,389 @@ void freeIoOp(ref IoOp* ptr) {
|
|||
ptr = null;
|
||||
}
|
||||
|
||||
///
|
||||
struct SendableEvent {
|
||||
int urlLength;
|
||||
char[256] urlBuffer = 0;
|
||||
int typeLength;
|
||||
char[32] typeBuffer = 0;
|
||||
int messageLength;
|
||||
char[2048] messageBuffer = 0;
|
||||
int _lifetime;
|
||||
|
||||
char[] message() {
|
||||
return messageBuffer[0 .. messageLength];
|
||||
}
|
||||
char[] type() {
|
||||
return typeBuffer[0 .. typeLength];
|
||||
}
|
||||
char[] url() {
|
||||
return urlBuffer[0 .. urlLength];
|
||||
}
|
||||
int lifetime() {
|
||||
return _lifetime;
|
||||
}
|
||||
|
||||
///
|
||||
void populate(string url, string type, string message, int lifetime)
|
||||
in {
|
||||
assert(url.length < this.urlBuffer.length);
|
||||
assert(type.length < this.typeBuffer.length);
|
||||
assert(message.length < this.messageBuffer.length);
|
||||
}
|
||||
do {
|
||||
this.urlLength = cast(int) url.length;
|
||||
this.typeLength = cast(int) type.length;
|
||||
this.messageLength = cast(int) message.length;
|
||||
this._lifetime = lifetime;
|
||||
|
||||
this.urlBuffer[0 .. url.length] = url[];
|
||||
this.typeBuffer[0 .. type.length] = type[];
|
||||
this.messageBuffer[0 .. message.length] = message[];
|
||||
}
|
||||
}
|
||||
|
||||
struct EventConnection {
|
||||
int fd;
|
||||
bool needsChunking;
|
||||
}
|
||||
|
||||
private EventConnection[][string] eventConnectionsByUrl;
|
||||
|
||||
private void handleInputEvent(scope SendableEvent* event) {
|
||||
static int eventId;
|
||||
|
||||
static struct StoredEvent {
|
||||
int id;
|
||||
string type;
|
||||
string message;
|
||||
int lifetimeRemaining;
|
||||
}
|
||||
|
||||
StoredEvent[][string] byUrl;
|
||||
|
||||
int thisId = ++eventId;
|
||||
|
||||
if(event.lifetime)
|
||||
byUrl[event.url.idup] ~= StoredEvent(thisId, event.type.idup, event.message.idup, event.lifetime);
|
||||
|
||||
auto connectionsPtr = event.url in eventConnectionsByUrl;
|
||||
EventConnection[] connections;
|
||||
if(connectionsPtr is null)
|
||||
return;
|
||||
else
|
||||
connections = *connectionsPtr;
|
||||
|
||||
char[4096] buffer;
|
||||
char[] formattedMessage;
|
||||
|
||||
void append(const char[] a) {
|
||||
// the 6's here are to leave room for a HTTP chunk header, if it proves necessary
|
||||
buffer[6 + formattedMessage.length .. 6 + formattedMessage.length + a.length] = a[];
|
||||
formattedMessage = buffer[6 .. 6 + formattedMessage.length + a.length];
|
||||
}
|
||||
|
||||
/*
|
||||
rawDataOutput(cast(const(ubyte)[]) toHex(t.length));
|
||||
rawDataOutput(cast(const(ubyte)[]) "\r\n");
|
||||
rawDataOutput(cast(const(ubyte)[]) t);
|
||||
rawDataOutput(cast(const(ubyte)[]) "\r\n");
|
||||
*/
|
||||
import std.algorithm.iteration;
|
||||
|
||||
if(connections.length) {
|
||||
append("id: ");
|
||||
append(to!string(thisId));
|
||||
append("\n");
|
||||
|
||||
append("event: ");
|
||||
append(event.type);
|
||||
append("\n");
|
||||
|
||||
foreach(line; event.message.splitter("\n")) {
|
||||
append("data: ");
|
||||
append(line);
|
||||
append("\n");
|
||||
}
|
||||
|
||||
append("\n");
|
||||
}
|
||||
|
||||
// chunk it for HTTP!
|
||||
auto len = toHex(formattedMessage.length);
|
||||
buffer[4 .. 6] = "\r\n"[];
|
||||
buffer[4 - len.length .. 4] = len[];
|
||||
|
||||
auto chunkedMessage = buffer[4 - len.length .. 6 + formattedMessage.length];
|
||||
// done
|
||||
|
||||
// FIXME: send back requests when needed
|
||||
// FIXME: send a single ":\n" every 15 seconds to keep alive
|
||||
|
||||
foreach(connection; connections) {
|
||||
if(connection.needsChunking)
|
||||
nonBlockingWrite(connection.fd, chunkedMessage);
|
||||
else
|
||||
nonBlockingWrite(connection.fd, formattedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
void nonBlockingWrite(int connection, const void[] data) {
|
||||
version(Posix)
|
||||
void nonBlockingWrite(EventIoServer eis, int connection, const void[] data) {
|
||||
import core.sys.posix.unistd;
|
||||
|
||||
auto ret = write(connection, data.ptr, data.length);
|
||||
// FIXME: what if the file closed?
|
||||
if(ret != data.length) {
|
||||
if(ret == 0 || errno == EPIPE) {
|
||||
// the file is closed, remove it
|
||||
outer: foreach(url, ref connections; eventConnectionsByUrl) {
|
||||
foreach(idx, conn; connections) {
|
||||
if(conn.fd == connection) {
|
||||
connections[idx] = connections[$-1];
|
||||
connections = connections[0 .. $ - 1];
|
||||
continue outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
eis.fileClosed(connection);
|
||||
} else
|
||||
throw new Exception("alas " ~ to!string(ret) ~ " " ~ to!string(errno)); // FIXME
|
||||
}
|
||||
}
|
||||
version(Windows)
|
||||
void nonBlockingWrite(EventIoServer eis, int connection, const void[] data) {
|
||||
// FIXME
|
||||
}
|
||||
|
||||
void runAddonServer()(string name) {
|
||||
bool isInvalidHandle(CgiConnectionHandle h) {
|
||||
return h == INVALID_CGI_CONNECTION_HANDLE;
|
||||
}
|
||||
|
||||
/++
|
||||
You can customize your server by subclassing the appropriate server. Then, register your
|
||||
subclass at compile time with the [registerEventIoServer] template, or implement your own
|
||||
main function and call it yourself.
|
||||
|
||||
$(TIP If you make your subclass a `final class`, there is a slight performance improvement.)
|
||||
+/
|
||||
interface EventIoServer {
|
||||
void handleLocalConnectionData(IoOp* op, int receivedFd);
|
||||
void handleLocalConnectionClose(IoOp* op);
|
||||
void handleLocalConnectionComplete(IoOp* op);
|
||||
void wait_timeout();
|
||||
void fileClosed(int fd);
|
||||
}
|
||||
|
||||
final class BasicDataServer : EventIoServer {
|
||||
static struct ClientConnection {
|
||||
/+
|
||||
The session server api should prolly be:
|
||||
|
||||
setSessionValues
|
||||
getSessionValues
|
||||
changeSessionId
|
||||
createSession
|
||||
destroySesson
|
||||
+/
|
||||
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
void handleLocalConnectionData(IoOp* op, int receivedFd);
|
||||
|
||||
void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go
|
||||
void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant
|
||||
void wait_timeout() {}
|
||||
void fileClosed(int fd) { assert(0); }
|
||||
|
||||
private:
|
||||
|
||||
static struct SendableDataRequest {
|
||||
enum Operation : ubyte {
|
||||
noop = 0,
|
||||
// on data itself
|
||||
get,
|
||||
set,
|
||||
append,
|
||||
increment,
|
||||
decrement,
|
||||
|
||||
// on the session
|
||||
createSession,
|
||||
changeSessionId,
|
||||
destroySession,
|
||||
}
|
||||
|
||||
char[16] sessionId;
|
||||
Operation operation;
|
||||
|
||||
int keyLength;
|
||||
char[128] keyBuffer;
|
||||
|
||||
int dataLength;
|
||||
ubyte[1000] dataBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
final class EventSourceServer : EventIoServer {
|
||||
/++
|
||||
sends this cgi request to the event server so it will be fed events. You should not do anything else with the cgi object after this.
|
||||
|
||||
$(WARNING This API is extremely unstable. I might change it or remove it without notice.)
|
||||
|
||||
See_Also:
|
||||
[sendEvent]
|
||||
+/
|
||||
public static void adoptConnection(Cgi cgi, in char[] eventUrl) {
|
||||
/*
|
||||
If lastEventId is missing or empty, you just get new events as they come.
|
||||
|
||||
If it is set from something else, it sends all since then (that are still alive)
|
||||
down the pipe immediately.
|
||||
|
||||
The reason it can come from the header is that's what the standard defines for
|
||||
browser reconnects. The reason it can come from a query string is just convenience
|
||||
in catching up in a user-defined manner.
|
||||
|
||||
The reason the header overrides the query string is if the browser tries to reconnect,
|
||||
it will send the header AND the query (it reconnects to the same url), so we just
|
||||
want to do the restart thing.
|
||||
|
||||
Note that if you ask for "0" as the lastEventId, it will get ALL still living events.
|
||||
*/
|
||||
string lastEventId = cgi.lastEventId;
|
||||
if(lastEventId.length == 0 && "lastEventId" in cgi.get)
|
||||
lastEventId = cgi.get["lastEventId"];
|
||||
|
||||
cgi.setResponseContentType("text/event-stream");
|
||||
cgi.write(":\n", false); // to initialize the chunking and send headers before keeping the fd for later
|
||||
cgi.flush();
|
||||
|
||||
cgi.closed = true;
|
||||
auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server");
|
||||
scope(exit)
|
||||
closeLocalServerConnection(s);
|
||||
|
||||
version(fastcgi)
|
||||
throw new Exception("sending fcgi connections not supported");
|
||||
|
||||
auto fd = cgi.getOutputFileHandle();
|
||||
if(isInvalidHandle(fd))
|
||||
throw new Exception("bad fd from cgi!");
|
||||
|
||||
SendableEventConnection sec;
|
||||
sec.populate(cgi.responseChunked, eventUrl, lastEventId);
|
||||
|
||||
version(Posix) {
|
||||
auto res = write_fd(s, cast(void*) &sec, sec.sizeof, fd);
|
||||
assert(res == sec.sizeof);
|
||||
} else version(Windows) {
|
||||
// FIXME
|
||||
}
|
||||
}
|
||||
|
||||
/++
|
||||
Sends an event to the event server, starting it if necessary. The event server will distribute it to any listening clients, and store it for `lifetime` seconds for any later listening clients to catch up later.
|
||||
|
||||
$(WARNING This API is extremely unstable. I might change it or remove it without notice.)
|
||||
|
||||
Params:
|
||||
url = A string identifying this event "bucket". Listening clients must also connect to this same string. I called it `url` because I envision it being just passed as the url of the request.
|
||||
event = the event type string, which is used in the Javascript addEventListener API on EventSource
|
||||
data = the event data. Available in JS as `event.data`.
|
||||
lifetime = the amount of time to keep this event for replaying on the event server.
|
||||
|
||||
See_Also:
|
||||
[sendEventToEventServer]
|
||||
+/
|
||||
public static void sendEvent(string url, string event, string data, int lifetime) {
|
||||
auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server");
|
||||
scope(exit)
|
||||
closeLocalServerConnection(s);
|
||||
|
||||
SendableEvent sev;
|
||||
sev.populate(url, event, data, lifetime);
|
||||
|
||||
version(Posix) {
|
||||
auto ret = send(s, &sev, sev.sizeof, 0);
|
||||
assert(ret == sev.sizeof);
|
||||
} else version(Windows) {
|
||||
// FIXME
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected:
|
||||
|
||||
|
||||
|
||||
void handleLocalConnectionData(IoOp* op, int receivedFd) {
|
||||
if(receivedFd != -1) {
|
||||
//writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer);
|
||||
|
||||
//core.sys.posix.unistd.write(receivedFd, "hello".ptr, 5);
|
||||
|
||||
SendableEventConnection* got = cast(SendableEventConnection*) op.usedBuffer.ptr;
|
||||
|
||||
auto url = got.url.idup;
|
||||
eventConnectionsByUrl[url] ~= EventConnection(receivedFd, got.responseChunked > 0 ? true : false);
|
||||
|
||||
// FIXME: catch up on past messages here
|
||||
} else {
|
||||
auto data = op.usedBuffer;
|
||||
auto event = cast(SendableEvent*) data.ptr;
|
||||
|
||||
handleInputEvent(event);
|
||||
}
|
||||
}
|
||||
void handleLocalConnectionClose(IoOp* op) {}
|
||||
void handleLocalConnectionComplete(IoOp* op) {}
|
||||
|
||||
void wait_timeout() {
|
||||
// just keeping alive
|
||||
foreach(url, connections; eventConnectionsByUrl)
|
||||
foreach(connection; connections)
|
||||
if(connection.needsChunking)
|
||||
nonBlockingWrite(this, connection.fd, "2\r\n:\n");
|
||||
else
|
||||
nonBlockingWrite(this, connection.fd, ":\n");
|
||||
}
|
||||
|
||||
void fileClosed(int fd) {
|
||||
outer: foreach(url, ref connections; eventConnectionsByUrl) {
|
||||
foreach(idx, conn; connections) {
|
||||
if(fd == conn.fd) {
|
||||
connections[idx] = connections[$-1];
|
||||
connections = connections[0 .. $ - 1];
|
||||
continue outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private:
|
||||
|
||||
|
||||
struct SendableEventConnection {
|
||||
ubyte responseChunked;
|
||||
|
||||
int urlLength;
|
||||
char[256] urlBuffer = 0;
|
||||
|
||||
int lastEventIdLength;
|
||||
char[32] lastEventIdBuffer = 0;
|
||||
|
||||
char[] url() {
|
||||
return urlBuffer[0 .. urlLength];
|
||||
}
|
||||
char[] lastEventId() {
|
||||
return lastEventIdBuffer[0 .. lastEventIdLength];
|
||||
}
|
||||
void populate(bool responseChunked, in char[] url, in char[] lastEventId)
|
||||
in {
|
||||
assert(url.length < this.urlBuffer.length);
|
||||
assert(lastEventId.length < this.lastEventIdBuffer.length);
|
||||
}
|
||||
do {
|
||||
this.responseChunked = responseChunked ? 1 : 0;
|
||||
this.urlLength = cast(int) url.length;
|
||||
this.lastEventIdLength = cast(int) lastEventId.length;
|
||||
|
||||
this.urlBuffer[0 .. url.length] = url[];
|
||||
this.lastEventIdBuffer[0 .. lastEventId.length] = lastEventId[];
|
||||
}
|
||||
}
|
||||
|
||||
struct SendableEvent {
|
||||
int urlLength;
|
||||
char[256] urlBuffer = 0;
|
||||
int typeLength;
|
||||
char[32] typeBuffer = 0;
|
||||
int messageLength;
|
||||
char[2048] messageBuffer = 0;
|
||||
int _lifetime;
|
||||
|
||||
char[] message() {
|
||||
return messageBuffer[0 .. messageLength];
|
||||
}
|
||||
char[] type() {
|
||||
return typeBuffer[0 .. typeLength];
|
||||
}
|
||||
char[] url() {
|
||||
return urlBuffer[0 .. urlLength];
|
||||
}
|
||||
int lifetime() {
|
||||
return _lifetime;
|
||||
}
|
||||
|
||||
///
|
||||
void populate(string url, string type, string message, int lifetime)
|
||||
in {
|
||||
assert(url.length < this.urlBuffer.length);
|
||||
assert(type.length < this.typeBuffer.length);
|
||||
assert(message.length < this.messageBuffer.length);
|
||||
}
|
||||
do {
|
||||
this.urlLength = cast(int) url.length;
|
||||
this.typeLength = cast(int) type.length;
|
||||
this.messageLength = cast(int) message.length;
|
||||
this._lifetime = lifetime;
|
||||
|
||||
this.urlBuffer[0 .. url.length] = url[];
|
||||
this.typeBuffer[0 .. type.length] = type[];
|
||||
this.messageBuffer[0 .. message.length] = message[];
|
||||
}
|
||||
}
|
||||
|
||||
struct EventConnection {
|
||||
int fd;
|
||||
bool needsChunking;
|
||||
}
|
||||
|
||||
private EventConnection[][string] eventConnectionsByUrl;
|
||||
|
||||
private void handleInputEvent(scope SendableEvent* event) {
|
||||
static int eventId;
|
||||
|
||||
static struct StoredEvent {
|
||||
int id;
|
||||
string type;
|
||||
string message;
|
||||
int lifetimeRemaining;
|
||||
}
|
||||
|
||||
StoredEvent[][string] byUrl;
|
||||
|
||||
int thisId = ++eventId;
|
||||
|
||||
if(event.lifetime)
|
||||
byUrl[event.url.idup] ~= StoredEvent(thisId, event.type.idup, event.message.idup, event.lifetime);
|
||||
|
||||
auto connectionsPtr = event.url in eventConnectionsByUrl;
|
||||
EventConnection[] connections;
|
||||
if(connectionsPtr is null)
|
||||
return;
|
||||
else
|
||||
connections = *connectionsPtr;
|
||||
|
||||
char[4096] buffer;
|
||||
char[] formattedMessage;
|
||||
|
||||
void append(const char[] a) {
|
||||
// the 6's here are to leave room for a HTTP chunk header, if it proves necessary
|
||||
buffer[6 + formattedMessage.length .. 6 + formattedMessage.length + a.length] = a[];
|
||||
formattedMessage = buffer[6 .. 6 + formattedMessage.length + a.length];
|
||||
}
|
||||
|
||||
import std.algorithm.iteration;
|
||||
|
||||
if(connections.length) {
|
||||
append("id: ");
|
||||
append(to!string(thisId));
|
||||
append("\n");
|
||||
|
||||
append("event: ");
|
||||
append(event.type);
|
||||
append("\n");
|
||||
|
||||
foreach(line; event.message.splitter("\n")) {
|
||||
append("data: ");
|
||||
append(line);
|
||||
append("\n");
|
||||
}
|
||||
|
||||
append("\n");
|
||||
}
|
||||
|
||||
// chunk it for HTTP!
|
||||
auto len = toHex(formattedMessage.length);
|
||||
buffer[4 .. 6] = "\r\n"[];
|
||||
buffer[4 - len.length .. 4] = len[];
|
||||
|
||||
auto chunkedMessage = buffer[4 - len.length .. 6 + formattedMessage.length];
|
||||
// done
|
||||
|
||||
// FIXME: send back requests when needed
|
||||
// FIXME: send a single ":\n" every 15 seconds to keep alive
|
||||
|
||||
foreach(connection; connections) {
|
||||
if(connection.needsChunking)
|
||||
nonBlockingWrite(this, connection.fd, chunkedMessage);
|
||||
else
|
||||
nonBlockingWrite(this, connection.fd, formattedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoServer)) {
|
||||
version(Posix) {
|
||||
|
||||
import core.sys.posix.unistd;
|
||||
|
@ -4840,30 +5053,6 @@ void runAddonServer()(string name) {
|
|||
import core.sys.posix.signal;
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
static void handleLocalConnectionData(IoOp* op, int receivedFd) {
|
||||
if(receivedFd != -1) {
|
||||
//writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer);
|
||||
|
||||
//core.sys.posix.unistd.write(receivedFd, "hello".ptr, 5);
|
||||
|
||||
string url = (cast(char[]) op.usedBuffer[1 .. $]).idup;
|
||||
eventConnectionsByUrl[url] ~= EventConnection(receivedFd, op.usedBuffer[0] > 0 ? true : false);
|
||||
|
||||
// FIXME: catch up on past messages here
|
||||
} else {
|
||||
auto data = op.usedBuffer;
|
||||
auto event = cast(SendableEvent*) data.ptr;
|
||||
|
||||
handleInputEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
static void handleLocalConnectionClose(IoOp* op) {
|
||||
//writeln("CLOSED");
|
||||
}
|
||||
static void handleLocalConnectionComplete(IoOp* op) {
|
||||
//writeln("COMPLETED");
|
||||
}
|
||||
int sock = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if(sock == -1)
|
||||
throw new Exception("socket " ~ to!string(errno));
|
||||
|
@ -4879,10 +5068,10 @@ void runAddonServer()(string name) {
|
|||
version(linux) {
|
||||
// on linux, we will use the abstract namespace
|
||||
addr.sun_path[0] = 0;
|
||||
addr.sun_path[1 .. name.length + 1] = cast(typeof(addr.sun_path[])) name[];
|
||||
addr.sun_path[1 .. localListenerName.length + 1] = cast(typeof(addr.sun_path[])) localListenerName[];
|
||||
} else {
|
||||
// but otherwise, just use a file cuz we must.
|
||||
addr.sun_path[0 .. name.length] = cast(typeof(addr.sun_path[])) name[];
|
||||
addr.sun_path[0 .. localListenerName.length] = cast(typeof(addr.sun_path[])) localListenerName[];
|
||||
}
|
||||
|
||||
if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1)
|
||||
|
@ -4915,7 +5104,10 @@ void runAddonServer()(string name) {
|
|||
epoll_event[64] events;
|
||||
|
||||
while(true) {
|
||||
int timeout_milliseconds = -1; // infinite
|
||||
|
||||
// FIXME: it should actually do a timerfd that runs on any thing that hasn't been run recently
|
||||
|
||||
int timeout_milliseconds = 15000; // -1; // infinite
|
||||
//writeln("waiting for ", name);
|
||||
auto nfds = epoll_wait(epoll_fd, events.ptr, events.length, timeout_milliseconds);
|
||||
if(nfds == -1) {
|
||||
|
@ -4924,6 +5116,10 @@ void runAddonServer()(string name) {
|
|||
throw new Exception("epoll_wait " ~ to!string(errno));
|
||||
}
|
||||
|
||||
if(nfds == 0) {
|
||||
eis.wait_timeout();
|
||||
}
|
||||
|
||||
foreach(idx; 0 .. nfds) {
|
||||
auto flags = events[idx].events;
|
||||
auto ioop = cast(IoOp*) events[idx].data.ptr;
|
||||
|
@ -4933,7 +5129,7 @@ void runAddonServer()(string name) {
|
|||
if(ioop.fd == sock && (flags & EPOLLIN)) {
|
||||
// on edge triggering, it is important that we get it all
|
||||
while(true) {
|
||||
auto size = addr.sizeof;
|
||||
auto size = cast(uint) addr.sizeof;
|
||||
auto ns = accept(sock, cast(sockaddr*) &addr, &size);
|
||||
if(ns == -1) {
|
||||
if(errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
|
@ -4946,9 +5142,9 @@ void runAddonServer()(string name) {
|
|||
makeNonBlocking(ns);
|
||||
epoll_event nev;
|
||||
nev.events = EPOLLIN | EPOLLET;
|
||||
auto niop = allocateIoOp(ns, IoOp.ReadSocketHandle, 4096, &handleLocalConnectionData);
|
||||
niop.closeHandler = &handleLocalConnectionClose;
|
||||
niop.completeHandler = &handleLocalConnectionComplete;
|
||||
auto niop = allocateIoOp(ns, IoOp.ReadSocketHandle, 4096, &eis.handleLocalConnectionData);
|
||||
niop.closeHandler = &eis.handleLocalConnectionClose;
|
||||
niop.completeHandler = &eis.handleLocalConnectionComplete;
|
||||
scope(failure) freeIoOp(niop);
|
||||
nev.data.ptr = niop;
|
||||
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ns, &nev) == -1)
|
||||
|
@ -4976,7 +5172,7 @@ void runAddonServer()(string name) {
|
|||
break;
|
||||
}
|
||||
|
||||
ioop.bufferLengthUsed = got;
|
||||
ioop.bufferLengthUsed = cast(int) got;
|
||||
ioop.handler(ioop, in_fd);
|
||||
}
|
||||
} else if(ioop.operation == IoOp.Read) {
|
||||
|
@ -5000,7 +5196,7 @@ void runAddonServer()(string name) {
|
|||
break;
|
||||
}
|
||||
|
||||
ioop.bufferLengthUsed = got;
|
||||
ioop.bufferLengthUsed = cast(int) got;
|
||||
ioop.handler(ioop, -1);
|
||||
}
|
||||
}
|
||||
|
@ -5012,11 +5208,6 @@ void runAddonServer()(string name) {
|
|||
// this isn't seriously implemented.
|
||||
static assert(0);
|
||||
}
|
||||
|
||||
// then we need to run the event loop. a user-defined function may be called here to help
|
||||
// the event loop needs to process the websocket messages
|
||||
|
||||
|
||||
} else version(Windows) {
|
||||
|
||||
// set up a named pipe
|
||||
|
|
73
color.d
73
color.d
|
@ -1081,6 +1081,79 @@ class TrueColorImage : MemoryImage {
|
|||
}
|
||||
}
|
||||
|
||||
/+
|
||||
/// An RGB array of image data.
|
||||
class TrueColorImageWithoutAlpha : MemoryImage {
|
||||
struct Data {
|
||||
ubyte[] bytes; // the data as rgba bytes. Stored left to right, top to bottom, no padding.
|
||||
}
|
||||
|
||||
/// .
|
||||
Data imageData;
|
||||
|
||||
int _width;
|
||||
int _height;
|
||||
|
||||
override void clearInternal () nothrow @system {// @nogc {
|
||||
import core.memory : GC;
|
||||
// it is safe to call [GC.free] with `null` pointer.
|
||||
GC.free(imageData.bytes.ptr); imageData.bytes = null;
|
||||
_width = _height = 0;
|
||||
}
|
||||
|
||||
/// .
|
||||
override TrueColorImageWithoutAlpha clone() const pure nothrow @trusted {
|
||||
auto n = new TrueColorImageWithoutAlpha(width, height);
|
||||
n.imageData.bytes[] = this.imageData.bytes[]; // copy into existing array ctor allocated
|
||||
return n;
|
||||
}
|
||||
|
||||
/// .
|
||||
override int width() const pure nothrow @trusted @nogc { return _width; }
|
||||
///.
|
||||
override int height() const pure nothrow @trusted @nogc { return _height; }
|
||||
|
||||
override Color getPixel(int x, int y) const pure nothrow @trusted @nogc {
|
||||
if (x >= 0 && y >= 0 && x < _width && y < _height) {
|
||||
uint pos = (y*_width+x) * 3;
|
||||
return Color(imageData.bytes[0], imageData.bytes[1], imageData.bytes[2], 255);
|
||||
} else {
|
||||
return Color(0, 0, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
override void setPixel(int x, int y, in Color clr) nothrow @trusted {
|
||||
if (x >= 0 && y >= 0 && x < _width && y < _height) {
|
||||
uint pos = y*_width+x;
|
||||
//if (pos < imageData.bytes.length/4) imageData.colors.ptr[pos] = clr;
|
||||
// FIXME
|
||||
}
|
||||
}
|
||||
|
||||
/// .
|
||||
this(int w, int h) pure nothrow @safe {
|
||||
_width = w;
|
||||
_height = h;
|
||||
imageData.bytes = new ubyte[w*h*3];
|
||||
}
|
||||
|
||||
/// Creates with existing data. The data pointer is stored here.
|
||||
this(int w, int h, ubyte[] data) pure nothrow @safe {
|
||||
_width = w;
|
||||
_height = h;
|
||||
assert(data.length == w * h * 3);
|
||||
imageData.bytes = data;
|
||||
}
|
||||
|
||||
///
|
||||
override TrueColorImage getAsTrueColorImage() pure nothrow @safe {
|
||||
// FIXME
|
||||
//return this;
|
||||
}
|
||||
}
|
||||
+/
|
||||
|
||||
|
||||
alias extern(C) int function(const void*, const void*) @system Comparator;
|
||||
@trusted void nonPhobosSort(T)(T[] obj, Comparator comparator) {
|
||||
import core.stdc.stdlib;
|
||||
|
|
12
mssql.d
12
mssql.d
|
@ -155,16 +155,16 @@ class MsSqlResult : ResultSet {
|
|||
string a;
|
||||
|
||||
more:
|
||||
SQLCHAR[255] buf;
|
||||
if(SQLGetData(statement, cast(ushort)(i+1), SQL_CHAR, buf.ptr, 255, &ptr) != SQL_SUCCESS)
|
||||
SQLCHAR[1024] buf;
|
||||
if(SQLGetData(statement, cast(ushort)(i+1), SQL_CHAR, buf.ptr, 1024, &ptr) != SQL_SUCCESS)
|
||||
throw new DatabaseException("get data: " ~ getSQLError(SQL_HANDLE_STMT, statement));
|
||||
|
||||
assert(ptr != SQL_NO_TOTAL);
|
||||
if(ptr == SQL_NULL_DATA)
|
||||
a = null;
|
||||
else {
|
||||
a ~= cast(string) buf[0 .. ptr > 255 ? 255 : ptr].idup;
|
||||
ptr -= ptr > 255 ? 255 : ptr;
|
||||
a ~= cast(string) buf[0 .. ptr > 1024 ? 1024 : ptr].idup;
|
||||
ptr -= ptr > 1024 ? 1024 : ptr;
|
||||
if(ptr)
|
||||
goto more;
|
||||
}
|
||||
|
@ -181,11 +181,11 @@ class MsSqlResult : ResultSet {
|
|||
void makeFieldMapping() {
|
||||
for(int i = 0; i < numFields; i++) {
|
||||
SQLSMALLINT len;
|
||||
SQLCHAR[255] buf;
|
||||
SQLCHAR[1024] buf;
|
||||
auto ret = SQLDescribeCol(statement,
|
||||
cast(ushort)(i+1),
|
||||
cast(ubyte*)buf.ptr,
|
||||
255,
|
||||
1024,
|
||||
&len,
|
||||
null, null, null, null);
|
||||
if (ret != SQL_SUCCESS)
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
//
|
||||
// Would be nice: way to take output of the canvas to an image file (raster and/or svg)
|
||||
//
|
||||
//
|
||||
// Copyright (c) 2013 Mikko Mononen memon@inside.org
|
||||
//
|
||||
// This software is provided 'as-is', without any express or implied
|
||||
|
|
|
@ -1165,6 +1165,8 @@ TrueColorImage trueColorImageFromNativeHandle(NativeWindowHandle handle, int wid
|
|||
auto display = XDisplayConnection.get;
|
||||
auto image = XGetImage(display, handle, 0, 0, width, height, (cast(c_ulong) ~0) /*AllPlanes*/, ZPixmap);
|
||||
|
||||
// https://github.com/adamdruppe/arsd/issues/98
|
||||
|
||||
// FIXME: copy that shit
|
||||
|
||||
XDestroyImage(image);
|
||||
|
|
Loading…
Reference in New Issue