multiple listen specs

This commit is contained in:
Adam D. Ruppe 2023-07-31 19:23:01 -04:00
parent 6b96fb87d6
commit e91e04be32
1 changed files with 216 additions and 45 deletions

261
cgi.d
View File

@ -3763,13 +3763,51 @@ bool trySimulatedRequest(alias fun, CustomCgi = Cgi)(string[] args) if(is(Custom
History:
Added Sept 26, 2020 (release version 8.5).
The `listen` member was added July 31, 2023.
+/
struct RequestServer {
///
/++
Sets the host and port the server will listen on. This is semi-deprecated; the new (as of July 31, 2023) [listenSpec] parameter obsoletes these. You cannot use both together; the listeningHost and listeningPort are ONLY used if listenSpec is null.
+/
string listeningHost = defaultListeningHost();
///
/// ditto
ushort listeningPort = defaultListeningPort();
/++
The array of addresses you want to listen on. The format looks like a url but has a few differences.
`http://localhost:8080`
`http://unix:filename/here`
`scgi://abstract:/name/here`
`http://[::1]:4444`
Note that IPv6 addresses must be enclosed in brackets. If you want to listen on an interface called `unix` or `abstract`, contact me, that is not supported but I could add some kind of escape mechanism.
If you leave off the protocol, it assumes the default based on compile flags. If you only give a number, it is assumed to be a port on any tcp interface.
`localhost:8080` serves the default protocol.
`8080` or `:8080` assumes default protocol on localhost.
The protocols can be `http:`, `https:`, `scgi:`, and (if compiled in), `fcgi:`. Original `cgi` is not supported with this, since it is transactional with a single process.
Valid hosts are an IPv4 address (with a mandatory port), an IPv6 address (with a mandatory port), just a port alone, `unix:/path/to/unix/socket` (which may be a relative path without a leading slash), or `abstract:/path/to/linux/abstract/namespace`.
`http://unix:foo` will serve http over the unix domain socket named `foo` in the current working directory.
$(PITFALL
If you set this to anything non-null (including a non-null, zero-length array) any `listenSpec` entries, [listeningHost] and [listeningPort] are ignored.
)
History:
Added July 31, 2023 (dub v11.0)
+/
string[] listenSpec;
/++
Uses a fork() call, if available, to provide additional crash resiliency and possibly improved performance. On the
other hand, if you fork, you must not assume any memory is shared between requests (you shouldn't be anyway though! But
@ -3792,13 +3830,23 @@ struct RequestServer {
+/
int numberOfThreads = 0;
///
/++
Creates a server configured to listen to multiple URLs.
History:
Added July 31, 2023 (dub v11.0)
+/
this(string[] listenTo) {
this.listenSpec = listenTo;
}
/// Creates a server object configured to listen on a single host and port.
this(string defaultHost, ushort defaultPort) {
this.listeningHost = defaultHost;
this.listeningPort = defaultPort;
}
///
/// ditto
this(ushort defaultPort) {
listeningPort = defaultPort;
}
@ -3806,29 +3854,45 @@ struct RequestServer {
/++
Reads the command line arguments into the values here.
Possible arguments are `--listening-host`, `--listening-port` (or `--port`), `--uid`, and `--gid`.
Possible arguments are `--listen` (can appear multiple times), `--listening-host`, `--listening-port` (or `--port`), `--uid`, and `--gid`.
Please note you cannot combine `--listen` with `--listening-host` or `--listening-port` / `--port`. Use one or the other style.
+/
void configureFromCommandLine(string[] args) {
bool portOrHostFound = false;
bool foundPort = false;
bool foundHost = false;
bool foundUid = false;
bool foundGid = false;
bool foundListen = false;
foreach(arg; args) {
if(foundPort) {
listeningPort = to!ushort(arg);
portOrHostFound = true;
foundPort = false;
continue;
}
if(foundHost) {
listeningHost = arg;
portOrHostFound = true;
foundHost = false;
continue;
}
if(foundUid) {
privilegesDropToUid = to!uid_t(arg);
foundUid = false;
continue;
}
if(foundGid) {
privilegesDropToGid = to!gid_t(arg);
foundGid = false;
continue;
}
if(foundListen) {
this.listenSpec ~= arg;
foundListen = false;
continue;
}
if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host")
foundHost = true;
@ -3838,6 +3902,12 @@ struct RequestServer {
foundUid = true;
else if(arg == "--gid")
foundGid = true;
else if(arg == "--listen")
foundListen = true;
}
if(portOrHostFound && listenSpec.length) {
throw new Exception("You passed both a --listening-host or --listening-port and a --listen argument. You should fix your script to ONLY use --listen arguments.");
}
}
@ -3956,7 +4026,9 @@ struct RequestServer {
__traits(child, _this, fun)(cgi);
else static assert(0, "Not implemented in your compiler version!");
}
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads);
auto manager = this.listenSpec is null ?
new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads) :
new ListeningConnectionManager(this.listenSpec, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads);
manager.listen();
}
@ -3965,7 +4037,9 @@ struct RequestServer {
+/
void serveScgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() {
globalStopFlag = false;
auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads);
auto manager = this.listenSpec is null ?
new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads) :
new ListeningConnectionManager(this.listenSpec, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads);
manager.listen();
}
@ -5544,10 +5618,17 @@ class ListeningConnectionManager {
import core.sys.posix.sys.select;
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(listener.handle, &read_fds);
if(cancelfd != -1)
int max = 0;
foreach(listener; listeners) {
FD_SET(listener.handle, &read_fds);
if(listener.handle > max)
max = listener.handle;
}
if(cancelfd != -1) {
FD_SET(cancelfd, &read_fds);
auto max = listener.handle > cancelfd ? listener.handle : cancelfd;
if(cancelfd > max)
max = cancelfd;
}
auto ret = select(max + 1, &read_fds, null, null, null);
if(ret == -1) {
import core.stdc.errno;
@ -5561,24 +5642,27 @@ class ListeningConnectionManager {
return null;
}
if(FD_ISSET(listener.handle, &read_fds))
return listener.accept();
foreach(listener; listeners) {
if(FD_ISSET(listener.handle, &read_fds))
return listener.accept();
}
return null;
} else {
Socket socket = listener;
auto check = new SocketSet();
keep_looping:
check.reset();
check.add(socket);
foreach(listener; listeners)
check.add(listener);
// just to check the stop flag on a kinda busy loop. i hate this FIXME
auto got = Socket.select(check, null, null, 3.seconds);
if(got > 0)
return listener.accept();
foreach(listener; listeners)
if(check.isSet(listener))
return listener.accept();
if(globalStopFlag)
return null;
else
@ -5639,7 +5723,9 @@ class ListeningConnectionManager {
version(cgi_use_fiber) {
version(Windows) {
listener.accept();
// please note these are overlapped sockets! so the accept just kicks things off
foreach(listener; listeners)
listener.accept();
}
WorkerThread[] threads = new WorkerThread[](numberOfThreads);
@ -5753,27 +5839,105 @@ class ListeningConnectionManager {
private void dg_handler(Socket s) {
fhandler(s);
}
this(string[] listenSpec, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
fhandler = handler;
this(listenSpec, &dg_handler, dropPrivs, useFork, numberOfThreads);
}
this(string[] listenSpec, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
string[] host;
ushort[] port;
foreach(spec; listenSpec) {
/+
The format:
protocol://
address_spec
Protocol is optional. Must be http, https, scgi, or fastcgi.
address_spec is either:
ipv4 address : port
[ipv6 address] : port
unix:filename
abstract:name
port <which is tcp but on any interface>
+/
string protocol;
string address_spec;
auto protocolIdx = spec.indexOf("://");
if(protocolIdx != -1) {
protocol = spec[0 .. protocolIdx];
address_spec = spec[protocolIdx + "://".length .. $];
} else {
address_spec = spec;
}
if(address_spec.startsWith("unix:") || address_spec.startsWith("abstract:")) {
host ~= address_spec;
port ~= 0;
} else {
auto idx = address_spec.lastIndexOf(":");
if(idx == -1) {
host ~= null;
} else {
auto as = address_spec[0 .. idx];
if(as.length >= 3 && as[0] == '[' && as[$-1] == ']')
as = as[1 .. $-1];
host ~= as;
}
port ~= address_spec[idx + 1 .. $].to!ushort;
}
}
this(host, port, handler, dropPrivs, useFork, numberOfThreads);
}
this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
this([host], [port], handler, dropPrivs, useFork, numberOfThreads);
}
this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
this([host], [port], handler, dropPrivs, useFork, numberOfThreads);
}
this(string[] host, ushort[] port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
fhandler = handler;
this(host, port, &dg_handler, dropPrivs, useFork, numberOfThreads);
}
this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
this(string[] host, ushort[] port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) {
assert(host.length == port.length);
this.handler = handler;
this.useFork = useFork;
this.numberOfThreads = numberOfThreads ? numberOfThreads : defaultNumberOfThreads();
listener = startListening(host, port, tcp, cleanup, 128, dropPrivs);
listeners.reserve(host.length);
foreach(i; 0 .. host.length)
if(host[i] == "localhost") {
listeners ~= startListening("127.0.0.1", port[i], tcp, cleanup, 128, dropPrivs);
listeners ~= startListening("::1", port[i], tcp, cleanup, 128, dropPrivs);
} else {
listeners ~= startListening(host[i], port[i], tcp, cleanup, 128, dropPrivs);
}
version(cgi_use_fiber)
if(useFork)
listener.blocking = false;
if(useFork) {
foreach(listener; listeners)
listener.blocking = false;
}
// this is the UI control thread and thus gets more priority
Thread.getThis.priority = Thread.PRIORITY_MAX;
}
Socket listener;
Socket[] listeners;
void delegate(Socket) handler;
immutable bool useFork;
@ -6048,11 +6212,13 @@ class WorkerThread : Thread {
epoll_ctl(epfd, EPOLL_CTL_ADD, cancelfd, &ev);
}
foreach(listener; lcm.listeners) {
epoll_event ev;
ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough.
ev.data.fd = lcm.listener.handle;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, lcm.listener.handle, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough.
ev.data.fd = listener.handle;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener.handle, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
}
@ -6067,35 +6233,40 @@ class WorkerThread : Thread {
throw new Exception("epoll_wait " ~ to!string(errno));
}
foreach(idx; 0 .. nfds) {
outer: foreach(idx; 0 .. nfds) {
auto flags = events[idx].events;
if(cast(size_t) events[idx].data.ptr == cast(size_t) cancelfd) {
globalStopFlag = true;
//import std.stdio; writeln("exit heard");
break;
} else if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) {
//import std.stdio; writeln(myThreadNumber, " woken up ", flags);
// this try/catch is because it is set to non-blocking mode
// and Phobos' stupid api throws an exception instead of returning
// if it would block. Why would it block? because a forked process
// might have beat us to it, but the wakeup event thundered our herds.
try
sn = lcm.listener.accept(); // don't need to do the acceptCancelable here since the epoll checks it better
catch(SocketAcceptException e) { continue; }
} else {
foreach(listener; lcm.listeners) {
if(cast(size_t) events[idx].data.ptr == cast(size_t) listener.handle) {
//import std.stdio; writeln(myThreadNumber, " woken up ", flags);
// this try/catch is because it is set to non-blocking mode
// and Phobos' stupid api throws an exception instead of returning
// if it would block. Why would it block? because a forked process
// might have beat us to it, but the wakeup event thundered our herds.
try
sn = listener.accept(); // don't need to do the acceptCancelable here since the epoll checks it better
catch(SocketAcceptException e) { continue; }
cloexec(sn);
if(lcm.tcp) {
// disable Nagle's algorithm to avoid a 40ms delay when we send/recv
// on the socket because we do some buffering internally. I think this helps,
// certainly does for small requests, and I think it does for larger ones too
sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
cloexec(sn);
if(lcm.tcp) {
// disable Nagle's algorithm to avoid a 40ms delay when we send/recv
// on the socket because we do some buffering internally. I think this helps,
// certainly does for small requests, and I think it does for larger ones too
sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10));
sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10));
}
dg(sn);
continue outer;
}
}
dg(sn);
} else {
if(cast(size_t) events[idx].data.ptr < 1024) {
throw new Exception("this doesn't look like a fiber pointer...");
}