timer schedulign server

This commit is contained in:
Adam D. Ruppe 2019-08-04 19:07:16 -04:00
parent 78fd5c9b7c
commit 8050f1f54b
1 changed files with 190 additions and 18 deletions

208
cgi.d
View File

@ -3090,6 +3090,17 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC
case "--event-server": case "--event-server":
runEventServer(); runEventServer();
return; return;
case "--timer-server":
runTimerServer();
return;
case "--timed-jobs":
import core.demangle;
foreach(k, v; scheduledJobHandlers)
writeln(k, "\t", demangle(k));
return;
case "--timed-job":
scheduledJobHandlers[args[2]](args[3 .. $]);
return;
default: default:
// intentionally blank - do nothing and carry on to run normally // intentionally blank - do nothing and carry on to run normally
} }
@ -4859,6 +4870,10 @@ void runEventServer()() {
runAddonServer("/tmp/arsd_cgi_event_server", new EventSourceServerImplementation()); runAddonServer("/tmp/arsd_cgi_event_server", new EventSourceServerImplementation());
} }
void runTimerServer()() {
runAddonServer("/tmp/arsd_scheduled_job_server", new ScheduledJobServerImplementation());
}
version(Posix) { version(Posix) {
alias LocalServerConnectionHandle = int; alias LocalServerConnectionHandle = int;
alias CgiConnectionHandle = int; alias CgiConnectionHandle = int;
@ -4959,7 +4974,7 @@ struct IoOp {
// Your handler may be called in a different thread than the one that initiated the IO request! // Your handler may be called in a different thread than the one that initiated the IO request!
// It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution. // It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution.
private void delegate(IoOp*, int) handler; private bool delegate(IoOp*, int) handler; // returns true if you are done and want it to be closed
private void delegate(IoOp*) closeHandler; private void delegate(IoOp*) closeHandler;
private void delegate(IoOp*) completeHandler; private void delegate(IoOp*) completeHandler;
private int internalFd; private int internalFd;
@ -4985,10 +5000,10 @@ struct IoOp {
} }
} }
IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, int) handler) { IoOp* allocateIoOp(int fd, int operation, int bufferSize, bool delegate(IoOp*, int) handler) {
import core.stdc.stdlib; import core.stdc.stdlib;
auto ptr = malloc(IoOp.sizeof + bufferSize); auto ptr = calloc(IoOp.sizeof + bufferSize, 1);
if(ptr is null) if(ptr is null)
assert(0); // out of memory! assert(0); // out of memory!
@ -5000,10 +5015,18 @@ IoOp* allocateIoOp(int fd, int operation, int bufferSize, void delegate(IoOp*, i
op.bufferLengthAllocated = bufferSize; op.bufferLengthAllocated = bufferSize;
op.bufferLengthUsed = 0; op.bufferLengthUsed = 0;
import core.memory;
GC.addRoot(ptr);
return op; return op;
} }
void freeIoOp(ref IoOp* ptr) { void freeIoOp(ref IoOp* ptr) {
import core.memory;
GC.removeRoot(ptr);
import core.stdc.stdlib; import core.stdc.stdlib;
free(ptr); free(ptr);
ptr = null; ptr = null;
@ -5053,11 +5076,13 @@ https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsaget
$(TIP If you make your subclass a `final class`, there is a slight performance improvement.) $(TIP If you make your subclass a `final class`, there is a slight performance improvement.)
+/ +/
interface EventIoServer { interface EventIoServer {
void handleLocalConnectionData(IoOp* op, int receivedFd); bool handleLocalConnectionData(IoOp* op, int receivedFd);
void handleLocalConnectionClose(IoOp* op); void handleLocalConnectionClose(IoOp* op);
void handleLocalConnectionComplete(IoOp* op); void handleLocalConnectionComplete(IoOp* op);
void wait_timeout(); void wait_timeout();
void fileClosed(int fd); void fileClosed(int fd);
void epoll_fd(int fd);
} }
// the sink should buffer it // the sink should buffer it
@ -5118,7 +5143,16 @@ private void deserialize(T)(scope ubyte[] delegate(int sz) get, scope void deleg
T t = cast(T) get(len * cast(int) typeof(T.init[0]).sizeof); T t = cast(T) get(len * cast(int) typeof(T.init[0]).sizeof);
dg(t); dg(t);
} else static if(is(T == E[], E)) {
T t;
int len;
auto data = get(len.sizeof);
len = (cast(int[]) data)[0];
t.length = len;
foreach(ref e; t) {
deserialize!E(get, (ele) { e = ele; });
}
dg(t);
} else static assert(0, T.stringof); } else static assert(0, T.stringof);
} }
@ -5614,15 +5648,17 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer {
Session[string] sessions; Session[string] sessions;
void handleLocalConnectionData(IoOp* op, int receivedFd) { bool handleLocalConnectionData(IoOp* op, int receivedFd) {
auto data = op.usedBuffer; auto data = op.usedBuffer;
dispatchRpcServer!BasicDataServer(this, data, op.fd); dispatchRpcServer!BasicDataServer(this, data, op.fd);
return false;
} }
void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go
void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant
void wait_timeout() {} void wait_timeout() {}
void fileClosed(int fd) {} // stateless so irrelevant void fileClosed(int fd) {} // stateless so irrelevant
void epoll_fd(int fd) {}
} }
/++ /++
@ -5631,11 +5667,22 @@ final class BasicDataServerImplementation : BasicDataServer, EventIoServer {
struct ScheduledJobHelper { struct ScheduledJobHelper {
private string func; private string func;
private string[] args; private string[] args;
private bool consumed;
private this(string func, string[] args) {
this.func = func;
this.args = args;
}
~this() {
assert(consumed);
}
/++ /++
Schedules the job to be run at the given time. Schedules the job to be run at the given time.
+/ +/
void at(DateTime when, immutable TimeZone timezone = UTC()) { void at(DateTime when, immutable TimeZone timezone = UTC()) {
consumed = true;
} }
@ -5643,6 +5690,7 @@ struct ScheduledJobHelper {
Schedules the job to run at least after the specified delay. Schedules the job to run at least after the specified delay.
+/ +/
void delay(Duration delay) { void delay(Duration delay) {
consumed = true;
} }
@ -5651,8 +5699,12 @@ struct ScheduledJobHelper {
$(NOTE It may run in a background thread. Don't segfault!) $(NOTE It may run in a background thread. Don't segfault!)
+/ +/
void runNowInBackground() { void asap() {
consumed = true;
//delay(0); //delay(0);
auto conn = ScheduledJobServerConnection.connection;
import std.file;
auto jobId = conn.scheduleJob(0, 1, thisExePath, func, args);
} }
/++ /++
@ -5664,13 +5716,40 @@ struct ScheduledJobHelper {
} }
} }
private immutable void delegate(string[])[string] scheduledJobHandlers;
/++ /++
First step to schedule a job on the scheduled job server. First step to schedule a job on the scheduled job server.
The scheduled job needs to be a top-level function that doesn't read any
variables from outside its arguments because it may be run in a new process,
without any context existing later.
You MUST set details on the returned object to actually do anything! You MUST set details on the returned object to actually do anything!
+/ +/
ScheduledJobHelper schedule(alias fn, T...)(T args) { template schedule(alias fn, T...) if(is(typeof(fn) == function)) {
return ScheduledJobHelper(); ///
ScheduledJobHelper schedule(T args) {
// this isn't meant to ever be called, but instead just to
// get the compiler to type check the arguments passed for us
auto sample = delegate() {
fn(args);
};
string[] sargs;
foreach(arg; args)
sargs ~= to!string(arg);
return ScheduledJobHelper(fn.mangleof, sargs);
}
shared static this() {
scheduledJobHandlers[fn.mangleof] = delegate(string[] sargs) {
import std.traits;
Parameters!fn args;
foreach(idx, ref arg; args)
arg = to!(typeof(arg))(sargs[idx]);
fn(args);
};
}
} }
/// ///
@ -5685,15 +5764,100 @@ class ScheduledJobServerConnection : ScheduledJobServer {
mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server"); mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server");
} }
final class ScheduledJobServerImplementation : ScheduledJobServer { final class ScheduledJobServerImplementation : ScheduledJobServer, EventIoServer {
// whenIs is 0 for relative, 1 for absolute
protected int scheduleJob(int whenIs, int when, string executable, string func, string[] args) { protected int scheduleJob(int whenIs, int when, string executable, string func, string[] args) {
return 0; auto nj = nextJobId;
nextJobId++;
version(linux) {
import core.sys.linux.timerfd;
import core.sys.linux.epoll;
import core.sys.posix.unistd;
auto fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
if(fd == -1)
throw new Exception("fd timer create failed");
auto job = Job(executable, func, args, fd, nj);
itimerspec value;
value.it_value.tv_sec = when;
value.it_value.tv_nsec = 0;
value.it_interval.tv_sec = 0;
value.it_interval.tv_nsec = 0;
if(timerfd_settime(fd, whenIs == 1 ? TFD_TIMER_ABSTIME : 0, &value, null) == -1)
throw new Exception("couldn't set fd timer");
auto op = allocateIoOp(fd, IoOp.Read, 16, (IoOp* op, int fd) {
jobs.remove(nj);
spawnProcess([job.executable, "--timed-job", job.func] ~ args);
return true;
});
scope(failure)
freeIoOp(op);
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = op;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
}
jobs[nj] = job;
return nj;
} }
protected void cancelJob(int jobId) { protected void cancelJob(int jobId) {
version(linux) {
auto job = jobId in jobs;
if(job is null)
return;
version(linux) {
import core.sys.linux.timerfd;
import core.sys.linux.epoll;
import core.sys.posix.unistd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, job.timerfd, null);
close(job.timerfd);
} }
} }
jobs.remove(jobId);
}
int nextJobId = 1;
static struct Job {
string executable;
string func;
string[] args;
int timerfd;
int id;
}
Job[int] jobs;
// event io server methods below
bool handleLocalConnectionData(IoOp* op, int receivedFd) {
auto data = op.usedBuffer;
dispatchRpcServer!ScheduledJobServer(this, data, op.fd);
return false;
}
void handleLocalConnectionClose(IoOp* op) {} // doesn't really matter, this is a fairly stateless go
void handleLocalConnectionComplete(IoOp* op) {} // again, irrelevant
void wait_timeout() {}
void fileClosed(int fd) {} // stateless so irrelevant
int epoll_fd_;
void epoll_fd(int fd) {this.epoll_fd_ = fd; }
int epoll_fd() { return epoll_fd_; }
}
/// ///
interface EventSourceServer { interface EventSourceServer {
@ -5819,7 +5983,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
} }
} }
void handleLocalConnectionData(IoOp* op, int receivedFd) { bool handleLocalConnectionData(IoOp* op, int receivedFd) {
if(receivedFd != -1) { if(receivedFd != -1) {
//writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer); //writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer);
@ -5847,6 +6011,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
dispatchRpcServer!EventSourceServer(this, data, op.fd); dispatchRpcServer!EventSourceServer(this, data, op.fd);
} }
} }
return false;
} }
void handleLocalConnectionClose(IoOp* op) {} void handleLocalConnectionClose(IoOp* op) {}
void handleLocalConnectionComplete(IoOp* op) {} void handleLocalConnectionComplete(IoOp* op) {}
@ -5873,6 +6038,7 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer {
} }
} }
void epoll_fd(int fd) {}
private: private:
@ -6089,6 +6255,8 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
import core.sys.posix.poll; import core.sys.posix.poll;
} }
eis.epoll_fd = epoll_fd;
auto acceptOp = allocateIoOp(sock, IoOp.Read, 0, null); auto acceptOp = allocateIoOp(sock, IoOp.Read, 0, null);
scope(exit) scope(exit)
freeIoOp(acceptOp); freeIoOp(acceptOp);
@ -6232,7 +6400,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
} }
} else if(ioop.operation == IoOp.Read) { } else if(ioop.operation == IoOp.Read) {
while(true) { while(true) {
auto got = recv(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length, 0); auto got = read(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length);
if(got == -1) { if(got == -1) {
if(errno == EAGAIN || errno == EWOULDBLOCK) { if(errno == EAGAIN || errno == EWOULDBLOCK) {
// all done, got it all // all done, got it all
@ -6240,7 +6408,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
ioop.completeHandler(ioop); ioop.completeHandler(ioop);
break; break;
} }
throw new Exception("recv " ~ to!string(errno)); throw new Exception("recv " ~ to!string(ioop.fd) ~ " errno " ~ to!string(errno));
} }
if(got == 0) { if(got == 0) {
@ -6252,7 +6420,11 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS
} }
ioop.bufferLengthUsed = cast(int) got; ioop.bufferLengthUsed = cast(int) got;
ioop.handler(ioop, -1); if(ioop.handler(ioop, ioop.fd)) {
close(ioop.fd);
freeIoOp(ioop);
break;
}
} }
} }
@ -8502,9 +8674,9 @@ auto serveStaticFileDirectory(string urlPrefix, string directory = null) {
auto fn = details.directory ~ file; auto fn = details.directory ~ file;
if(std.file.exists(fn)) { if(std.file.exists(fn)) {
if(contentType.indexOf("image/") == 0) //if(contentType.indexOf("image/") == 0)
cgi.setCache(true); //cgi.setCache(true);
else if(contentType.indexOf("audio/") == 0) //else if(contentType.indexOf("audio/") == 0)
cgi.setCache(true); cgi.setCache(true);
cgi.setResponseContentType(contentType); cgi.setResponseContentType(contentType);
cgi.write(std.file.read(fn), true); cgi.write(std.file.read(fn), true);