stop method to programatically terminate server

This commit is contained in:
Adam D. Ruppe 2021-12-22 18:47:28 -05:00
parent a1659f7c34
commit 933423478f
1 changed files with 110 additions and 13 deletions

123
cgi.d
View File

@ -3594,10 +3594,27 @@ struct RequestServer {
}
}
void serveEmbeddedHttp(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() {
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, fun));
/++
Runs the embedded HTTP thread server specifically, regardless of which build configuration you have.
If you want the forking worker process server, you do need to compile with the embedded_httpd_processes config though.
+/
void serveEmbeddedHttp(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(ThisFor!fun _this) {
static if(__traits(isStaticFunction, fun))
alias funToUse = fun;
else
void funToUse(CustomCgi cgi) {
static if(__VERSION__ > 2097)
__traits(child, _this, fun)(cgi);
else static assert(0, "Not implemented in your compiler version!");
}
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse));
manager.listen();
}
/++
Runs the embedded SCGI server specifically, regardless of which build configuration you have.
+/
void serveScgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() {
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength));
manager.listen();
@ -3615,11 +3632,50 @@ struct RequestServer {
doThreadHttpConnectionGuts!(CustomCgi, fun, true)(new FakeSocketForStdin());
}
void stop() {
// FIXME
/++
Stops serving after the current requests.
Bugs:
Not implemented on version=embedded_httpd_processes, version=fastcgi, or on any operating system aside from Linux at this time.
Try SIGINT there perhaps.
A Windows implementation is planned but not sure about the others. Maybe a posix pipe can be used on other OSes. I do not intend
to implement this for the processes config.
+/
version(embedded_httpd_processes) {} else
static void stop() {
globalStopFlag = true;
version(Posix)
if(cancelfd > 0) {
ulong a = 1;
core.sys.posix.unistd.write(cancelfd, &a, a.sizeof);
}
version(Windows)
if(iocp) {
foreach(i; 0 .. 16) // FIXME
PostQueuedCompletionStatus(iocp, 0, cast(ULONG_PTR) null, null);
}
}
}
private alias AliasSeq(T...) = T;
version(with_breaking_cgi_features)
mixin(q{
template ThisFor(alias t) {
static if(__traits(isStaticFunction, t)) {
alias ThisFor = AliasSeq!();
} else {
alias ThisFor = __traits(parent, t);
}
}
});
else
alias ThisFor(alias t) = AliasSeq!();
private __gshared bool globalStopFlag = false;
private int privDropUserId;
private int privDropGroupId;
@ -4097,6 +4153,8 @@ void handleCgiRequest(alias fun, CustomCgi = Cgi, long maxContentLength = defaul
}
}
private __gshared int cancelfd = -1;
/+
The event loop for embedded_httpd_threads will prolly fiber dispatch
cgi constructors too, so slow posts will not monopolize a worker thread.
@ -4526,6 +4584,9 @@ void doThreadHttpConnectionGuts(CustomCgi, alias fun, bool alwaysCloseConnection
closeConnection = true;
}
if(globalStopFlag)
closeConnection = true;
if(closeConnection || alwaysCloseConnection) {
connection.shutdown(SocketShutdown.BOTH);
connection.close();
@ -5092,8 +5153,36 @@ class ListeningConnectionManager {
ubyte nextIndexBack;
shared(int) queueLength;
Socket acceptCancelable() {
version(Posix) {
import core.sys.posix.sys.select;
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(listener.handle, &read_fds);
FD_SET(cancelfd, &read_fds);
auto max = listener.handle > cancelfd ? listener.handle : cancelfd;
auto ret = select(max + 1, &read_fds, null, null, null);
if(ret == -1) {
import core.stdc.errno;
if(errno == EINTR)
return null;
else
throw new Exception("wtf select");
}
if(FD_ISSET(cancelfd, &read_fds)) {
return null;
}
if(FD_ISSET(listener.handle, &read_fds))
return listener.accept();
return null;
} else
return listener.accept(); // FIXME: check the cancel flag!
}
void listen() {
running = true;
shared(int) loopBroken;
version(Posix) {
@ -5101,14 +5190,20 @@ class ListeningConnectionManager {
signal(SIGPIPE, SIG_IGN);
}
version(linux) {
if(cancelfd == -1)
cancelfd = eventfd(0, 0);
}
version(cgi_no_threads) {
// NEVER USE THIS
// it exists only for debugging and other special occasions
// the thread mode is faster and less likely to stall the whole
// thing when a request is slow
while(!loopBroken && running) {
auto sn = listener.accept();
while(!loopBroken && !globalStopFlag) {
auto sn = acceptCancelable();
if(sn is null) continue;
cloexec(sn);
try {
handler(sn);
@ -5149,7 +5244,7 @@ class ListeningConnectionManager {
}
while(running) {
while(!globalStopFlag) {
Thread.sleep(1.seconds);
if(fiber_crash_check())
break;
@ -5262,11 +5357,6 @@ class ListeningConnectionManager {
Socket listener;
void delegate(Socket) handler;
bool running;
void quit() {
running = false;
}
}
Socket startListening(string host, ushort port, ref bool tcp, ref void delegate() cleanup, int backQueue) {
@ -5412,6 +5502,8 @@ class ConnectionThread : Thread {
// so if there's a bunch of idle keep-alive connections, it can
// consume all the worker threads... just sitting there.
lcm.semaphore.wait();
if(globalStopFlag)
return;
Socket socket;
synchronized(lcm) {
auto idx = lcm.nextIndexFront;
@ -5575,6 +5667,9 @@ class WorkerThread : Thread {
dg(sn);
} else {
if(cast(size_t) events[idx].data.ptr < 1024) {
throw new Exception("this doesn't look like a fiber pointer...");
}
auto fiber = cast(CgiFiber) events[idx].data.ptr;
fiber.proceed();
}
@ -10901,6 +10996,8 @@ bool apiDispatcher()(Cgi cgi) {
import arsd.dom;
}
+/
version(linux)
private extern(C) int eventfd (uint initval, int flags) nothrow @trusted @nogc;
/*
Copyright: Adam D. Ruppe, 2008 - 2021
License: [http://www.boost.org/LICENSE_1_0.txt|Boost License 1.0].