timer schedulign server

This commit is contained in:
Adam D. Ruppe 2019-08-04 19:07:16 -04:00
parent 78fd5c9b7c
commit f43729fef8
1 changed files with 190 additions and 18 deletions

208
cgi.d
View File

@ -3090,6 +3090,17 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC
case "--event-server":
runEventServer();
return;
case "--timer-server":
runTimerServer();
return;
case "--timed-jobs":
import core.demangle;
foreach(k, v; scheduledJobHandlers)
writeln(k, "\t", demangle(k));
return;
case "--timed-job":
scheduledJobHandlers[args[2]](args[3 .. $]);
return;
default:
// intentionally blank - do nothing and carry on to run normally
}
@ -4859,6 +4870,10 @@ void runEventServer()() {
runAddonServer("/tmp/arsd_cgi_event_server", new EventSourceServerImplementation());
}
void runTimerServer()() {
runAddonServer("/tmp/arsd_scheduled_job_server", new ScheduledJobServerImplementation());
}
version(Posix) {
alias LocalServerConnectionHandle = int;
alias CgiConnectionHandle = int;
@ -4959,7 +4974,7 @@ struct IoOp {
// Your handler may be called in a different thread than the one that initiated the IO request!
// It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution.
private void delegate(IoOp*, int) handler;
private bool delegate(IoOp*, int) handler; // returns true if you are done and want it to be closed
private void delegate(IoOp*) closeHandler;
private void delegate(IoOp*) completeHandler;
private int internalFd;
@ -4985,10 +5000,10 @@ struct IoOp {
}
}
IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, int) handler) {
IoOp* allocateIoOp(int fd, int operation, int bufferSize, bool delegate(IoOp*, int) handler) {
import core.stdc.stdlib;
auto ptr = malloc(IoOp.sizeof + bufferSize);
auto ptr = calloc(IoOp.sizeof + bufferSize, 1);
if(ptr is null)
assert(0); // out of memory!
@ -5000,10 +5015,18 @@ IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, i
op.bufferLengthAllocated = bufferSize;
op.bufferLengthUsed = 0;
import core.memory;
GC.addRoot(ptr);
return op;
}
void freeIoOp(ref IoOp* ptr) {
import core.memory;
GC.removeRoot(ptr);
import core.stdc.stdlib;
free(ptr);
ptr = null;
@ -5053,11 +5076,13 @@ https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsaget
$(TIP If you make your subclass a `final class`, there is a slight performance improvement.)
+/
interface EventIoServer {
void handleLocalConnectionData(IoOp* op, int receivedFd);
bool handleLocalConnectionData(IoOp* op, int receivedFd);
void handleLocalConnectionClose(IoOp* op);
void handleLocalConnectionComplete(IoOp* op);
void wait_timeout();
void fileClosed(int fd);
void epoll_fd(int fd);
}
// the sink should buffer it
@ -5118,7 +5143,16 @@ private void deserialize(T)(scope ubyte[] delegate(int sz) get, scope void deleg
T t = cast(T) get(len * cast(int) typeof(T.init[0]).sizeof);
dg(t);
} else static if(is(T == E[], E)) {
T t;
int len;
auto data = get(len.sizeof);
len = (cast(int[]) data)[0];
t.length = len;
foreach(ref e; t) {
deserialize!E(get, (ele) { e = ele; });
}
dg(t);
} else static assert(0, T.stringof);
}
@ -5614,15 +5648,17 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer {
Session[string] sessions;
void handleLocalConnectionData(IoOp* op, int receivedFd) {
bool handleLocalConnectionData(IoOp* op, int receivedFd) {
auto data = op.usedBuffer;
dispatchRpcServer!BasicDataServer(this, data, op.fd);
return false;
}
void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go
void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant
void wait_timeout() {}
void fileClosed(int fd) {} // stateless so irrelevant
void epoll_fd(int fd) {}
}
/++
@ -5631,11 +5667,22 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer {
struct ScheduledJobHelper {
private string func;
private string[] args;
private bool consumed;
private this(string func, string[] args) {
this.func = func;
this.args = args;
}
~this() {
assert(consumed);
}
/++
Schedules the job to be run at the given time.
+/
void at(DateTime when, immutable TimeZone timezone = UTC()) {
consumed = true;
}
@ -5643,6 +5690,7 @@ struct ScheduledJobHelper {
Schedules the job to run at least after the specified delay.
+/
void delay(Duration delay) {
consumed = true;
}
@ -5651,8 +5699,12 @@ struct ScheduledJobHelper {
$(NOTE It may run in a background thread. Don't segfault!)
+/
void runNowInBackground() {
void asap() {
consumed = true;
//delay(0);
auto conn = ScheduledJobServerConnection.connection;
import std.file;
auto jobId = conn.scheduleJob(0, 1, thisExePath, func, args);
}
/++
@ -5664,13 +5716,40 @@ struct ScheduledJobHelper {
}
}
private immutable void delegate(string[])[string] scheduledJobHandlers;
/++
First step to schedule a job on the scheduled job server.
The scheduled job needs to be a top-level function that doesn't read any
variables from outside its arguments because it may be run in a new process,
without any context existing later.
You MUST set details on the returned object to actually do anything!
+/
ScheduledJobHelper schedule(alias fn, T...)(T args) {
return ScheduledJobHelper();
template schedule(alias fn, T...) if(is(typeof(fn) == function)) {
///
ScheduledJobHelper schedule(T args) {
// this isn't meant to ever be called, but instead just to
// get the compiler to type check the arguments passed for us
auto sample = delegate() {
fn(args);
};
string[] sargs;
foreach(arg; args)
sargs ~= to!string(arg);
return ScheduledJobHelper(fn.mangleof, sargs);
}
shared static this() {
scheduledJobHandlers[fn.mangleof] = delegate(string[] sargs) {
import std.traits;
Parameters!fn args;
foreach(idx, ref arg; args)
arg = to!(typeof(arg))(sargs[idx]);
fn(args);
};
}
}
///
@ -5685,14 +5764,99 @@ class ScheduledJobServerConnection : ScheduledJobServer {
mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server");
}
final class ScheduledJobServerImplementation : ScheduledJobServer {
final class ScheduledJobServerImplementation : ScheduledJobServer, EventIoServer {
// whenIs is 0 for relative, 1 for absolute
protected int scheduleJob(int whenIs, int when, string executable, string func, string[] args) {
return 0;
auto nj = nextJobId;
nextJobId++;
version(linux) {
import core.sys.linux.timerfd;
import core.sys.linux.epoll;
import core.sys.posix.unistd;
auto fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
if(fd == -1)
throw new Exception("fd timer create failed");
auto job = Job(executable, func, args, fd, nj);
itimerspec value;
value.it_value.tv_sec = when;
value.it_value.tv_nsec = 0;
value.it_interval.tv_sec = 0;
value.it_interval.tv_nsec = 0;
if(timerfd_settime(fd, whenIs == 1 ? TFD_TIMER_ABSTIME : 0, &value, null) == -1)
throw new Exception("couldn't set fd timer");
auto op = allocateIoOp(fd, IoOp.Read, 16, (IoOp* op, int fd) {
jobs.remove(nj);
spawnProcess([job.executable, "--timed-job", job.func] ~ args);
return true;
});
scope(failure)
freeIoOp(op);
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = op;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
}
jobs[nj] = job;
return nj;
}
protected void cancelJob(int jobId) {
version(linux) {
auto job = jobId in jobs;
if(job is null)
return;
version(linux) {
import core.sys.linux.timerfd;
import core.sys.linux.epoll;
import core.sys.posix.unistd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, job.timerfd, null);
close(job.timerfd);
}
}
jobs.remove(jobId);
}
int nextJobId = 1;
static struct Job {
string executable;
string func;
string[] args;
int timerfd;
int id;
}
Job[int] jobs;
// event io server methods below
bool handleLocalConnectionData(IoOp* op, int receivedFd) {
auto data = op.usedBuffer;
dispatchRpcServer!ScheduledJobServer(this, data, op.fd);
return false;
}
void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go
void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant
void wait_timeout() {}
void fileClosed(int fd) {} // stateless so irrelevant
int epoll_fd_;
void epoll_fd(int fd) {this.epoll_fd_ = fd; }
int epoll_fd() { return epoll_fd_; }
}
///
@ -5819,7 +5983,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
}
}
void handleLocalConnectionData(IoOp* op, int receivedFd) {
bool handleLocalConnectionData(IoOp* op, int receivedFd) {
if(receivedFd != -1) {
//writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer);
@ -5847,6 +6011,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
dispatchRpcServer!EventSourceServer(this, data, op.fd);
}
}
return false;
}
void handleLocalConnectionClose(IoOp* op) {}
void handleLocalConnectionComplete(IoOp* op) {}
@ -5873,6 +6038,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
}
}
void epoll_fd(int fd) {}
private:
@ -6089,6 +6255,8 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
import core.sys.posix.poll;
}
eis.epoll_fd = epoll_fd;
auto acceptOp = allocateIoOp(sock, IoOp.Read, 0, null);
scope(exit)
freeIoOp(acceptOp);
@ -6232,7 +6400,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
}
} else if(ioop.operation == IoOp.Read) {
while(true) {
auto got = recv(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length, 0);
auto got = read(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length);
if(got == -1) {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
// all done, got it all
@ -6240,7 +6408,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
ioop.completeHandler(ioop);
break;
}
throw new Exception("recv " ~ to!string(errno));
throw new Exception("recv " ~ to!string(ioop.fd) ~ " errno " ~ to!string(errno));
}
if(got == 0) {
@ -6252,7 +6420,11 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
}
ioop.bufferLengthUsed = cast(int) got;
ioop.handler(ioop, -1);
if(ioop.handler(ioop, ioop.fd)) {
close(ioop.fd);
freeIoOp(ioop);
break;
}
}
}
@ -8502,9 +8674,9 @@ auto serveStaticFileDirectory(string urlPrefix, string directory = null) {
auto fn = details.directory ~ file;
if(std.file.exists(fn)) {
if(contentType.indexOf("image/") == 0)
cgi.setCache(true);
else if(contentType.indexOf("audio/") == 0)
//if(contentType.indexOf("image/") == 0)
//cgi.setCache(true);
//else if(contentType.indexOf("audio/") == 0)
cgi.setCache(true);
cgi.setResponseContentType(contentType);
cgi.write(std.file.read(fn), true);