diff --git a/cgi.d b/cgi.d index db17187..da77ed3 100644 --- a/cgi.d +++ b/cgi.d @@ -3090,6 +3090,17 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC case "--event-server": runEventServer(); return; + case "--timer-server": + runTimerServer(); + return; + case "--timed-jobs": + import core.demangle; + foreach(k, v; scheduledJobHandlers) + writeln(k, "\t", demangle(k)); + return; + case "--timed-job": + scheduledJobHandlers[args[2]](args[3 .. $]); + return; default: // intentionally blank - do nothing and carry on to run normally } @@ -4859,6 +4870,10 @@ void runEventServer()() { runAddonServer("/tmp/arsd_cgi_event_server", new EventSourceServerImplementation()); } +void runTimerServer()() { + runAddonServer("/tmp/arsd_scheduled_job_server", new ScheduledJobServerImplementation()); +} + version(Posix) { alias LocalServerConnectionHandle = int; alias CgiConnectionHandle = int; @@ -4959,7 +4974,7 @@ struct IoOp { // Your handler may be called in a different thread than the one that initiated the IO request! // It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution. - private void delegate(IoOp*, int) handler; + private bool delegate(IoOp*, int) handler; // returns true if you are done and want it to be closed private void delegate(IoOp*) closeHandler; private void delegate(IoOp*) completeHandler; private int internalFd; @@ -4985,10 +5000,10 @@ struct IoOp { } } -IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, int) handler) { +IoOp* allocateIoOp(int fd, int operation, int bufferSize, bool delegate(IoOp*, int) handler) { import core.stdc.stdlib; - auto ptr = malloc(IoOp.sizeof + bufferSize); + auto ptr = calloc(IoOp.sizeof + bufferSize, 1); if(ptr is null) assert(0); // out of memory! @@ -5000,10 +5015,18 @@ IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, i op.bufferLengthAllocated = bufferSize; op.bufferLengthUsed = 0; + import core.memory; + + GC.addRoot(ptr); + return op; } void freeIoOp(ref IoOp* ptr) { + + import core.memory; + GC.removeRoot(ptr); + import core.stdc.stdlib; free(ptr); ptr = null; @@ -5053,11 +5076,13 @@ https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsaget $(TIP If you make your subclass a `final class`, there is a slight performance improvement.) +/ interface EventIoServer { - void handleLocalConnectionData(IoOp* op, int receivedFd); + bool handleLocalConnectionData(IoOp* op, int receivedFd); void handleLocalConnectionClose(IoOp* op); void handleLocalConnectionComplete(IoOp* op); void wait_timeout(); void fileClosed(int fd); + + void epoll_fd(int fd); } // the sink should buffer it @@ -5118,7 +5143,16 @@ private void deserialize(T)(scope ubyte[] delegate(int sz) get, scope void deleg T t = cast(T) get(len * cast(int) typeof(T.init[0]).sizeof); dg(t); - + } else static if(is(T == E[], E)) { + T t; + int len; + auto data = get(len.sizeof); + len = (cast(int[]) data)[0]; + t.length = len; + foreach(ref e; t) { + deserialize!E(get, (ele) { e = ele; }); + } + dg(t); } else static assert(0, T.stringof); } @@ -5614,15 +5648,17 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer { Session[string] sessions; - void handleLocalConnectionData(IoOp* op, int receivedFd) { + bool handleLocalConnectionData(IoOp* op, int receivedFd) { auto data = op.usedBuffer; dispatchRpcServer!BasicDataServer(this, data, op.fd); + return false; } void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant void wait_timeout() {} void fileClosed(int fd) {} // stateless so irrelevant + void epoll_fd(int fd) {} } /++ @@ -5631,11 +5667,22 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer { struct ScheduledJobHelper { private string func; private string[] args; + private bool consumed; + + private this(string func, string[] args) { + this.func = func; + this.args = args; + } + + ~this() { + assert(consumed); + } /++ Schedules the job to be run at the given time. +/ void at(DateTime when, immutable TimeZone timezone = UTC()) { + consumed = true; } @@ -5643,6 +5690,7 @@ struct ScheduledJobHelper { Schedules the job to run at least after the specified delay. +/ void delay(Duration delay) { + consumed = true; } @@ -5651,8 +5699,12 @@ struct ScheduledJobHelper { $(NOTE It may run in a background thread. Don't segfault!) +/ - void runNowInBackground() { + void asap() { + consumed = true; //delay(0); + auto conn = ScheduledJobServerConnection.connection; + import std.file; + auto jobId = conn.scheduleJob(0, 1, thisExePath, func, args); } /++ @@ -5664,13 +5716,40 @@ struct ScheduledJobHelper { } } +private immutable void delegate(string[])[string] scheduledJobHandlers; + /++ First step to schedule a job on the scheduled job server. + The scheduled job needs to be a top-level function that doesn't read any + variables from outside its arguments because it may be run in a new process, + without any context existing later. + You MUST set details on the returned object to actually do anything! +/ -ScheduledJobHelper schedule(alias fn, T...)(T args) { - return ScheduledJobHelper(); +template schedule(alias fn, T...) if(is(typeof(fn) == function)) { + /// + ScheduledJobHelper schedule(T args) { + // this isn't meant to ever be called, but instead just to + // get the compiler to type check the arguments passed for us + auto sample = delegate() { + fn(args); + }; + string[] sargs; + foreach(arg; args) + sargs ~= to!string(arg); + return ScheduledJobHelper(fn.mangleof, sargs); + } + + shared static this() { + scheduledJobHandlers[fn.mangleof] = delegate(string[] sargs) { + import std.traits; + Parameters!fn args; + foreach(idx, ref arg; args) + arg = to!(typeof(arg))(sargs[idx]); + fn(args); + }; + } } /// @@ -5685,14 +5764,99 @@ class ScheduledJobServerConnection : ScheduledJobServer { mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server"); } -final class ScheduledJobServerImplementation : ScheduledJobServer { +final class ScheduledJobServerImplementation : ScheduledJobServer, EventIoServer { + // whenIs is 0 for relative, 1 for absolute protected int scheduleJob(int whenIs, int when, string executable, string func, string[] args) { - return 0; + auto nj = nextJobId; + nextJobId++; + + version(linux) { + import core.sys.linux.timerfd; + import core.sys.linux.epoll; + import core.sys.posix.unistd; + + + auto fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC); + if(fd == -1) + throw new Exception("fd timer create failed"); + + auto job = Job(executable, func, args, fd, nj); + + itimerspec value; + value.it_value.tv_sec = when; + value.it_value.tv_nsec = 0; + + value.it_interval.tv_sec = 0; + value.it_interval.tv_nsec = 0; + + if(timerfd_settime(fd, whenIs == 1 ? TFD_TIMER_ABSTIME : 0, &value, null) == -1) + throw new Exception("couldn't set fd timer"); + + auto op = allocateIoOp(fd, IoOp.Read, 16, (IoOp* op, int fd) { + jobs.remove(nj); + + spawnProcess([job.executable, "--timed-job", job.func] ~ args); + + return true; + }); + scope(failure) + freeIoOp(op); + + epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = op; + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1) + throw new Exception("epoll_ctl " ~ to!string(errno)); + } + + jobs[nj] = job; + return nj; } protected void cancelJob(int jobId) { + version(linux) { + auto job = jobId in jobs; + if(job is null) + return; + version(linux) { + import core.sys.linux.timerfd; + import core.sys.linux.epoll; + import core.sys.posix.unistd; + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, job.timerfd, null); + close(job.timerfd); + } + } + jobs.remove(jobId); } + + int nextJobId = 1; + static struct Job { + string executable; + string func; + string[] args; + int timerfd; + int id; + } + Job[int] jobs; + + + // event io server methods below + + bool handleLocalConnectionData(IoOp* op, int receivedFd) { + auto data = op.usedBuffer; + dispatchRpcServer!ScheduledJobServer(this, data, op.fd); + return false; + } + + void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go + void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant + void wait_timeout() {} + void fileClosed(int fd) {} // stateless so irrelevant + + int epoll_fd_; + void epoll_fd(int fd) {this.epoll_fd_ = fd; } + int epoll_fd() { return epoll_fd_; } } /// @@ -5819,7 +5983,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer { } } - void handleLocalConnectionData(IoOp* op, int receivedFd) { + bool handleLocalConnectionData(IoOp* op, int receivedFd) { if(receivedFd != -1) { //writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer); @@ -5847,6 +6011,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer { dispatchRpcServer!EventSourceServer(this, data, op.fd); } } + return false; } void handleLocalConnectionClose(IoOp* op) {} void handleLocalConnectionComplete(IoOp* op) {} @@ -5873,6 +6038,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer { } } + void epoll_fd(int fd) {} private: @@ -6089,6 +6255,8 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS import core.sys.posix.poll; } + eis.epoll_fd = epoll_fd; + auto acceptOp = allocateIoOp(sock, IoOp.Read, 0, null); scope(exit) freeIoOp(acceptOp); @@ -6232,7 +6400,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS } } else if(ioop.operation == IoOp.Read) { while(true) { - auto got = recv(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length, 0); + auto got = read(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length); if(got == -1) { if(errno == EAGAIN || errno == EWOULDBLOCK) { // all done, got it all @@ -6240,7 +6408,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS ioop.completeHandler(ioop); break; } - throw new Exception("recv " ~ to!string(errno)); + throw new Exception("recv " ~ to!string(ioop.fd) ~ " errno " ~ to!string(errno)); } if(got == 0) { @@ -6252,7 +6420,11 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS } ioop.bufferLengthUsed = cast(int) got; - ioop.handler(ioop, -1); + if(ioop.handler(ioop, ioop.fd)) { + close(ioop.fd); + freeIoOp(ioop); + break; + } } } @@ -8502,9 +8674,9 @@ auto serveStaticFileDirectory(string urlPrefix, string directory = null) { auto fn = details.directory ~ file; if(std.file.exists(fn)) { - if(contentType.indexOf("image/") == 0) - cgi.setCache(true); - else if(contentType.indexOf("audio/") == 0) + //if(contentType.indexOf("image/") == 0) + //cgi.setCache(true); + //else if(contentType.indexOf("audio/") == 0) cgi.setCache(true); cgi.setResponseContentType(contentType); cgi.write(std.file.read(fn), true);