diff --git a/cgi.d b/cgi.d index 583dcbc..87f3833 100644 --- a/cgi.d +++ b/cgi.d @@ -1963,11 +1963,17 @@ class Cgi { string getCurrentCompleteUri() const { ushort defaultPort = https ? 443 : 80; - return format("http%s://%s%s%s", - https ? "s" : "", - host, - (!port || port == defaultPort) ? "" : ":" ~ to!string(port), - requestUri); + string uri = "http"; + if(https) + uri ~= "s"; + uri ~= "://"; + uri ~= host; + if(!(!port || port == defaultPort)) { + uri ~= ":"; + uri ~= to!string(port); + } + uri ~= requestUri; + return uri; } /// You can override this if your site base url isn't the same as the script name @@ -2116,110 +2122,114 @@ class Cgi { private bool websocketMode; void flushHeaders(const(void)[] t, bool isAll = false) { - string[] hd; - // Flush the headers + StackBuffer buffer = StackBuffer(0); + + prepHeaders(t, isAll, &buffer); + + if(rawDataOutput !is null) + rawDataOutput(cast(const(ubyte)[]) buffer.get()); + else { + stdout.rawWrite(buffer.get()); + } + } + + private void prepHeaders(const(void)[] t, bool isAll, StackBuffer* buffer) { + string terminator = "\n"; + if(rawDataOutput !is null) + terminator = "\r\n"; + if(responseStatus !is null) { if(nph) { if(http10) - hd ~= "HTTP/1.0 " ~ responseStatus; + buffer.add("HTTP/1.0 ", responseStatus, terminator); else - hd ~= "HTTP/1.1 " ~ responseStatus; + buffer.add("HTTP/1.1 ", responseStatus, terminator); } else - hd ~= "Status: " ~ responseStatus; + buffer.add("Status: ", responseStatus, terminator); } else if (nph) { if(http10) - hd ~= "HTTP/1.0 200 OK"; + buffer.add("HTTP/1.0 200 OK", terminator); else - hd ~= "HTTP/1.1 200 OK"; + buffer.add("HTTP/1.1 200 OK", terminator); } if(websocketMode) goto websocket; if(nph) { // we're responsible for setting the date too according to http 1.1 - hd ~= "Date: " ~ printDate(cast(DateTime) Clock.currTime(UTC())); + char[29] db = void; + printDateToBuffer(cast(DateTime) Clock.currTime(UTC()), db[]); + buffer.add("Date: ", db[], terminator); } // FIXME: what if the user wants to set his own content-length? // The custom header function can do it, so maybe that's best. // Or we could reuse the isAll param. if(responseLocation !is null) { - hd ~= "Location: " ~ responseLocation; + buffer.add("Location: ", responseLocation, terminator); } if(!noCache && responseExpires != long.min) { // an explicit expiration date is set if(responseExpiresRelative) { - hd ~= "Cache-Control: "~(responseIsPublic ? "public" : "private")~", max-age="~to!string(responseExpires)~", no-cache=\"set-cookie, set-cookie2\""; + buffer.add("Cache-Control: ", responseIsPublic ? "public" : "private", ", max-age="); + buffer.add(responseExpires); + buffer.add(", no-cache=\"set-cookie, set-cookie2\"", terminator); } else { auto expires = SysTime(unixTimeToStdTime(cast(int)(responseExpires / 1000)), UTC()); - hd ~= "Expires: " ~ printDate( - cast(DateTime) expires); + char[29] db = void; + printDateToBuffer(cast(DateTime) expires, db[]); + buffer.add("Expires: ", db[], terminator); // FIXME: assuming everything is private unless you use nocache - generally right for dynamic pages, but not necessarily - hd ~= "Cache-Control: "~(responseIsPublic ? "public" : "private")~", no-cache=\"set-cookie, set-cookie2\""; + buffer.add("Cache-Control: ", (responseIsPublic ? "public" : "private"), ", no-cache=\"set-cookie, set-cookie2\""); + buffer.add(terminator); } } if(responseCookies !is null && responseCookies.length > 0) { foreach(c; responseCookies) - hd ~= "Set-Cookie: " ~ c; + buffer.add("Set-Cookie: ", c, terminator); } if(noCache) { // we specifically do not want caching (this is actually the default) - hd ~= "Cache-Control: private, no-cache=\"set-cookie\""; - hd ~= "Expires: 0"; - hd ~= "Pragma: no-cache"; + buffer.add("Cache-Control: private, no-cache=\"set-cookie\"", terminator); + buffer.add("Expires: 0", terminator); + buffer.add("Pragma: no-cache", terminator); } else { if(responseExpires == long.min) { // caching was enabled, but without a date set - that means assume cache forever - hd ~= "Cache-Control: public"; - hd ~= "Expires: Tue, 31 Dec 2030 14:00:00 GMT"; // FIXME: should not be more than one year in the future + buffer.add("Cache-Control: public", terminator); + buffer.add("Expires: Tue, 31 Dec 2030 14:00:00 GMT", terminator); // FIXME: should not be more than one year in the future } } if(responseContentType !is null) { - hd ~= "Content-Type: " ~ responseContentType; + buffer.add("Content-Type: ", responseContentType, terminator); } else - hd ~= "Content-Type: text/html; charset=utf-8"; + buffer.add("Content-Type: text/html; charset=utf-8", terminator); if(gzipResponse && acceptsGzip && isAll) { // FIXME: isAll really shouldn't be necessary - hd ~= "Content-Encoding: gzip"; + buffer.add("Content-Encoding: gzip", terminator); } if(!isAll) { if(nph && !http10) { - hd ~= "Transfer-Encoding: chunked"; + buffer.add("Transfer-Encoding: chunked", terminator); responseChunked = true; } } else { - hd ~= "Content-Length: " ~ to!string(t.length); + buffer.add("Content-Length: "); + buffer.add(t.length); + buffer.add(terminator); if(nph && keepAliveRequested) { - hd ~= "Connection: Keep-Alive"; + buffer.add("Connection: Keep-Alive", terminator); } } websocket: - if(customHeaders !is null) - hd ~= customHeaders; + + foreach(hd; customHeaders) + buffer.add(hd, terminator); // FIXME: what about duplicated headers? - foreach(h; hd) { - if(rawDataOutput !is null) - rawDataOutput(cast(const(ubyte)[]) (h ~ "\r\n")); - else { - version(CRuntime_Musl) { - stdout.rawWrite(h); - stdout.rawWrite("\n"); - } else { - writeln(h); - } - } - } - if(rawDataOutput !is null) - rawDataOutput(cast(const(ubyte)[]) ("\r\n")); - else { - version(CRuntime_Musl) { - stdout.rawWrite("\n"); - } else { - writeln(""); - } - } + // end of header indicator + buffer.add(terminator); outputtedResponseData = true; } @@ -2228,6 +2238,8 @@ class Cgi { void write(const(void)[] t, bool isAll = false, bool maybeAutoClose = true) { assert(!closed, "Output has already been closed"); + StackBuffer buffer = StackBuffer(0); + if(gzipResponse && acceptsGzip && isAll) { // FIXME: isAll really shouldn't be necessary // actually gzip the data here @@ -2242,11 +2254,11 @@ class Cgi { } if(!outputtedResponseData && (!autoBuffer || isAll)) { - flushHeaders(t, isAll); + prepHeaders(t, isAll, &buffer); } if(requestMethod != RequestMethod.HEAD && t.length > 0) { - if (autoBuffer) { + if (autoBuffer && !isAll) { outputBuffer ~= cast(ubyte[]) t; } if(!autoBuffer || isAll) { @@ -2255,20 +2267,22 @@ class Cgi { //rawDataOutput(makeChunk(cast(const(ubyte)[]) t)); // we're making the chunk here instead of in a function // to avoid unneeded gc pressure - rawDataOutput(cast(const(ubyte)[]) toHex(t.length)); - rawDataOutput(cast(const(ubyte)[]) "\r\n"); - rawDataOutput(cast(const(ubyte)[]) t); - rawDataOutput(cast(const(ubyte)[]) "\r\n"); - - + buffer.add(toHex(t.length)); + buffer.add("\r\n"); + buffer.add(cast(char[]) t, "\r\n"); } else { - rawDataOutput(cast(const(ubyte)[]) t); + buffer.add(cast(char[]) t); } else - stdout.rawWrite(t); + buffer.add(cast(char[]) t); } } + if(rawDataOutput !is null) + rawDataOutput(cast(const(ubyte)[]) buffer.get()); + else + stdout.rawWrite(buffer.get()); + if(maybeAutoClose && isAll) close(); // if you say it is all, that means we're definitely done // maybeAutoClose can be false though to avoid this (important if you call from inside close()! @@ -3205,7 +3219,8 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau version(embedded_httpd_processes) int processPoolSize = 8; -void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(string[] args) if(is(CustomCgi : Cgi)) { +// Returns true if run. You should exit the program after that. +bool tryAddonServers(string[] args) { if(args.length > 1) { // run the special separate processes if needed switch(args[1]) { @@ -3214,39 +3229,43 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC runWebsocketServer(); else printf("Add-on servers not compiled in.\n"); - return; + return true; case "--session-server": version(with_addon_servers) runSessionServer(); else printf("Add-on servers not compiled in.\n"); - return; + return true; case "--event-server": version(with_addon_servers) runEventServer(); else printf("Add-on servers not compiled in.\n"); - return; + return true; case "--timer-server": version(with_addon_servers) runTimerServer(); else printf("Add-on servers not compiled in.\n"); - return; + return true; case "--timed-jobs": import core.demangle; version(with_addon_servers_connections) foreach(k, v; scheduledJobHandlers) writeln(k, "\t", demangle(k)); - return; + return true; case "--timed-job": scheduledJobHandlers[args[2]](args[3 .. $]); - return; + return true; default: // intentionally blank - do nothing and carry on to run normally } } + return false; +} +/// Tries to simulate a request from the command line. Returns true if it does, false if it didn't find the args. +bool trySimulatedRequest(alias fun, CustomCgi = Cgi)(string[] args) if(is(CustomCgi : Cgi)) { // we support command line thing for easy testing everywhere // it needs to be called ./app method uri [other args...] if(args.length >= 3 && isCgiRequestMethod(args[1])) { @@ -3254,424 +3273,600 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC scope(exit) cgi.dispose(); fun(cgi); cgi.close(); - return; + return true; + } + return false; +} + +/++ + A server control and configuration struct, as a potential alternative to calling [GenericMain] or [cgiMainImpl]. See the source of [cgiMainImpl] to an example of how you can use it. + + History: + Added Sept 26, 2020 (release version 8.5). ++/ +struct RequestServer { + /// + string listeningHost; + /// + ushort listeningPort = defaultListeningPort(); + + /// + this(ushort defaultPort) { + listeningPort = defaultPort; } - - ushort listeningPort(ushort def) { - bool found = false; + /// Reads the args into the other values. + void configureFromCommandLine(string[] args) { + bool foundPort = false; + bool foundHost = false; foreach(arg; args) { - if(found) - return to!ushort(arg); - if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") - found = true; - } - return def; - } - - string listeningHost() { - bool found = false; - foreach(arg; args) { - if(found) - return arg; + if(foundPort) { + listeningPort = to!ushort(arg); + foundPort = false; + } + if(foundHost) { + listeningHost = arg; + foundHost = false; + } if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host") - found = true; + foundHost = true; + else if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") + foundPort = true; } - return ""; } - version(netman_httpd) { - import arsd.httpd; - // what about forwarding the other constructor args? - // this probably needs a whole redoing... - serveHttp!CustomCgi(&fun, listeningPort(8080));//5005); - return; - } else - version(embedded_httpd_processes) { - import core.sys.posix.unistd; - import core.sys.posix.sys.socket; - import core.sys.posix.netinet.in_; - //import std.c.linux.socket; - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1) - throw new Exception("socket"); + /++ + Starts serving requests according to the current configuration. + +/ + void serve(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { + version(netman_httpd) { + // Obsolete! - cloexec(sock); + import arsd.httpd; + // what about forwarding the other constructor args? + // this probably needs a whole redoing... + serveHttp!CustomCgi(&fun, listeningPort);//5005); + return; + } else + version(embedded_httpd_processes) { + serveEmbeddedHttpdProcesses!(fun, CustomCgi)(this); + } else + version(embedded_httpd_threads) { + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, fun)); + manager.listen(); + } else + version(scgi) { + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); + manager.listen(); + } else + version(fastcgi) { + serveFastCgi!(fun, CustomCgi, maxContentLength)(this); + } else { + //version=plain_cgi; + handleCgiRequest!(fun, CustomCgi, maxContentLength)(); + } + } + + void stop() { + + } +} + +version(embedded_httpd_processes) +void serveEmbeddedHttpdProcesses(alias fun, CustomCgi = Cgi)(RequestServer params) { + import core.sys.posix.unistd; + import core.sys.posix.sys.socket; + import core.sys.posix.netinet.in_; + //import std.c.linux.socket; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1) + throw new Exception("socket"); + + cloexec(sock); + + { + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(params.listeningPort); + auto lh = params.listeningHost; + if(lh.length) { + if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) + throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); + } else + addr.sin_addr.s_addr = INADDR_ANY; + + // HACKISH + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); + // end hack + + + if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { + close(sock); + throw new Exception("bind"); + } + + // FIXME: if this queue is full, it will just ignore it + // and wait for the client to retransmit it. This is an + // obnoxious timeout condition there. + if(sock.listen(128) == -1) { + close(sock); + throw new Exception("listen"); + } + } + + version(embedded_httpd_processes_accept_after_fork) {} else { + int pipeReadFd; + int pipeWriteFd; { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(listeningPort(8085)); - auto lh = listeningHost(); - if(lh.length) { - if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) - throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); - } else - addr.sin_addr.s_addr = INADDR_ANY; - - // HACKISH - int on = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); - // end hack - - - if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { - close(sock); - throw new Exception("bind"); + int[2] pipeFd; + if(socketpair(AF_UNIX, SOCK_DGRAM, 0, pipeFd)) { + import core.stdc.errno; + throw new Exception("pipe failed " ~ to!string(errno)); } - // FIXME: if this queue is full, it will just ignore it - // and wait for the client to retransmit it. This is an - // obnoxious timeout condition there. - if(sock.listen(128) == -1) { - close(sock); - throw new Exception("listen"); - } + pipeReadFd = pipeFd[0]; + pipeWriteFd = pipeFd[1]; } + } - version(embedded_httpd_processes_accept_after_fork) {} else { - int pipeReadFd; - int pipeWriteFd; - { - int[2] pipeFd; - if(socketpair(AF_UNIX, SOCK_DGRAM, 0, pipeFd)) { - import core.stdc.errno; - throw new Exception("pipe failed " ~ to!string(errno)); + int processCount; + pid_t newPid; + reopen: + while(processCount < processPoolSize) { + newPid = fork(); + if(newPid == 0) { + // start serving on the socket + //ubyte[4096] backingBuffer; + for(;;) { + bool closeConnection; + uint i; + sockaddr addr; + i = addr.sizeof; + version(embedded_httpd_processes_accept_after_fork) { + int s = accept(sock, &addr, &i); + int opt = 1; + import core.sys.posix.netinet.tcp; + // the Cgi class does internal buffering, so disabling this + // helps with latency in many cases... + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); + cloexec(s); + } else { + int s; + auto readret = read_fd(pipeReadFd, &s, s.sizeof, &s); + if(readret != s.sizeof) { + import core.stdc.errno; + throw new Exception("pipe read failed " ~ to!string(errno)); + } + + //writeln("process ", getpid(), " got socket ", s); } - pipeReadFd = pipeFd[0]; - pipeWriteFd = pipeFd[1]; + try { + + if(s == -1) + throw new Exception("accept"); + + scope(failure) close(s); + //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; + auto ir = new BufferedInputRange(s); + //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); + + while(!ir.empty) { + //ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; + + Cgi cgi; + try { + cgi = new CustomCgi(ir, &closeConnection); + cgi._outputFileHandle = s; + // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. + if(processPoolSize <= 1) + closeConnection = true; + //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); + } catch(Throwable t) { + // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P + // anyway let's kill the connection + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto estr = t.toString(); + stderr.rawWrite(estr); + stderr.rawWrite("\n"); + } else + stderr.writeln(t.toString()); + sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); + closeConnection = true; + break; + } + assert(cgi !is null); + scope(exit) + cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + if(cgi.websocketMode) + closeConnection = true; + } catch(ConnectionException ce) { + closeConnection = true; + } catch(Throwable t) { + // a processing error can be recovered from + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto estr = t.toString(); + stderr.rawWrite(estr); + } else { + stderr.writeln(t.toString); + } + if(!handleException(cgi, t)) + closeConnection = true; + } + + if(closeConnection) { + ir.source.close(); + break; + } else { + if(!ir.empty) + ir.popFront(); // get the next + else if(ir.sourceClosed) { + ir.source.close(); + } + } + } + + ir.source.close(); + } catch(Throwable t) { + version(CRuntime_Musl) {} else + debug writeln(t); + // most likely cause is a timeout + } } + } else { + processCount++; } + } + // the parent should wait for its children... + if(newPid) { + import core.sys.posix.sys.wait; - int processCount; - pid_t newPid; - reopen: - while(processCount < processPoolSize) { - newPid = fork(); - if(newPid == 0) { - // start serving on the socket - //ubyte[4096] backingBuffer; - for(;;) { - bool closeConnection; + version(embedded_httpd_processes_accept_after_fork) {} else { + import core.sys.posix.sys.select; + int[] fdQueue; + while(true) { + // writeln("select call"); + int nfds = pipeWriteFd; + if(sock > pipeWriteFd) + nfds = sock; + nfds += 1; + fd_set read_fds; + fd_set write_fds; + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_SET(sock, &read_fds); + if(fdQueue.length) + FD_SET(pipeWriteFd, &write_fds); + auto ret = select(nfds, &read_fds, &write_fds, null, null); + if(ret == -1) { + import core.stdc.errno; + if(errno == EINTR) + goto try_wait; + else + throw new Exception("wtf select"); + } + + int s = -1; + if(FD_ISSET(sock, &read_fds)) { uint i; sockaddr addr; i = addr.sizeof; - version(embedded_httpd_processes_accept_after_fork) { - int s = accept(sock, &addr, &i); - int opt = 1; - import core.sys.posix.netinet.tcp; - // the Cgi class does internal buffering, so disabling this - // helps with latency in many cases... - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); - cloexec(s); - } else { - int s; - auto readret = read_fd(pipeReadFd, &s, s.sizeof, &s); - if(readret != s.sizeof) { - import core.stdc.errno; - throw new Exception("pipe read failed " ~ to!string(errno)); - } - - //writeln("process ", getpid(), " got socket ", s); - } - - try { - - if(s == -1) - throw new Exception("accept"); - - scope(failure) close(s); - //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; - auto ir = new BufferedInputRange(s); - //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); - - while(!ir.empty) { - ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; - - Cgi cgi; - try { - cgi = new CustomCgi(ir, &closeConnection); - cgi._outputFileHandle = s; - // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. - if(processPoolSize <= 1) - closeConnection = true; - //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); - } catch(Throwable t) { - // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P - // anyway let's kill the connection - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto estr = t.toString(); - stderr.rawWrite(estr); - stderr.rawWrite("\n"); - } else - stderr.writeln(t.toString()); - sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); - closeConnection = true; - break; - } - assert(cgi !is null); - scope(exit) - cgi.dispose(); - - try { - fun(cgi); - cgi.close(); - if(cgi.websocketMode) - closeConnection = true; - } catch(ConnectionException ce) { - closeConnection = true; - } catch(Throwable t) { - // a processing error can be recovered from - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto estr = t.toString(); - stderr.rawWrite(estr); - } else { - stderr.writeln(t.toString); - } - if(!handleException(cgi, t)) - closeConnection = true; - } - - if(closeConnection) { - ir.source.close(); - break; - } else { - if(!ir.empty) - ir.popFront(); // get the next - else if(ir.sourceClosed) { - ir.source.close(); - } - } - } - - ir.source.close(); - } catch(Throwable t) { - version(CRuntime_Musl) {} else - debug writeln(t); - // most likely cause is a timeout - } + s = accept(sock, &addr, &i); + cloexec(s); + import core.sys.posix.netinet.tcp; + int opt = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); } - } else { - processCount++; + + if(FD_ISSET(pipeWriteFd, &write_fds)) { + if(s == -1 && fdQueue.length) { + s = fdQueue[0]; + fdQueue = fdQueue[1 .. $]; // FIXME reuse buffer + } + write_fd(pipeWriteFd, &s, s.sizeof, s); + close(s); // we are done with it, let the other process take ownership + } else + fdQueue ~= s; } } - // the parent should wait for its children... - if(newPid) { - import core.sys.posix.sys.wait; + try_wait: - version(embedded_httpd_processes_accept_after_fork) {} else { - import core.sys.posix.sys.select; - int[] fdQueue; - while(true) { - // writeln("select call"); - int nfds = pipeWriteFd; - if(sock > pipeWriteFd) - nfds = sock; - nfds += 1; - fd_set read_fds; - fd_set write_fds; - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); - FD_SET(sock, &read_fds); - if(fdQueue.length) - FD_SET(pipeWriteFd, &write_fds); - auto ret = select(nfds, &read_fds, &write_fds, null, null); - if(ret == -1) { - import core.stdc.errno; - if(errno == EINTR) - goto try_wait; - else - throw new Exception("wtf select"); - } - - int s = -1; - if(FD_ISSET(sock, &read_fds)) { - uint i; - sockaddr addr; - i = addr.sizeof; - s = accept(sock, &addr, &i); - cloexec(s); - import core.sys.posix.netinet.tcp; - int opt = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); - } - - if(FD_ISSET(pipeWriteFd, &write_fds)) { - if(s == -1 && fdQueue.length) { - s = fdQueue[0]; - fdQueue = fdQueue[1 .. $]; // FIXME reuse buffer - } - write_fd(pipeWriteFd, &s, s.sizeof, s); - close(s); // we are done with it, let the other process take ownership - } else - fdQueue ~= s; - } - } - - try_wait: - - int status; - while(-1 != wait(&status)) { - version(CRuntime_Musl) {} else { + int status; + while(-1 != wait(&status)) { + version(CRuntime_Musl) {} else { import std.stdio; writeln("Process died ", status); - } - processCount--; - goto reopen; } - close(sock); + processCount--; + goto reopen; } - } else - version(embedded_httpd_threads) { - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun)); - manager.listen(); - } else - version(scgi) { - import std.exception; - import al = std.algorithm; - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(4000), &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); - manager.listen(); - } else - version(fastcgi) { - // SetHandler fcgid-script - FCGX_Stream* input, output, error; - FCGX_ParamArray env; + close(sock); + } +} + +version(fastcgi) +void serveFastCgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(RequestServer params) { + // SetHandler fcgid-script + FCGX_Stream* input, output, error; + FCGX_ParamArray env; - const(ubyte)[] getFcgiChunk() { - const(ubyte)[] ret; - while(FCGX_HasSeenEOF(input) != -1) - ret ~= cast(ubyte) FCGX_GetChar(input); - return ret; + const(ubyte)[] getFcgiChunk() { + const(ubyte)[] ret; + while(FCGX_HasSeenEOF(input) != -1) + ret ~= cast(ubyte) FCGX_GetChar(input); + return ret; + } + + void writeFcgi(const(ubyte)[] data) { + FCGX_PutStr(data.ptr, data.length, output); + } + + void doARequest() { + string[string] fcgienv; + + for(auto e = env; e !is null && *e !is null; e++) { + string cur = to!string(*e); + auto idx = cur.indexOf("="); + string name, value; + if(idx == -1) + name = cur; + else { + name = cur[0 .. idx]; + value = cur[idx + 1 .. $]; + } + + fcgienv[name] = value; } - void writeFcgi(const(ubyte)[] data) { - FCGX_PutStr(data.ptr, data.length, output); + void flushFcgi() { + FCGX_FFlush(output); } - void doARequest() { - string[string] fcgienv; - - for(auto e = env; e !is null && *e !is null; e++) { - string cur = to!string(*e); - auto idx = cur.indexOf("="); - string name, value; - if(idx == -1) - name = cur; - else { - name = cur[0 .. idx]; - value = cur[idx + 1 .. $]; - } - - fcgienv[name] = value; - } - - void flushFcgi() { - FCGX_FFlush(output); - } - - Cgi cgi; - try { - cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); - } catch(Throwable t) { - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); - return; //continue; - } - assert(cgi !is null); - scope(exit) cgi.dispose(); - try { - fun(cgi); - cgi.close(); - } catch(Throwable t) { - // log it to the error stream - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - // handle it for the user, if we can - if(!handleException(cgi, t)) - return; // continue; - } - } - - auto lp = listeningPort(0); - FCGX_Request request; - if(lp) { - // if a listening port was specified on the command line, we want to spawn ourself - // (needed for nginx without spawn-fcgi, e.g. on Windows) - FCGX_Init(); - auto sock = FCGX_OpenSocket(toStringz(listeningHost() ~ ":" ~ to!string(lp)), 12); - if(sock < 0) - throw new Exception("Couldn't listen on the port"); - FCGX_InitRequest(&request, sock, 0); - while(FCGX_Accept_r(&request) >= 0) { - input = request.inStream; - output = request.outStream; - error = request.errStream; - env = request.envp; - doARequest(); - } - } else { - // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) - // using the version with a global variable since we are separate processes anyway - while(FCGX_Accept(&input, &output, &error, &env) >= 0) { - doARequest(); - } - } - } else { - // standard CGI is the default version Cgi cgi; try { - cgi = new CustomCgi(maxContentLength); - version(Posix) - cgi._outputFileHandle = 1; // stdout - else version(Windows) - cgi._outputFileHandle = GetStdHandle(STD_OUTPUT_HANDLE); - else static assert(0); + cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); } catch(Throwable t) { - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto s = t.toString(); - stderr.rawWrite(s); - stdout.rawWrite(plainHttpError(true, "400 Bad Request", t)); - } else { - stderr.writeln(t.msg); - // the real http server will probably handle this; - // most likely, this is a bug in Cgi. But, oh well. - stdout.write(plainHttpError(true, "400 Bad Request", t)); - } - return; + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); + return; //continue; } assert(cgi !is null); scope(exit) cgi.dispose(); - try { fun(cgi); cgi.close(); - } catch (Throwable t) { - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto s = t.msg; - stderr.rawWrite(s); - } else { - stderr.writeln(t.msg); - } + } catch(Throwable t) { + // log it to the error stream + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + // handle it for the user, if we can if(!handleException(cgi, t)) - return; + return; // continue; + } + } + + auto lp = params.listeningPort; + + FCGX_Request request; + if(lp) { + // if a listening port was specified on the command line, we want to spawn ourself + // (needed for nginx without spawn-fcgi, e.g. on Windows) + FCGX_Init(); + auto sock = FCGX_OpenSocket(toStringz(params.listeningHost ~ ":" ~ to!string(lp)), 12); + if(sock < 0) + throw new Exception("Couldn't listen on the port"); + FCGX_InitRequest(&request, sock, 0); + while(FCGX_Accept_r(&request) >= 0) { + input = request.inStream; + output = request.outStream; + error = request.errStream; + env = request.envp; + doARequest(); + } + } else { + // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) + // using the version with a global variable since we are separate processes anyway + while(FCGX_Accept(&input, &output, &error, &env) >= 0) { + doARequest(); + } + } +} + +/// Returns the default listening port for the current cgi configuration. 8085 for embedded httpd, 4000 for scgi, irrelevant for others. +ushort defaultListeningPort() { + version(netman_httpd) + return 8080; + else version(embedded_httpd_processes) + return 8085; + else version(embedded_httpd_threads) + return 8085; + else version(scgi) + return 4000; + else + return 0; +} + +/++ + This is the function [GenericMain] calls. View its source for some simple boilerplate you can copy/paste and modify, or you can call it yourself from your `main`. + + Params: + fun = Your request handler + CustomCgi = a subclass of Cgi, if you wise to customize it further + maxContentLength = max POST size you want to allow + args = command-line arguments + + History: + Documented Sept 26, 2020. ++/ +void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(string[] args) if(is(CustomCgi : Cgi)) { + if(tryAddonServers(args)) + return; + + if(trySimulatedRequest!(fun, CustomCgi)(args)) + return; + + RequestServer server; + // you can change the port here if you like + // server.listeningPort = 9000; + + // then call this to let the command line args override your default + server.configureFromCommandLine(args); + + // and serve the request(s). + server.serve!(fun, CustomCgi, maxContentLength)(); +} + +//version(plain_cgi) +void handleCgiRequest(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { + // standard CGI is the default version + Cgi cgi; + try { + cgi = new CustomCgi(maxContentLength); + version(Posix) + cgi._outputFileHandle = 1; // stdout + else version(Windows) + cgi._outputFileHandle = GetStdHandle(STD_OUTPUT_HANDLE); + else static assert(0); + } catch(Throwable t) { + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto s = t.toString(); + stderr.rawWrite(s); + stdout.rawWrite(plainHttpError(true, "400 Bad Request", t)); + } else { + stderr.writeln(t.msg); + // the real http server will probably handle this; + // most likely, this is a bug in Cgi. But, oh well. + stdout.write(plainHttpError(true, "400 Bad Request", t)); + } + return; + } + assert(cgi !is null); + scope(exit) cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + } catch (Throwable t) { + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto s = t.msg; + stderr.rawWrite(s); + } else { + stderr.writeln(t.msg); + } + if(!handleException(cgi, t)) + return; + } +} + +/+ + The event loop for embedded_httpd_threads will prolly fiber dispatch + cgi constructors too, so slow posts will not monopolize a worker thread. + + May want to provide the worker task system just need to ensure all the fibers + has a big enough stack for real work... would also ideally like to reuse them. + + + So prolly bir would switch it to nonblocking. If it would block, it epoll + registers one shot with this existing fiber to take it over. + + new connection comes in. it picks a fiber off the free list, + or if there is none, it creates a new one. this fiber handles + this connection the whole time. + + epoll triggers the fiber when something comes in. it is called by + a random worker thread, it might change at any time. at least during + the constructor. maybe into the main body it will stay tied to a thread + just so TLS stuff doesn't randomly change in the middle. but I could + specify if you yield all bets are off. + + when the request is finished, if there's more data buffered, it just + keeps going. if there is no more data buffered, it epoll ctls to + get triggered when more data comes in. all one shot. + + when a connection is closed, the fiber returns and is then reset + and added to the free list. if the free list is full, the fiber is + just freed, this means it will balloon to a certain size but not generally + grow beyond that unless the activity keeps going. + + 256 KB stack i thnk per fiber. 4,000 active fibers per gigabyte of memory. + + So the fiber has its own magic methods to read and write. if they would block, it registers + for epoll and yields. when it returns, it read/writes and then returns back normal control. + + basically you issue the command and it tells you when it is done + + it needs to DEL the epoll thing when it is closed. add it when opened. mod it when anther thing issued + ++/ + +version(cgi_use_fiber) +class CgiFiber : Fiber { + this(void function(Socket) handler) { + this.handler = handler; + // FIXME: stack size + super(&run); + } + + Socket connection; + void function(Socket) handler; + + void run() { + handler(connection); + } + + void delegate() postYield; + + private void setPostYield(scope void delegate() py) @nogc { + postYield = cast(void delegate()) py; + } + + void proceed() { + call(); + auto py = postYield; + postYield = null; + if(py !is null) + py(); + if(state == State.TERM) { + import core.memory; + GC.removeRoot(cast(void*) this); } } } version(embedded_httpd_threads) void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { + version(cgi_use_fiber) { + auto fiber = new CgiFiber(&doThreadHttpConnectionGuts!(CustomCgi, fun)); + import core.memory; + GC.addRoot(cast(void*) fiber); + fiber.connection = connection; + fiber.proceed(); + } else { + doThreadHttpConnectionGuts!(CustomCgi, fun)(connection); + } +} + +version(embedded_httpd_threads) +void doThreadHttpConnectionGuts(CustomCgi, alias fun)(Socket connection) { scope(failure) { // catch all for other errors sendAll(connection, plainHttpError(false, "500 Internal Server Error", null)); @@ -3679,6 +3874,17 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { } bool closeConnection; + + /+ + ubyte[4096] inputBuffer = void; + ubyte[__traits(classInstanceSize, BufferedInputRange)] birBuffer = void; + ubyte[__traits(classInstanceSize, CustomCgi)] cgiBuffer = void; + + birBuffer[] = cast(ubyte[]) typeid(BufferedInputRange).initializer()[]; + BufferedInputRange ir = cast(BufferedInputRange) cast(void*) birBuffer.ptr; + ir.__ctor(connection, inputBuffer[], true); + +/ + auto ir = new BufferedInputRange(connection); while(!ir.empty) { @@ -3734,12 +3940,14 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { if(closeConnection) { connection.close(); + ir.dispose(); break; } else { if(ir.front.length) { ir.popFront(); // we can't just discard the buffer, so get the next bit and keep chugging along } else if(ir.sourceClosed) { ir.source.close(); + ir.dispose(); } else { continue; // break; // this was for a keepalive experiment @@ -3747,8 +3955,10 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { } } - if(closeConnection) + if(closeConnection) { connection.close(); + ir.dispose(); + } // I am otherwise NOT closing it here because the parent thread might still be able to make use of the keep-alive connection! } @@ -3853,15 +4063,47 @@ void doThreadScgiConnection(CustomCgi, alias fun, long maxContentLength)(Socket } string printDate(DateTime date) { - return format( - "%.3s, %02d %.3s %d %02d:%02d:%02d GMT", // could be UTC too - to!string(date.dayOfWeek).capitalize, - date.day, - to!string(date.month).capitalize, - date.year, - date.hour, - date.minute, - date.second); + char[29] buffer = void; + printDateToBuffer(date, buffer[]); + return buffer.idup; +} + +int printDateToBuffer(DateTime date, char[] buffer) @nogc { + assert(buffer.length >= 29); + // 29 static length ? + + static immutable daysOfWeek = [ + "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" + ]; + + static immutable months = [ + null, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" + ]; + + buffer[0 .. 3] = daysOfWeek[date.dayOfWeek]; + buffer[3 .. 5] = ", "; + buffer[5] = date.day / 10 + '0'; + buffer[6] = date.day % 10 + '0'; + buffer[7] = ' '; + buffer[8 .. 11] = months[date.month]; + buffer[11] = ' '; + auto y = date.year; + buffer[12] = cast(char) (y / 1000 + '0'); y %= 1000; + buffer[13] = cast(char) (y / 100 + '0'); y %= 100; + buffer[14] = cast(char) (y / 10 + '0'); y %= 10; + buffer[15] = cast(char) (y + '0'); + buffer[16] = ' '; + buffer[17] = date.hour / 10 + '0'; + buffer[18] = date.hour % 10 + '0'; + buffer[19] = ':'; + buffer[20] = date.minute / 10 + '0'; + buffer[21] = date.minute % 10 + '0'; + buffer[22] = ':'; + buffer[23] = date.second / 10 + '0'; + buffer[24] = date.second % 10 + '0'; + buffer[25 .. $] = " GMT"; + + return 29; } @@ -3952,6 +4194,54 @@ version(fastcgi) { import std.socket; +version(cgi_use_fiber) { + import core.thread; + import core.sys.linux.epoll; + + __gshared int epfd; +} + + +version(cgi_use_fiber) +private enum WakeupEvent { + Read = EPOLLIN, + Write = EPOLLOUT +} + +version(cgi_use_fiber) +private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e) @nogc { + + // static cast since I know what i have in here and don't want to pay for dynamic cast + auto f = cast(CgiFiber) cast(void*) Fiber.getThis(); + + f.setPostYield = () { + if(*registered) { + // rearm + epoll_event evt; + evt.events = e | EPOLLONESHOT; + evt.data.ptr = cast(void*) f; + if(epoll_ctl(epfd, EPOLL_CTL_MOD, source.handle, &evt) == -1) + throw new Exception("epoll_ctl"); + } else { + // initial registration + epoll_event evt; + evt.events = e | EPOLLONESHOT; + evt.data.ptr = cast(void*) f; + if(epoll_ctl(epfd, EPOLL_CTL_ADD, source.handle, &evt) == -1) + throw new Exception("epoll_ctl"); + *registered = true ; + } + }; + + Fiber.yield(); +} + +version(cgi_use_fiber) +void unregisterSource(Socket s) { + epoll_event evt; + epoll_ctl(epfd, EPOLL_CTL_DEL, s.handle(), &evt); +} + // it is a class primarily for reference semantics // I might change this interface /// This is NOT ACTUALLY an input range! It is too different. Historical mistake kinda. @@ -3961,16 +4251,21 @@ class BufferedInputRange { this(new Socket(cast(socket_t) source, AddressFamily.INET), buffer); } - this(Socket source, ubyte[] buffer = null) { + this(Socket source, ubyte[] buffer = null, bool allowGrowth = true) { // if they connect but never send stuff to us, we don't want it wasting the process // so setting a time out - source.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(3)); + version(cgi_use_fiber) + source.blocking = false; + else + source.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(3)); + this.source = source; if(buffer is null) { underlyingBuffer = new ubyte[4096]; - allowGrowth = true; + this.allowGrowth = true; } else { underlyingBuffer = buffer; + this.allowGrowth = allowGrowth; } assert(underlyingBuffer.length); @@ -3981,6 +4276,17 @@ class BufferedInputRange { popFront(); // prime } + version(cgi_use_fiber) { + bool registered; + } + + void dispose() { + version(cgi_use_fiber) { + if(registered) + unregisterSource(source); + } + } + /** A slight difference from regular ranges is you can give it the maximum number of bytes to consume. @@ -4022,15 +4328,24 @@ class BufferedInputRange { auto ret = source.receive(freeSpace); if(ret == Socket.ERROR) { if(wouldHaveBlocked()) { - // gonna treat a timeout here as a close - sourceClosed = true; - return; + version(cgi_use_fiber) { + registerEventWakeup(®istered, source, WakeupEvent.Read); + goto try_again; + } else { + // gonna treat a timeout here as a close + sourceClosed = true; + return; + } } version(Posix) { import core.stdc.errno; if(errno == EINTR || errno == EAGAIN) { goto try_again; } + if(errno == ECONNRESET) { + sourceClosed = true; + return; + } } throw new Exception(lastSocketError); // FIXME } @@ -4041,6 +4356,7 @@ class BufferedInputRange { //import std.stdio; writeln(view.ptr); writeln(underlyingBuffer.ptr); writeln(view.length, " ", ret, " = ", view.length + ret); view = underlyingBuffer[view.ptr - underlyingBuffer.ptr .. view.length + ret]; + //import std.stdio; writeln(cast(string) view); } while(view.length < minBytesToSettleFor); } @@ -4113,6 +4429,11 @@ class ListeningConnectionManager { running = true; shared(int) loopBroken; + version(Posix) { + import core.sys.posix.signal; + signal(SIGPIPE, SIG_IGN); + } + version(cgi_no_threads) { // NEVER USE THIS // it exists only for debugging and other special occasions @@ -4132,36 +4453,73 @@ class ListeningConnectionManager { } else { semaphore = new Semaphore(); - ConnectionThread[16] threads; - foreach(i, ref thread; threads) { - thread = new ConnectionThread(this, handler, cast(int) i); - thread.start(); - } + import std.parallelism; - /+ - version(linux) { + + version(cgi_use_fiber) { import core.sys.linux.epoll; - epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if(epoll_fd == -1) + epfd = epoll_create1(EPOLL_CLOEXEC); + if(epfd == -1) throw new Exception("epoll_create1 " ~ to!string(errno)); scope(exit) { import core.sys.posix.unistd; - close(epoll_fd); + close(epfd); } - epoll_event[64] events; - epoll_event ev; - ev.events = EPOLLIN; + ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough. ev.data.fd = listener.handle; - if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener.handle, &ev) == -1) + if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener.handle, &ev) == -1) throw new Exception("epoll_ctl " ~ to!string(errno)); + + WorkerThread[] threads = new WorkerThread[](totalCPUs + 1); + foreach(i, ref thread; threads) { + thread = new WorkerThread(this, handler, cast(int) i); + thread.start(); + } + + bool fiber_crash_check() { + bool hasAnyRunning; + foreach(thread; threads) { + if(!thread.isRunning) { + thread.join(); + } else hasAnyRunning = true; + } + + return (!hasAnyRunning); + } + + + while(running) { + Thread.sleep(1.seconds); + if(fiber_crash_check()) + break; + } + + } else { + // I times 4 here because there's a good chance some will be blocked on i/o. + ConnectionThread[] threads = new ConnectionThread[](totalCPUs * 4); + foreach(i, ref thread; threads) { + thread = new ConnectionThread(this, handler, cast(int) i); + thread.start(); + } } - +/ while(!loopBroken && running) { Socket sn; + bool crash_check() { + bool hasAnyRunning; + foreach(thread; threads) { + if(!thread.isRunning) { + thread.join(); + } else hasAnyRunning = true; + } + + return (!hasAnyRunning); + } + + void accept_new_connection() { sn = listener.accept(); cloexec(sn); @@ -4191,50 +4549,9 @@ class ListeningConnectionManager { semaphore.notify(); } - bool crash_check() { - bool hasAnyRunning; - foreach(thread; threads) { - if(!thread.isRunning) { - thread.join(); - } else hasAnyRunning = true; - } - return (!hasAnyRunning); - } - - - /+ - version(linux) { - auto nfds = epoll_wait(epoll_fd, events.ptr, events.length, -1); - if(nfds == -1) { - if(errno == EINTR) - continue; - throw new Exception("epoll_wait " ~ to!string(errno)); - } - - foreach(idx; 0 .. nfds) { - auto flags = events[idx].events; - auto fd = events[idx].data.fd; - - if(fd == listener.handle) { - accept_new_connection(); - existing_connection_new_data(); - } else { - if(flags & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { - import core.sys.posix.unistd; - close(fd); - } else { - sn = new Socket(cast(socket_t) fd, tcp ? AddressFamily.INET : AddressFamily.UNIX); - import std.stdio; writeln("existing_connection_new_data"); - existing_connection_new_data(); - } - } - } - } else { - +/ - accept_new_connection(); - existing_connection_new_data(); - //} + accept_new_connection(); + existing_connection_new_data(); if(crash_check()) break; @@ -4306,10 +4623,19 @@ class ListeningConnectionManager { void sendAll(Socket s, const(void)[] data, string file = __FILE__, size_t line = __LINE__) { if(data.length == 0) return; ptrdiff_t amount; + //import std.stdio; writeln("***",cast(string) data,"///"); do { amount = s.send(data); - if(amount == Socket.ERROR) + if(amount == Socket.ERROR) { + version(cgi_use_fiber) { + if(wouldHaveBlocked()) { + bool registered = true; + registerEventWakeup(®istered, s, WakeupEvent.Write); + continue; + } + } throw new ConnectionException(s, lastSocketError, file, line); + } assert(amount > 0); data = data[amount .. $]; } while(data.length); @@ -4431,6 +4757,57 @@ class ConnectionThread : Thread { int myThreadNumber; } +version(cgi_use_fiber) +class WorkerThread : Thread { + this(ListeningConnectionManager lcm, CMT dg, int myThreadNumber) { + this.lcm = lcm; + this.dg = dg; + this.myThreadNumber = myThreadNumber; + super(&run); + } + + void run() { + while(lcm.running) { + Socket sn; + + epoll_event[64] events; + auto nfds = epoll_wait(epfd, events.ptr, events.length, -1); + if(nfds == -1) { + if(errno == EINTR) + continue; + throw new Exception("epoll_wait " ~ to!string(errno)); + } + + foreach(idx; 0 .. nfds) { + auto flags = events[idx].events; + + if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) { + sn = lcm.listener.accept(); + cloexec(sn); + if(lcm.tcp) { + // disable Nagle's algorithm to avoid a 40ms delay when we send/recv + // on the socket because we do some buffering internally. I think this helps, + // certainly does for small requests, and I think it does for larger ones too + sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); + + sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10)); + } + + dg(sn); + } else { + auto fiber = cast(CgiFiber) events[idx].data.ptr; + fiber.proceed(); + } + } + } + } + + ListeningConnectionManager lcm; + CMT dg; + int myThreadNumber; +} + + /* Done with network helper */ /* Helpers for doing temporary files. Used both here and in web.d */ @@ -7713,20 +8090,25 @@ html", true, true); enum methodMeta = null; } + void presentSuccessfulReturn(T, Meta)(Cgi cgi, T ret, Meta meta, string format) { + // FIXME? format? + (cast(CRTP) this).presentSuccessfulReturnAsHtml(cgi, ret, meta); + } + /// typeof(null) (which is also used to represent functions returning `void`) do nothing /// in the default presenter - allowing the function to have full low-level control over the /// response. - void presentSuccessfulReturnAsHtml(T : typeof(null))(Cgi cgi, T ret, typeof(null) meta) { + void presentSuccessfulReturn(T : typeof(null))(Cgi cgi, T ret, typeof(null) meta, string format) { // nothing intentionally! } /// Redirections are forwarded to [Cgi.setResponseLocation] - void presentSuccessfulReturnAsHtml(T : Redirection)(Cgi cgi, T ret, typeof(null) meta) { + void presentSuccessfulReturn(T : Redirection)(Cgi cgi, T ret, typeof(null) meta, string format) { cgi.setResponseLocation(ret.to, true, getHttpCodeText(ret.code)); } /// Multiple responses deconstruct the algebraic type and forward to the appropriate handler at runtime - void presentSuccessfulReturnAsHtml(T : MultipleResponses!Types, Types...)(Cgi cgi, T ret, typeof(null) meta) { + void presentSuccessfulReturn(T : MultipleResponses!Types, Types...)(Cgi cgi, T ret, typeof(null) meta, string format) { bool outputted = false; foreach(index, type; Types) { if(ret.contains == index) { @@ -7740,13 +8122,13 @@ html", true, true); } /// An instance of the [arsd.dom.FileResource] interface has its own content type; assume it is a download of some sort. - void presentSuccessfulReturnAsHtml(T : FileResource)(Cgi cgi, T ret, typeof(null) meta) { + void presentSuccessfulReturn(T : FileResource)(Cgi cgi, T ret, typeof(null) meta, string format) { cgi.setCache(true); // not necessarily true but meh cgi.setResponseContentType(ret.contentType); cgi.write(ret.getData(), true); } - /// And the default handler will call [formatReturnValueAsHtml] and place it inside the [htmlContainer]. + /// And the default handler for HTML will call [formatReturnValueAsHtml] and place it inside the [htmlContainer]. void presentSuccessfulReturnAsHtml(T)(Cgi cgi, T ret, typeof(null) meta) { auto container = this.htmlContainer(); container.appendChild(formatReturnValueAsHtml(ret)); @@ -8501,7 +8883,7 @@ private auto serveApiInternal(T)(string urlPrefix) { // a void return (or typeof(null) lol) means you, the user, is doing it yourself. Gives full control. try { auto ret = callFromCgi!(__traits(getOverloads, obj, methodName)[idx])(&(__traits(getOverloads, obj, methodName)[idx]), cgi); - presenter.presentSuccessfulReturnAsHtml(cgi, ret, presenter.methodMeta!(__traits(getOverloads, obj, methodName)[idx])); + presenter.presentSuccessfulReturn(cgi, ret, presenter.methodMeta!(__traits(getOverloads, obj, methodName)[idx]), "html"); } catch(Throwable t) { presenter.presentExceptionAsHtml!(__traits(getOverloads, obj, methodName)[idx])(cgi, t, &(__traits(getOverloads, obj, methodName)[idx])); } @@ -9462,6 +9844,54 @@ template dispatcher(definitions...) { }); +private struct StackBuffer { + char[1024] initial = void; + char[] buffer; + size_t position; + + this(int a) { + buffer = initial[]; + } + + void add(in char[] what) { + if(position + what.length > buffer.length) + buffer.length = position + what.length + 1024; // reallocate with GC to handle special cases + buffer[position .. position + what.length] = what[]; + position += what.length; + } + + void add(in char[] w1, in char[] w2, in char[] w3 = null) { + add(w1); + add(w2); + add(w3); + } + + void add(long v) { + char[16] buffer = void; + auto pos = buffer.length; + bool negative; + if(v < 0) { + negative = true; + v = -v; + } + do { + buffer[--pos] = cast(char) (v % 10 + '0'); + v /= 10; + } while(v); + + if(negative) + buffer[--pos] = '-'; + + auto res = buffer[pos .. $]; + + add(res[]); + } + + char[] get() @nogc { + return buffer[0 .. position]; + } +} + /+ /++ This is the beginnings of my web.d 2.0 - it dispatches web requests to a class object.