more runtime config options

This commit is contained in:
Adam D. Ruppe 2022-08-23 12:45:21 -04:00
parent 935e776b65
commit 52ff937a12
1 changed files with 95 additions and 19 deletions

114
cgi.d
View File

@ -402,6 +402,11 @@ version(embedded_httpd_hybrid) {
version=cgi_use_fiber;
}
version(cgi_use_fork)
enum cgi_use_fork_default = true;
else
enum cgi_use_fork_default = false;
// the servers must know about the connections to talk to them; the interfaces are vital
version(with_addon_servers)
version=with_addon_servers_connections;
@ -1861,6 +1866,8 @@ class Cgi {
// FIXME: if size is > max content length it should
// also fail at this point.
_rawDataOutput(cast(ubyte[]) "HTTP/1.1 100 Continue\r\n\r\n");
// FIXME: let the user write out 103 early hints too
}
}
// else
@ -3492,6 +3499,28 @@ struct RequestServer {
///
ushort listeningPort = defaultListeningPort();
/++
Uses a fork() call, if available, to provide additional crash resiliency and possibly improved performance. On the
other hand, if you fork, you must not assume any memory is shared between requests (you shouldn't be anyway though! But
if you have to, you probably want to set this to false and use an explicit threaded server with [serveEmbeddedHttp]) and
[stop] may not work as well.
History:
Added August 12, 2022 (dub v10.9). Previously, this was only configurable through the `-version=cgi_no_fork`
argument to dmd. That version still defines the value of `cgi_use_fork_default`, used to initialize this, for
compatibility.
+/
bool useFork = cgi_use_fork_default;
/++
Determines the number of worker threads to spawn per process, for server modes that use worker threads. 0 will use a
default based on the number of cpus modified by the server mode.
History:
Added August 12, 2022 (dub v10.9)
+/
int numberOfThreads = 0;
///
this(string defaultHost, ushort defaultPort) {
this.listeningHost = defaultHost;
@ -3656,7 +3685,7 @@ struct RequestServer {
__traits(child, _this, fun)(cgi);
else static assert(0, "Not implemented in your compiler version!");
}
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse));
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads);
manager.listen();
}
@ -3665,7 +3694,7 @@ struct RequestServer {
+/
void serveScgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() {
globalStopFlag = false;
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength));
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads);
manager.listen();
}
@ -3681,18 +3710,48 @@ struct RequestServer {
doThreadHttpConnectionGuts!(CustomCgi, fun, true)(new FakeSocketForStdin());
}
/++
The [stop] function sets a flag that request handlers can (and should) check periodically. If a handler doesn't
respond to this flag, the library will force the issue. This determines when and how the issue will be forced.
+/
enum ForceStop {
/++
Stops accepting new requests, but lets ones already in the queue start and complete before exiting.
+/
afterQueuedRequestsComplete,
/++
Finishes requests already started their handlers, but drops any others in the queue. Streaming handlers
should cooperate and exit gracefully, but if they don't, it will continue waiting for them.
+/
afterCurrentRequestsComplete,
/++
Partial response writes will throw an exception, cancelling any streaming response, but complete
writes will continue to process. Request handlers that respect the stop token will also gracefully cancel.
+/
cancelStreamingRequestsEarly,
/++
All writes will throw.
+/
cancelAllRequestsEarly,
/++
Use OS facilities to forcibly kill running threads. The server process will be in an undefined state after this call (if this call ever returns).
+/
forciblyTerminate,
}
version(embedded_httpd_processes) {} else
/++
Stops serving after the current requests.
Stops serving after the current requests are completed.
Bugs:
Not implemented on version=embedded_httpd_processes, version=fastcgi, or on any operating system aside from Linux at this time.
Try SIGINT there perhaps.
Not implemented on version=embedded_httpd_processes, version=fastcgi on any system, or embedded_httpd on Windows (it does work on embedded_httpd_hybrid
on Windows however). Only partially implemented on non-Linux posix systems.
A Windows implementation is planned but not sure about the others. Maybe a posix pipe can be used on other OSes. I do not intend
to implement this for the processes config.
You might also try SIGINT perhaps.
The stopPriority is not yet fully implemented.
+/
static void stop() {
static void stop(ForceStop stopPriority = ForceStop.afterCurrentRequestsComplete) {
globalStopFlag = true;
version(Posix)
@ -5226,6 +5285,17 @@ class ListeningConnectionManager {
return listener.accept(); // FIXME: check the cancel flag!
}
int defaultNumberOfThreads() {
import std.parallelism;
version(cgi_use_fiber) {
return totalCPUs * 1 + 1;
} else {
// I times 4 here because there's a good chance some will be blocked on i/o.
return totalCPUs * 4;
}
}
void listen() {
shared(int) loopBroken;
@ -5257,11 +5327,12 @@ class ListeningConnectionManager {
}
}
} else {
import std.parallelism;
version(cgi_use_fork) {
//asm { int 3; }
fork();
if(useFork) {
version(linux) {
//asm { int 3; }
fork();
}
}
version(cgi_use_fiber) {
@ -5270,7 +5341,7 @@ class ListeningConnectionManager {
listener.accept();
}
WorkerThread[] threads = new WorkerThread[](totalCPUs * 1 + 1);
WorkerThread[] threads = new WorkerThread[](numberOfThreads);
foreach(i, ref thread; threads) {
thread = new WorkerThread(this, handler, cast(int) i);
thread.start();
@ -5297,8 +5368,7 @@ class ListeningConnectionManager {
} else {
semaphore = new Semaphore();
// I times 4 here because there's a good chance some will be blocked on i/o.
ConnectionThread[] threads = new ConnectionThread[](totalCPUs * 4);
ConnectionThread[] threads = new ConnectionThread[](numberOfThreads);
foreach(i, ref thread; threads) {
thread = new ConnectionThread(this, handler, cast(int) i);
thread.start();
@ -5382,17 +5452,20 @@ class ListeningConnectionManager {
private void dg_handler(Socket s) {
fhandler(s);
}
this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null) {
this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
fhandler = handler;
this(host, port, &dg_handler, dropPrivs);
this(host, port, &dg_handler, dropPrivs, useFork, numberOfThreads);
}
this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null) {
this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
this.handler = handler;
this.useFork = useFork;
this.numberOfThreads = numberOfThreads ? numberOfThreads : defaultNumberOfThreads();
listener = startListening(host, port, tcp, cleanup, 128, dropPrivs);
version(cgi_use_fiber) version(cgi_use_fork)
version(cgi_use_fiber)
if(useFork)
listener.blocking = false;
// this is the UI control thread and thus gets more priority
@ -5401,6 +5474,9 @@ class ListeningConnectionManager {
Socket listener;
void delegate(Socket) handler;
immutable bool useFork;
int numberOfThreads;
}
Socket startListening(string host, ushort port, ref bool tcp, ref void delegate() cleanup, int backQueue, void delegate() dropPrivs) {