diff --git a/cgi.d b/cgi.d index b0e4c89..e359185 100644 --- a/cgi.d +++ b/cgi.d @@ -402,6 +402,11 @@ version(embedded_httpd_hybrid) { version=cgi_use_fiber; } +version(cgi_use_fork) + enum cgi_use_fork_default = true; +else + enum cgi_use_fork_default = false; + // the servers must know about the connections to talk to them; the interfaces are vital version(with_addon_servers) version=with_addon_servers_connections; @@ -1861,6 +1866,8 @@ class Cgi { // FIXME: if size is > max content length it should // also fail at this point. _rawDataOutput(cast(ubyte[]) "HTTP/1.1 100 Continue\r\n\r\n"); + + // FIXME: let the user write out 103 early hints too } } // else @@ -3492,6 +3499,28 @@ struct RequestServer { /// ushort listeningPort = defaultListeningPort(); + /++ + Uses a fork() call, if available, to provide additional crash resiliency and possibly improved performance. On the + other hand, if you fork, you must not assume any memory is shared between requests (you shouldn't be anyway though! But + if you have to, you probably want to set this to false and use an explicit threaded server with [serveEmbeddedHttp]) and + [stop] may not work as well. + + History: + Added August 12, 2022 (dub v10.9). Previously, this was only configurable through the `-version=cgi_no_fork` + argument to dmd. That version still defines the value of `cgi_use_fork_default`, used to initialize this, for + compatibility. + +/ + bool useFork = cgi_use_fork_default; + + /++ + Determines the number of worker threads to spawn per process, for server modes that use worker threads. 0 will use a + default based on the number of cpus modified by the server mode. + + History: + Added August 12, 2022 (dub v10.9) + +/ + int numberOfThreads = 0; + /// this(string defaultHost, ushort defaultPort) { this.listeningHost = defaultHost; @@ -3656,7 +3685,7 @@ struct RequestServer { __traits(child, _this, fun)(cgi); else static assert(0, "Not implemented in your compiler version!"); } - auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse)); + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads); manager.listen(); } @@ -3665,7 +3694,7 @@ struct RequestServer { +/ void serveScgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { globalStopFlag = false; - auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads); manager.listen(); } @@ -3681,18 +3710,48 @@ struct RequestServer { doThreadHttpConnectionGuts!(CustomCgi, fun, true)(new FakeSocketForStdin()); } + /++ + The [stop] function sets a flag that request handlers can (and should) check periodically. If a handler doesn't + respond to this flag, the library will force the issue. This determines when and how the issue will be forced. + +/ + enum ForceStop { + /++ + Stops accepting new requests, but lets ones already in the queue start and complete before exiting. + +/ + afterQueuedRequestsComplete, + /++ + Finishes requests already started their handlers, but drops any others in the queue. Streaming handlers + should cooperate and exit gracefully, but if they don't, it will continue waiting for them. + +/ + afterCurrentRequestsComplete, + /++ + Partial response writes will throw an exception, cancelling any streaming response, but complete + writes will continue to process. Request handlers that respect the stop token will also gracefully cancel. + +/ + cancelStreamingRequestsEarly, + /++ + All writes will throw. + +/ + cancelAllRequestsEarly, + /++ + Use OS facilities to forcibly kill running threads. The server process will be in an undefined state after this call (if this call ever returns). + +/ + forciblyTerminate, + } + version(embedded_httpd_processes) {} else /++ - Stops serving after the current requests. + Stops serving after the current requests are completed. Bugs: - Not implemented on version=embedded_httpd_processes, version=fastcgi, or on any operating system aside from Linux at this time. - Try SIGINT there perhaps. + Not implemented on version=embedded_httpd_processes, version=fastcgi on any system, or embedded_httpd on Windows (it does work on embedded_httpd_hybrid + on Windows however). Only partially implemented on non-Linux posix systems. - A Windows implementation is planned but not sure about the others. Maybe a posix pipe can be used on other OSes. I do not intend - to implement this for the processes config. + You might also try SIGINT perhaps. + + The stopPriority is not yet fully implemented. +/ - static void stop() { + static void stop(ForceStop stopPriority = ForceStop.afterCurrentRequestsComplete) { globalStopFlag = true; version(Posix) @@ -5226,6 +5285,17 @@ class ListeningConnectionManager { return listener.accept(); // FIXME: check the cancel flag! } + int defaultNumberOfThreads() { + import std.parallelism; + version(cgi_use_fiber) { + return totalCPUs * 1 + 1; + } else { + // I times 4 here because there's a good chance some will be blocked on i/o. + return totalCPUs * 4; + } + + } + void listen() { shared(int) loopBroken; @@ -5257,11 +5327,12 @@ class ListeningConnectionManager { } } } else { - import std.parallelism; - version(cgi_use_fork) { - //asm { int 3; } - fork(); + if(useFork) { + version(linux) { + //asm { int 3; } + fork(); + } } version(cgi_use_fiber) { @@ -5270,7 +5341,7 @@ class ListeningConnectionManager { listener.accept(); } - WorkerThread[] threads = new WorkerThread[](totalCPUs * 1 + 1); + WorkerThread[] threads = new WorkerThread[](numberOfThreads); foreach(i, ref thread; threads) { thread = new WorkerThread(this, handler, cast(int) i); thread.start(); @@ -5297,8 +5368,7 @@ class ListeningConnectionManager { } else { semaphore = new Semaphore(); - // I times 4 here because there's a good chance some will be blocked on i/o. - ConnectionThread[] threads = new ConnectionThread[](totalCPUs * 4); + ConnectionThread[] threads = new ConnectionThread[](numberOfThreads); foreach(i, ref thread; threads) { thread = new ConnectionThread(this, handler, cast(int) i); thread.start(); @@ -5382,17 +5452,20 @@ class ListeningConnectionManager { private void dg_handler(Socket s) { fhandler(s); } - this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null) { + this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { fhandler = handler; - this(host, port, &dg_handler, dropPrivs); + this(host, port, &dg_handler, dropPrivs, useFork, numberOfThreads); } - this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null) { + this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { this.handler = handler; + this.useFork = useFork; + this.numberOfThreads = numberOfThreads ? numberOfThreads : defaultNumberOfThreads(); listener = startListening(host, port, tcp, cleanup, 128, dropPrivs); - version(cgi_use_fiber) version(cgi_use_fork) + version(cgi_use_fiber) + if(useFork) listener.blocking = false; // this is the UI control thread and thus gets more priority @@ -5401,6 +5474,9 @@ class ListeningConnectionManager { Socket listener; void delegate(Socket) handler; + + immutable bool useFork; + int numberOfThreads; } Socket startListening(string host, ushort port, ref bool tcp, ref void delegate() cleanup, int backQueue, void delegate() dropPrivs) {