From 77867da28476b1b746a667c8032d660040a4f9c9 Mon Sep 17 00:00:00 2001 From: "Adam D. Ruppe" Date: Sun, 7 Oct 2018 21:21:38 -0400 Subject: [PATCH] rewrite the thread system (finally!) --- cgi.d | 170 ++++++++++++++++++++++++---------------------------------- 1 file changed, 70 insertions(+), 100 deletions(-) diff --git a/cgi.d b/cgi.d index 38f776a..0b4dbbb 100644 --- a/cgi.d +++ b/cgi.d @@ -7,6 +7,8 @@ // FIXME: to do: add openssl optionally // make sure embedded_httpd doesn't send two answers if one writes() then dies +// future direction: websocket as a separate process that you can sendfile to for an async passoff of those long-lived connections + /++ Provides a uniform server-side API for CGI, FastCGI, SCGI, and HTTP web applications. @@ -3100,6 +3102,7 @@ void doThreadScgiConnection(CustomCgi, alias fun, long maxContentLength)(Socket try { fun(cgi); cgi.close(); + connection.close(); } catch(Throwable t) { // no std err if(!handleException(cgi, t)) { @@ -3401,6 +3404,9 @@ class ConnectionThread2 : Thread { void function(Socket) handler; } +import core.sync.semaphore; +import core.atomic; + /** To use this thing: @@ -3415,38 +3421,57 @@ class ConnectionThread2 : Thread { FIXME: should I offer an event based async thing like netman did too? Yeah, probably. */ class ListeningConnectionManager { + Semaphore semaphore; + Socket[256] queue; + shared(ubyte) nextIndexFront; + ubyte nextIndexBack; + shared(int) queueLength; + void listen() { - version(cgi_multiple_connections_per_thread) { - import std.concurrency; - import std.random; - ConnectionThread2[16] pool; - foreach(ref p; pool) { - p = new ConnectionThread2(handler); - p.start(); - } + running = true; + shared(int) loopBroken; - while(true) { - auto connection = listener.accept(); + version(cgi_no_threads) { + // NEVER USE THIS + // it exists only for debugging and other special occasions - bool handled = false; - retry: - foreach(p; pool) - if(p.available) { - handled = true; - send(p.tid, cast(size_t) cast(void*) connection); - break; - } - - // none available right now, make it wait a bit then try again - if(!handled) { - Thread.sleep(dur!"msecs"(25)); - goto retry; + // the thread mode is faster and less likely to stall the whole + // thing when a request is slow + while(!loopBroken && running) { + auto sn = listener.accept(); + try { + handler(sn); + } catch(Exception e) { + // if a connection goes wrong, we want to just say no, but try to carry on unless it is an Error of some sort (in which case, we'll die. You might want an external helper program to revive the server when it dies) + sn.close(); } } } else { - foreach(connection; this) - handler(connection); - + semaphore = new Semaphore(); + + ConnectionThread[16] threads; + foreach(ref thread; threads) { + thread = new ConnectionThread(this, handler); + thread.start(); + } + + while(!loopBroken && running) { + auto sn = listener.accept(); + while(queueLength >= queue.length) + Thread.sleep(1.msecs); + synchronized(this) { + queue[nextIndexBack] = sn; + nextIndexBack++; + atomicOp!"+="(queueLength, 1); + } + semaphore.notify(); + + foreach(thread; threads) { + if(!thread.isRunning) { + thread.join(); + } + } + } } } @@ -3465,50 +3490,6 @@ class ListeningConnectionManager { void quit() { running = false; } - - int opApply(scope CMT dg) { - running = true; - shared(int) loopBroken; - - while(!loopBroken && running) { - auto sn = listener.accept(); - try { - version(cgi_no_threads) { - // NEVER USE THIS - // it exists only for debugging and other special occasions - - // the thread mode is faster and less likely to stall the whole - // thing when a request is slow - dg(sn); - } else { - /* - version(cgi_multiple_connections_per_thread) { - bool foundOne = false; - tryAgain: - foreach(t; pool) - if(t.s is null) { - t.s = sn; - foundOne = true; - break; - } - Thread.sleep(dur!"msecs"(1)); - if(!foundOne) - goto tryAgain; - } else { - */ - auto thread = new ConnectionThread(sn, &loopBroken, dg); - thread.start(); - //} - } - // loopBroken = dg(sn); - } catch(Exception e) { - // if a connection goes wrong, we want to just say no, but try to carry on unless it is an Error of some sort (in which case, we'll die. You might want an external helper program to revive the server when it dies) - sn.close(); - } - } - - return loopBroken; - } } // helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something. @@ -3532,46 +3513,35 @@ class ConnectionException : Exception { } } -alias int delegate(Socket) CMT; +alias void function(Socket) CMT; import core.thread; class ConnectionThread : Thread { - this(Socket s, shared(int)* breakSignifier, CMT dg) { - this.s = s; - this.breakSignifier = breakSignifier; + this(ListeningConnectionManager lcm, CMT dg) { + this.lcm = lcm; this.dg = dg; - super(&runAll); - } - - void runAll() { - if(s !is null) - run(); - /* - version(cgi_multiple_connections_per_thread) { - while(1) { - while(s is null) - sleep(dur!"msecs"(1)); - run(); - } - } - */ + super(&run); } void run() { - scope(exit) { - // I don't want to double close it, and it does this on close() according to source - // might be fragile, but meh - if(s.handle() != socket_t.init) - s.close(); - s = null; // so we know this thread is clear - } - if(auto result = dg(s)) { - *breakSignifier = result; + while(true) { + lcm.semaphore.wait(); + Socket socket; + synchronized(lcm) { + auto idx = lcm.nextIndexFront; + socket = lcm.queue[idx]; + lcm.queue[idx] = null; + atomicOp!"+="(lcm.nextIndexFront, 1); + atomicOp!"-="(lcm.queueLength, 1); + } + try + dg(socket); + catch(Exception e) + socket.close(); } } - Socket s; - shared(int)* breakSignifier; + ListeningConnectionManager lcm; CMT dg; }