rewrite the thread system (finally!)

This commit is contained in:
Adam D. Ruppe 2018-10-07 21:21:38 -04:00
parent 6508cbad3a
commit 77867da284
1 changed files with 70 additions and 100 deletions

170
cgi.d
View File

@ -7,6 +7,8 @@
// FIXME: to do: add openssl optionally
// make sure embedded_httpd doesn't send two answers if one writes() then dies
// future direction: websocket as a separate process that you can sendfile to for an async passoff of those long-lived connections
/++
Provides a uniform server-side API for CGI, FastCGI, SCGI, and HTTP web applications.
@ -3100,6 +3102,7 @@ void doThreadScgiConnection(CustomCgi, alias fun, long maxContentLength)(Socket
try {
fun(cgi);
cgi.close();
connection.close();
} catch(Throwable t) {
// no std err
if(!handleException(cgi, t)) {
@ -3401,6 +3404,9 @@ class ConnectionThread2 : Thread {
void function(Socket) handler;
}
import core.sync.semaphore;
import core.atomic;
/**
To use this thing:
@ -3415,38 +3421,57 @@ class ConnectionThread2 : Thread {
FIXME: should I offer an event based async thing like netman did too? Yeah, probably.
*/
class ListeningConnectionManager {
Semaphore semaphore;
Socket[256] queue;
shared(ubyte) nextIndexFront;
ubyte nextIndexBack;
shared(int) queueLength;
void listen() {
version(cgi_multiple_connections_per_thread) {
import std.concurrency;
import std.random;
ConnectionThread2[16] pool;
foreach(ref p; pool) {
p = new ConnectionThread2(handler);
p.start();
}
running = true;
shared(int) loopBroken;
while(true) {
auto connection = listener.accept();
version(cgi_no_threads) {
// NEVER USE THIS
// it exists only for debugging and other special occasions
bool handled = false;
retry:
foreach(p; pool)
if(p.available) {
handled = true;
send(p.tid, cast(size_t) cast(void*) connection);
break;
}
// none available right now, make it wait a bit then try again
if(!handled) {
Thread.sleep(dur!"msecs"(25));
goto retry;
// the thread mode is faster and less likely to stall the whole
// thing when a request is slow
while(!loopBroken && running) {
auto sn = listener.accept();
try {
handler(sn);
} catch(Exception e) {
// if a connection goes wrong, we want to just say no, but try to carry on unless it is an Error of some sort (in which case, we'll die. You might want an external helper program to revive the server when it dies)
sn.close();
}
}
} else {
foreach(connection; this)
handler(connection);
semaphore = new Semaphore();
ConnectionThread[16] threads;
foreach(ref thread; threads) {
thread = new ConnectionThread(this, handler);
thread.start();
}
while(!loopBroken && running) {
auto sn = listener.accept();
while(queueLength >= queue.length)
Thread.sleep(1.msecs);
synchronized(this) {
queue[nextIndexBack] = sn;
nextIndexBack++;
atomicOp!"+="(queueLength, 1);
}
semaphore.notify();
foreach(thread; threads) {
if(!thread.isRunning) {
thread.join();
}
}
}
}
}
@ -3465,50 +3490,6 @@ class ListeningConnectionManager {
void quit() {
running = false;
}
int opApply(scope CMT dg) {
running = true;
shared(int) loopBroken;
while(!loopBroken && running) {
auto sn = listener.accept();
try {
version(cgi_no_threads) {
// NEVER USE THIS
// it exists only for debugging and other special occasions
// the thread mode is faster and less likely to stall the whole
// thing when a request is slow
dg(sn);
} else {
/*
version(cgi_multiple_connections_per_thread) {
bool foundOne = false;
tryAgain:
foreach(t; pool)
if(t.s is null) {
t.s = sn;
foundOne = true;
break;
}
Thread.sleep(dur!"msecs"(1));
if(!foundOne)
goto tryAgain;
} else {
*/
auto thread = new ConnectionThread(sn, &loopBroken, dg);
thread.start();
//}
}
// loopBroken = dg(sn);
} catch(Exception e) {
// if a connection goes wrong, we want to just say no, but try to carry on unless it is an Error of some sort (in which case, we'll die. You might want an external helper program to revive the server when it dies)
sn.close();
}
}
return loopBroken;
}
}
// helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something.
@ -3532,46 +3513,35 @@ class ConnectionException : Exception {
}
}
alias int delegate(Socket) CMT;
alias void function(Socket) CMT;
import core.thread;
class ConnectionThread : Thread {
this(Socket s, shared(int)* breakSignifier, CMT dg) {
this.s = s;
this.breakSignifier = breakSignifier;
this(ListeningConnectionManager lcm, CMT dg) {
this.lcm = lcm;
this.dg = dg;
super(&runAll);
}
void runAll() {
if(s !is null)
run();
/*
version(cgi_multiple_connections_per_thread) {
while(1) {
while(s is null)
sleep(dur!"msecs"(1));
run();
}
}
*/
super(&run);
}
void run() {
scope(exit) {
// I don't want to double close it, and it does this on close() according to source
// might be fragile, but meh
if(s.handle() != socket_t.init)
s.close();
s = null; // so we know this thread is clear
}
if(auto result = dg(s)) {
*breakSignifier = result;
while(true) {
lcm.semaphore.wait();
Socket socket;
synchronized(lcm) {
auto idx = lcm.nextIndexFront;
socket = lcm.queue[idx];
lcm.queue[idx] = null;
atomicOp!"+="(lcm.nextIndexFront, 1);
atomicOp!"-="(lcm.queueLength, 1);
}
try
dg(socket);
catch(Exception e)
socket.close();
}
}
Socket s;
shared(int)* breakSignifier;
ListeningConnectionManager lcm;
CMT dg;
}