diff --git a/cgi.d b/cgi.d index 66a9be5..eaf57ad 100644 --- a/cgi.d +++ b/cgi.d @@ -66,6 +66,34 @@ void main() { --- + Compile_versions: + + -version=plain_cgi + The default - a traditional, plain CGI executable will be generated. + -version=fastcgi + A FastCGI executable will be generated. + -version=scgi + A SCGI (SimpleCGI) executable will be generated. + -version=embedded_httpd + A HTTP server will be embedded in the generated executable. + -version=embedded_httpd_threads + The embedded HTTP server will use a single process with a thread pool. + -version=embedded_httpd_processes + The embedded HTTP server will use a prefork style process pool. + + -version=cgi_with_websocket + The CGI class has websocket server support. + + -version=with_openssl # not currently used + + -version=embedded_httpd_processes_accept_after_fork + It will call accept() in each child process, after forking. This is currently the only option, though I am experimenting with other ideas. + + -version=cgi_embedded_sessions + The session server will be embedded in the cgi.d server process + -version=cgi_session_server_process + The session will be provided in a separate process, provided by cgi.d. + Compile_and_run: For CGI, `dmd yourfile.d cgi.d` then put the executable in your cgi-bin directory. @@ -253,7 +281,7 @@ void main() { Copyright: - cgi.d copyright 2008-2018, Adam D. Ruppe. Provided under the Boost Software License. + cgi.d copyright 2008-2019, Adam D. Ruppe. Provided under the Boost Software License. Yes, this file is almost ten years old, and yes, it is still actively maintained and used. +/ @@ -264,8 +292,9 @@ static import std.file; version(embedded_httpd) { version(linux) version=embedded_httpd_processes; - else + else { version=embedded_httpd_threads; + } /* version(with_openssl) { @@ -275,6 +304,31 @@ version(embedded_httpd) { */ } +version(embedded_httpd_processes) + version=embedded_httpd_processes_accept_after_fork; // I am getting much better average performance on this, so just keeping it. But the other way MIGHT help keep the variation down so i wanna keep the code to play with later + +version(embedded_httpd_threads) { + // unless the user overrides the default.. + version(cgi_session_server_process) + {} + else + version=cgi_embedded_sessions; +} +version(scgi) { + // unless the user overrides the default.. + version(cgi_session_server_process) + {} + else + version=cgi_embedded_sessions; +} + +// fall back if the other is not defined so we can cleanly version it below +version(cgi_embedded_sessions) {} +else version=cgi_session_server_process; + + +version=cgi_with_websocket; + enum long defaultMaxContentLength = 5_000_000; /* @@ -681,6 +735,14 @@ class Cgi { this.postJson = null; } + version(Posix) + int getOutputFileHandle() { + return _outputFileHandle; + } + + version(Posix) + int _outputFileHandle = -1; + /** Initializes it using a CGI or CGI-like interface */ this(long maxContentLength = defaultMaxContentLength, // use this to override the environment variable listing @@ -2832,315 +2894,402 @@ bool isCgiRequestMethod(string s) { /// If you want to use a subclass of Cgi with generic main, use this mixin. mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defaultMaxContentLength) if(is(CustomCgi : Cgi)) { // kinda hacky - the T... is passed to Cgi's constructor in standard cgi mode, and ignored elsewhere - mixin CustomCgiMainImpl!(CustomCgi, fun, maxContentLength) customCgiMainImpl_; - void main(string[] args) { - customCgiMainImpl_.cgiMainImpl(args); + cgiMainImpl!(fun, CustomCgi, maxContentLength)(args); } } version(embedded_httpd_processes) int processPoolSize = 8; -mixin template CustomCgiMainImpl(CustomCgi, alias fun, long maxContentLength = defaultMaxContentLength) if(is(CustomCgi : Cgi)) { - void cgiMainImpl(string[] args) { - - - // we support command line thing for easy testing everywhere - // it needs to be called ./app method uri [other args...] - if(args.length >= 3 && isCgiRequestMethod(args[1])) { - Cgi cgi = new CustomCgi(args); - scope(exit) cgi.dispose(); - fun(cgi); - cgi.close(); - return; +void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(string[] args) if(is(CustomCgi : Cgi)) { + if(args.length > 1) { + // run the special separate processes if needed + switch(args[1]) { + case "--websocket-server": + runWebsocketServer(); + return; + case "--session-server": + runSessionServer(); + return; + case "--event-server": + runEventServer(); + return; + default: + // intentionally blank - do nothing and carry on to run normally } + } + + // we support command line thing for easy testing everywhere + // it needs to be called ./app method uri [other args...] + if(args.length >= 3 && isCgiRequestMethod(args[1])) { + Cgi cgi = new CustomCgi(args); + scope(exit) cgi.dispose(); + fun(cgi); + cgi.close(); + return; + } - ushort listeningPort(ushort def) { - bool found = false; - foreach(arg; args) { - if(found) - return to!ushort(arg); - if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") - found = true; + ushort listeningPort(ushort def) { + bool found = false; + foreach(arg; args) { + if(found) + return to!ushort(arg); + if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") + found = true; + } + return def; + } + + string listeningHost() { + bool found = false; + foreach(arg; args) { + if(found) + return arg; + if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host") + found = true; + } + return ""; + } + version(netman_httpd) { + import arsd.httpd; + // what about forwarding the other constructor args? + // this probably needs a whole redoing... + serveHttp!CustomCgi(&fun, listeningPort(8080));//5005); + return; + } else + version(embedded_httpd_processes) { + import core.sys.posix.unistd; + import core.sys.posix.sys.socket; + import core.sys.posix.netinet.in_; + //import std.c.linux.socket; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1) + throw new Exception("socket"); + + { + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(listeningPort(8085)); + auto lh = listeningHost(); + if(lh.length) { + if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) + throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); + } else + addr.sin_addr.s_addr = INADDR_ANY; + + // HACKISH + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); + // end hack + + + if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { + close(sock); + throw new Exception("bind"); } - return def; - } - string listeningHost() { - bool found = false; - foreach(arg; args) { - if(found) - return arg; - if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host") - found = true; + // FIXME: if this queue is full, it will just ignore it + // and wait for the client to retransmit it. This is an + // obnoxious timeout condition there. + if(sock.listen(128) == -1) { + close(sock); + throw new Exception("listen"); } - return ""; } - version(netman_httpd) { - import arsd.httpd; - // what about forwarding the other constructor args? - // this probably needs a whole redoing... - serveHttp!CustomCgi(&fun, listeningPort(8080));//5005); - return; - } else - version(embedded_httpd_processes) { - import core.sys.posix.unistd; - import core.sys.posix.sys.socket; - import core.sys.posix.netinet.in_; - //import std.c.linux.socket; - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1) - throw new Exception("socket"); + version(embedded_httpd_processes_accept_after_fork) {} else { + int pipeReadFd; + int pipeWriteFd; { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(listeningPort(8085)); - auto lh = listeningHost(); - if(lh.length) { - if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) - throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); - } else - addr.sin_addr.s_addr = INADDR_ANY; - - // HACKISH - int on = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); - // end hack - - - if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { - close(sock); - throw new Exception("bind"); + int[2] pipeFd; + if(socketpair(AF_UNIX, SOCK_DGRAM, 0, pipeFd)) { + import core.stdc.errno; + throw new Exception("pipe failed " ~ to!string(errno)); } - // FIXME: if this queue is full, it will just ignore it - // and wait for the client to retransmit it. This is an - // obnoxious timeout condition there. - if(sock.listen(128) == -1) { - close(sock); - throw new Exception("listen"); - } + pipeReadFd = pipeFd[0]; + pipeWriteFd = pipeFd[1]; } + } - int processCount; - pid_t newPid; - reopen: - while(processCount < processPoolSize) { - newPid = fork(); - if(newPid == 0) { - // start serving on the socket - //ubyte[4096] backingBuffer; - for(;;) { - bool closeConnection; + int processCount; + pid_t newPid; + reopen: + while(processCount < processPoolSize) { + newPid = fork(); + if(newPid == 0) { + // start serving on the socket + //ubyte[4096] backingBuffer; + for(;;) { + bool closeConnection; + uint i; + sockaddr addr; + i = addr.sizeof; + version(embedded_httpd_processes_accept_after_fork) + int s = accept(sock, &addr, &i); + else { + int s; + auto readret = read_fd(pipeReadFd, &s, s.sizeof, &s); + if(readret != s.sizeof) { + import core.stdc.errno; + throw new Exception("pipe read failed " ~ to!string(errno)); + } + + //writeln("process ", getpid(), " got socket ", s); + } + + try { + + if(s == -1) + throw new Exception("accept"); + + scope(failure) close(s); + //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; + auto ir = new BufferedInputRange(s); + //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); + + while(!ir.empty) { + ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; + + Cgi cgi; + try { + cgi = new CustomCgi(ir, &closeConnection); + cgi._outputFileHandle = s; + // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. + if(processPoolSize == 1) + closeConnection = true; + //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); + } catch(Throwable t) { + // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P + // anyway let's kill the connection + stderr.writeln(t.toString()); + sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); + closeConnection = true; + break; + } + assert(cgi !is null); + scope(exit) + cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + } catch(ConnectionException ce) { + closeConnection = true; + } catch(Throwable t) { + // a processing error can be recovered from + stderr.writeln(t.toString); + if(!handleException(cgi, t)) + closeConnection = true; + } + + if(closeConnection) { + ir.source.close(); + break; + } else { + if(!ir.empty) + ir.popFront(); // get the next + else if(ir.sourceClosed) { + ir.source.close(); + } + } + } + + ir.source.close(); + } catch(Throwable t) { + debug writeln(t); + // most likely cause is a timeout + } + } + } else { + processCount++; + } + } + + // the parent should wait for its children... + if(newPid) { + import core.sys.posix.sys.wait; + + version(embedded_httpd_processes_accept_after_fork) {} else { + import core.sys.posix.sys.select; + int[] fdQueue; + while(true) { + // writeln("select call"); + int nfds = pipeWriteFd; + if(sock > pipeWriteFd) + nfds = sock; + nfds += 1; + fd_set read_fds; + fd_set write_fds; + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_SET(sock, &read_fds); + if(fdQueue.length) + FD_SET(pipeWriteFd, &write_fds); + auto ret = select(nfds, &read_fds, &write_fds, null, null); + if(ret == -1) { + import core.stdc.errno; + if(errno == EINTR) + goto try_wait; + else + throw new Exception("wtf select"); + } + + int s = -1; + if(FD_ISSET(sock, &read_fds)) { uint i; sockaddr addr; i = addr.sizeof; - int s = accept(sock, &addr, &i); + s = accept(sock, &addr, &i); + } - try { - - if(s == -1) - throw new Exception("accept"); - - scope(failure) close(s); - //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; - auto ir = new BufferedInputRange(s); - //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); - - while(!ir.empty) { - ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; - - Cgi cgi; - try { - cgi = new CustomCgi(ir, &closeConnection); - // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. - if(processPoolSize == 1) - closeConnection = true; - //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); - } catch(Throwable t) { - // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P - // anyway let's kill the connection - stderr.writeln(t.toString()); - sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); - closeConnection = true; - break; - } - assert(cgi !is null); - scope(exit) - cgi.dispose(); - - try { - fun(cgi); - cgi.close(); - } catch(ConnectionException ce) { - closeConnection = true; - } catch(Throwable t) { - // a processing error can be recovered from - stderr.writeln(t.toString); - if(!handleException(cgi, t)) - closeConnection = true; - } - - if(closeConnection) { - ir.source.close(); - break; - } else { - if(!ir.empty) - ir.popFront(); // get the next - else if(ir.sourceClosed) { - ir.source.close(); - } - } - } - - ir.source.close(); - } catch(Throwable t) { - debug writeln(t); - // most likely cause is a timeout + if(FD_ISSET(pipeWriteFd, &write_fds)) { + if(s == -1 && fdQueue.length) { + s = fdQueue[0]; + fdQueue = fdQueue[1 .. $]; // FIXME reuse buffer } - } - } else { - processCount++; + write_fd(pipeWriteFd, &s, s.sizeof, s); + close(s); // we are done with it, let the other process take ownership + } else + fdQueue ~= s; } } - // the parent should wait for its children... - if(newPid) { - import core.sys.posix.sys.wait; - int status; - // FIXME: maybe we should respawn if one dies unexpectedly - while(-1 != wait(&status)) { + try_wait: + + int status; + while(-1 != wait(&status)) { import std.stdio; writeln("Process died ", status); - processCount--; - goto reopen; - } - close(sock); + processCount--; + goto reopen; } - } else - version(embedded_httpd_threads) { - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun)); - manager.listen(); - } else - version(scgi) { - import std.exception; - import al = std.algorithm; - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(4000), &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); - manager.listen(); - } else - version(fastcgi) { - // SetHandler fcgid-script - FCGX_Stream* input, output, error; - FCGX_ParamArray env; + close(sock); + } + } else + version(embedded_httpd_threads) { + auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun)); + manager.listen(); + } else + version(scgi) { + import std.exception; + import al = std.algorithm; + auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(4000), &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); + manager.listen(); + } else + version(fastcgi) { + // SetHandler fcgid-script + FCGX_Stream* input, output, error; + FCGX_ParamArray env; - const(ubyte)[] getFcgiChunk() { - const(ubyte)[] ret; - while(FCGX_HasSeenEOF(input) != -1) - ret ~= cast(ubyte) FCGX_GetChar(input); - return ret; + const(ubyte)[] getFcgiChunk() { + const(ubyte)[] ret; + while(FCGX_HasSeenEOF(input) != -1) + ret ~= cast(ubyte) FCGX_GetChar(input); + return ret; + } + + void writeFcgi(const(ubyte)[] data) { + FCGX_PutStr(data.ptr, data.length, output); + } + + void doARequest() { + string[string] fcgienv; + + for(auto e = env; e !is null && *e !is null; e++) { + string cur = to!string(*e); + auto idx = cur.indexOf("="); + string name, value; + if(idx == -1) + name = cur; + else { + name = cur[0 .. idx]; + value = cur[idx + 1 .. $]; + } + + fcgienv[name] = value; } - void writeFcgi(const(ubyte)[] data) { - FCGX_PutStr(data.ptr, data.length, output); + void flushFcgi() { + FCGX_FFlush(output); } - void doARequest() { - string[string] fcgienv; - - for(auto e = env; e !is null && *e !is null; e++) { - string cur = to!string(*e); - auto idx = cur.indexOf("="); - string name, value; - if(idx == -1) - name = cur; - else { - name = cur[0 .. idx]; - value = cur[idx + 1 .. $]; - } - - fcgienv[name] = value; - } - - void flushFcgi() { - FCGX_FFlush(output); - } - - Cgi cgi; - try { - cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); - } catch(Throwable t) { - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); - return; //continue; - } - assert(cgi !is null); - scope(exit) cgi.dispose(); - try { - fun(cgi); - cgi.close(); - } catch(Throwable t) { - // log it to the error stream - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - // handle it for the user, if we can - if(!handleException(cgi, t)) - return; // continue; - } - } - - auto lp = listeningPort(0); - FCGX_Request request; - if(lp) { - // if a listening port was specified on the command line, we want to spawn ourself - // (needed for nginx without spawn-fcgi, e.g. on Windows) - FCGX_Init(); - auto sock = FCGX_OpenSocket(toStringz(listeningHost() ~ ":" ~ to!string(lp)), 12); - if(sock < 0) - throw new Exception("Couldn't listen on the port"); - FCGX_InitRequest(&request, sock, 0); - while(FCGX_Accept_r(&request) >= 0) { - input = request.inStream; - output = request.outStream; - error = request.errStream; - env = request.envp; - doARequest(); - } - } else { - // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) - // using the version with a global variable since we are separate processes anyway - while(FCGX_Accept(&input, &output, &error, &env) >= 0) { - doARequest(); - } - } - } else { - // standard CGI is the default version Cgi cgi; try { - cgi = new CustomCgi(maxContentLength); + cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); } catch(Throwable t) { - stderr.writeln(t.msg); - // the real http server will probably handle this; - // most likely, this is a bug in Cgi. But, oh well. - stdout.write(plainHttpError(true, "400 Bad Request", t)); - return; + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); + return; //continue; } assert(cgi !is null); scope(exit) cgi.dispose(); - try { fun(cgi); cgi.close(); - } catch (Throwable t) { - stderr.writeln(t.msg); + } catch(Throwable t) { + // log it to the error stream + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + // handle it for the user, if we can if(!handleException(cgi, t)) - return; + return; // continue; } } + + auto lp = listeningPort(0); + FCGX_Request request; + if(lp) { + // if a listening port was specified on the command line, we want to spawn ourself + // (needed for nginx without spawn-fcgi, e.g. on Windows) + FCGX_Init(); + auto sock = FCGX_OpenSocket(toStringz(listeningHost() ~ ":" ~ to!string(lp)), 12); + if(sock < 0) + throw new Exception("Couldn't listen on the port"); + FCGX_InitRequest(&request, sock, 0); + while(FCGX_Accept_r(&request) >= 0) { + input = request.inStream; + output = request.outStream; + error = request.errStream; + env = request.envp; + doARequest(); + } + } else { + // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) + // using the version with a global variable since we are separate processes anyway + while(FCGX_Accept(&input, &output, &error, &env) >= 0) { + doARequest(); + } + } + } else { + // standard CGI is the default version + Cgi cgi; + try { + cgi = new CustomCgi(maxContentLength); + cgi._outputFileHandle = 1; // stdout + } catch(Throwable t) { + stderr.writeln(t.msg); + // the real http server will probably handle this; + // most likely, this is a bug in Cgi. But, oh well. + stdout.write(plainHttpError(true, "400 Bad Request", t)); + return; + } + assert(cgi !is null); + scope(exit) cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + } catch (Throwable t) { + stderr.writeln(t.msg); + if(!handleException(cgi, t)) + return; + } } } @@ -3161,6 +3310,7 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { Cgi cgi; try { cgi = new CustomCgi(ir, &closeConnection); + cgi._outputFileHandle = connection.handle; } catch(ConnectionException ce) { // broken pipe or something, just abort the connection closeConnection = true; @@ -3276,6 +3426,7 @@ void doThreadScgiConnection(CustomCgi, alias fun, long maxContentLength)(Socket Cgi cgi; try { cgi = new CustomCgi(maxContentLength, headers, &getScgiChunk, &writeScgi, &flushScgi); + cgi._outputFileHandle = connection.handle; } catch(Throwable t) { sendAll(connection, plainHttpError(true, "400 Bad Request", t)); connection.close(); @@ -3438,7 +3589,7 @@ import std.socket; // it is a class primarily for reference semantics // I might change this interface -/// +/// This is NOT ACTUALLY an input range! It is too different. Historical mistake kinda. class BufferedInputRange { version(Posix) this(int source, ubyte[] buffer = null) { @@ -4051,10 +4202,11 @@ version(cgi_with_websocket) { cgi.header("Connection: upgrade"); string key = cgi.requestHeaders["sec-websocket-key"]; - key ~= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + key ~= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // the defined guid from the websocket spec - import arsd.sha; - auto accept = Base64.encode(SHA1(key)); + import std.digest.sha; + auto hash = sha1Of(key); + auto accept = Base64.encode(hash); cgi.header(("Sec-WebSocket-Accept: " ~ accept).idup); @@ -4266,12 +4418,721 @@ version(Windows) else static assert(0); } +version(Posix) +private extern(C) int posix_spawn(pid_t*, const char*, void*, void*, const char**, const char**); + + +// FIXME: these aren't quite public yet. +//private: + +// template for laziness +void startWebsocketServer()() { + version(linux) { + import core.sys.posix.unistd; + pid_t pid; + const(char)*[16] args; + args[0] = "ARSD_CGI_WEBSOCKET_SERVER"; + args[1] = "--websocket-server"; + posix_spawn(&pid, "/proc/self/exe", + null, + null, + args.ptr, + null // env + ); + } else version(Windows) { + wchar[2048] filename; + auto len = GetModuleFileNameW(null, filename.ptr, cast(DWORD) filename.length); + if(len == 0 || len == filename.length) + throw new Exception("could not get process name to start helper server"); + + STARTUPINFOW startupInfo; + startupInfo.cb = cast(DWORD) startupInfo.sizeof; + PROCESS_INFORMATION processInfo; + + // I *MIGHT* need to run it as a new job or a service... + auto ret = CreateProcessW( + filename.ptr, + "--websocket-server"w, + null, // process attributes + null, // thread attributes + false, // inherit handles + 0, // creation flags + null, // environment + null, // working directory + &startupInfo, + &processInfo + ); + + if(!ret) + throw new Exception("create process failed"); + + // when done with those, if we set them + /* + CloseHandle(hStdInput); + CloseHandle(hStdOutput); + CloseHandle(hStdError); + */ + + } else static assert(0, "Websocket server not implemented on this system yet (email me, i can prolly do it if you need it)"); +} + +// template for laziness /* -Copyright: Adam D. Ruppe, 2008 - 2016 + The websocket server is a single-process, single-thread, event + I/O thing. It is passed websockets from other CGI processes + and is then responsible for handling their messages and responses. + Note that the CGI process is responsible for websocket setup, + including authentication, etc. + + It also gets data sent to it by other processes and is responsible + for distributing that, as necessary. +*/ +void runWebsocketServer()() { + assert(0, "not implemented"); +} + +void sendToWebsocketServer(WebSocket ws, string group) { + assert(0, "not implemented"); +} + +void sendToWebsocketServer(string content, string group) { + assert(0, "not implemented"); +} + + +void runEventServer()() { + runAddonServer("/tmp/arsd_cgi_event_server"); +} + +// sends this cgi request to the event server so it will be fed events. You should not do anything else with the cgi object after this. +void sendConnectionToEventServer()(Cgi cgi, in char[] eventUrl) { + + cgi.setResponseContentType("text/event-stream"); + cgi.write(":\n"); // to initialize the chunking and send headers before keeping the fd for later + cgi.flush(); + + cgi.closed = true; + auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server"); + scope(exit) + closeLocalServerConnection(s); + + version(fastcgi) + static assert(0, "sending fcgi connections not supported"); + + int fd = cgi.getOutputFileHandle(); + if(fd == -1) + throw new Exception("bad fd from cgi!"); + + char[1024] buffer; + buffer[0] = cgi.responseChunked ? 1 : 0; + + buffer[1 .. eventUrl.length + 1] = eventUrl[]; + + auto res = write_fd(s, buffer.ptr, 1 + eventUrl.length, fd); + assert(res == 1 + eventUrl.length); +} + +void sendEventToEventServer()(string url, string event, string data, int lifetime) { + auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server"); + scope(exit) + closeLocalServerConnection(s); + + SendableEvent sev; + sev.populate(url, event, data, lifetime); + + auto ret = send(s, &sev, sev.sizeof, 0); + assert(ret == sev.sizeof); +} + +version(Posix) + alias LocalServerConnectionHandle = int; +else version(Windows) + alias LocalServerConnectionHandle = HANDLE; + +LocalServerConnectionHandle openLocalServerConnection(string name) { + version(Posix) { + import core.sys.posix.unistd; + import core.sys.posix.sys.un; + + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + if(sock == -1) + throw new Exception("socket " ~ to!string(errno)); + + scope(failure) + close(sock); + + // add-on server processes are assumed to be local, and thus will + // use unix domain sockets. Besides, I want to pass sockets to them, + // so it basically must be local (except for the session server, but meh). + sockaddr_un addr; + addr.sun_family = AF_UNIX; + version(linux) { + // on linux, we will use the abstract namespace + addr.sun_path[0] = 0; + addr.sun_path[1 .. name.length + 1] = cast(typeof(addr.sun_path[])) name[]; + } else { + // but otherwise, just use a file cuz we must. + addr.sun_path[0 .. name.length] = cast(typeof(addr.sun_path[])) name[]; + } + + if(connect(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) + throw new Exception("connect " ~ to!string(errno)); + + return sock; + } +} + +void closeLocalServerConnection(LocalServerConnectionHandle handle) { + version(Posix) { + import core.sys.posix.unistd; + close(handle); + } else version(Windows) + CloseHandle(handle); +} + +void runSessionServer()() { + /+ + The session server api should prolly be: + + setSessionValues + getSessionValues + changeSessionId + createSession + destroySesson + +/ + assert(0, "not implemented"); +} + +version(Posix) +private void makeNonBlocking(int fd) { + import core.sys.posix.fcntl; + auto flags = fcntl(fd, F_GETFL, 0); + if(flags == -1) + throw new Exception("fcntl get"); + flags |= O_NONBLOCK; + auto s = fcntl(fd, F_SETFL, flags); + if(s == -1) + throw new Exception("fcntl set"); +} + +import core.stdc.errno; + +struct IoOp { + @disable this(); + @disable this(this); + + enum Read = 1; + enum Write = 2; + enum Accept = 3; + enum ReadSocketHandle = 4; + + // Your handler may be called in a different thread than the one that initiated the IO request! + // It is also possible to have multiple io requests being called simultaneously. Use proper thread safety caution. + private void function(IoOp*, int) handler; + private void function(IoOp*) closeHandler; + private void function(IoOp*) completeHandler; + private int internalFd; + private int operation; + private int bufferLengthAllocated; + private int bufferLengthUsed; + private ubyte[1] internalBuffer; // it can be overallocated! + + ubyte[] allocatedBuffer() { + return internalBuffer.ptr[0 .. bufferLengthAllocated]; + } + + ubyte[] usedBuffer() { + return allocatedBuffer[0 .. bufferLengthUsed]; + } + + void reset() { + bufferLengthUsed = 0; + } + + int fd() { + return internalFd; + } +} + +IoOp* allocateIoOp(int fd, int operation, int bufferSize, void function(IoOp*, int) handler) { + import core.stdc.stdlib; + + auto ptr = malloc(IoOp.sizeof + bufferSize); + if(ptr is null) + assert(0); // out of memory! + + auto op = cast(IoOp*) ptr; + + op.handler = handler; + op.internalFd = fd; + op.operation = operation; + op.bufferLengthAllocated = bufferSize; + op.bufferLengthUsed = 0; + + return op; +} + +void freeIoOp(ref IoOp* ptr) { + import core.stdc.stdlib; + free(ptr); + ptr = null; +} + +/// +struct SendableEvent { + int urlLength; + char[256] urlBuffer = 0; + int typeLength; + char[32] typeBuffer = 0; + int messageLength; + char[2048] messageBuffer = 0; + int _lifetime; + + char[] message() { + return messageBuffer[0 .. messageLength]; + } + char[] type() { + return typeBuffer[0 .. typeLength]; + } + char[] url() { + return urlBuffer[0 .. urlLength]; + } + int lifetime() { + return _lifetime; + } + + /// + void populate(string url, string type, string message, int lifetime) + in { + assert(url.length < this.urlBuffer.length); + assert(type.length < this.typeBuffer.length); + assert(message.length < this.messageBuffer.length); + } + do { + this.urlLength = cast(int) url.length; + this.typeLength = cast(int) type.length; + this.messageLength = cast(int) message.length; + this._lifetime = lifetime; + + this.urlBuffer[0 .. url.length] = url[]; + this.typeBuffer[0 .. type.length] = type[]; + this.messageBuffer[0 .. message.length] = message[]; + } +} + +struct EventConnection { + int fd; + bool needsChunking; +} + +private EventConnection[][string] eventConnectionsByUrl; + +private void handleInputEvent(scope SendableEvent* event) { + static int eventId; + + static struct StoredEvent { + int id; + string type; + string message; + int lifetimeRemaining; + } + + StoredEvent[][string] byUrl; + + int thisId = ++eventId; + + if(event.lifetime) + byUrl[event.url.idup] ~= StoredEvent(thisId, event.type.idup, event.message.idup, event.lifetime); + + auto connectionsPtr = event.url in eventConnectionsByUrl; + EventConnection[] connections; + if(connectionsPtr is null) + return; + else + connections = *connectionsPtr; + + char[4096] buffer; + char[] formattedMessage; + + void append(const char[] a) { + // the 6's here are to leave room for a HTTP chunk header, if it proves necessary + buffer[6 + formattedMessage.length .. 6 + formattedMessage.length + a.length] = a[]; + formattedMessage = buffer[6 .. 6 + formattedMessage.length + a.length]; + } + + /* + rawDataOutput(cast(const(ubyte)[]) toHex(t.length)); + rawDataOutput(cast(const(ubyte)[]) "\r\n"); + rawDataOutput(cast(const(ubyte)[]) t); + rawDataOutput(cast(const(ubyte)[]) "\r\n"); + */ + import std.algorithm.iteration; + + if(connections.length) { + append("id: "); + append(to!string(thisId)); + append("\n"); + + append("event: "); + append(event.type); + append("\n"); + + foreach(line; event.message.splitter("\n")) { + append("data: "); + append(line); + append("\n"); + } + + append("\n"); + } + + // chunk it for HTTP! + auto len = toHex(formattedMessage.length); + buffer[4 .. 6] = "\r\n"[]; + buffer[4 - len.length .. 4] = len[]; + + auto chunkedMessage = buffer[4 - len.length .. 6 + formattedMessage.length]; + // done + + // FIXME: send back requests when needed + // FIXME: send a single ":\n" every 15 seconds to keep alive + + foreach(connection; connections) { + if(connection.needsChunking) + nonBlockingWrite(connection.fd, chunkedMessage); + else + nonBlockingWrite(connection.fd, formattedMessage); + } +} + +void nonBlockingWrite(int connection, const void[] data) { + import core.sys.posix.unistd; + + auto ret = write(connection, data.ptr, data.length); + // FIXME: what if the file closed? + if(ret != data.length) { + if(ret == 0 || errno == EPIPE) { + // the file is closed, remove it + outer: foreach(url, ref connections; eventConnectionsByUrl) { + foreach(idx, conn; connections) { + if(conn.fd == connection) { + connections[idx] = connections[$-1]; + connections = connections[0 .. $ - 1]; + continue outer; + } + } + } + } else + throw new Exception("alas " ~ to!string(ret) ~ " " ~ to!string(errno)); // FIXME + } +} + +void runAddonServer()(string name) { + version(Posix) { + + import core.sys.posix.unistd; + import core.sys.posix.fcntl; + import core.sys.posix.sys.un; + + import core.sys.posix.signal; + signal(SIGPIPE, SIG_IGN); + + static void handleLocalConnectionData(IoOp* op, int receivedFd) { + if(receivedFd != -1) { + //writeln("GOT FD ", receivedFd, " -- ", op.usedBuffer); + + //core.sys.posix.unistd.write(receivedFd, "hello".ptr, 5); + + string url = (cast(char[]) op.usedBuffer[1 .. $]).idup; + eventConnectionsByUrl[url] ~= EventConnection(receivedFd, op.usedBuffer[0] > 0 ? true : false); + + // FIXME: catch up on past messages here + } else { + auto data = op.usedBuffer; + auto event = cast(SendableEvent*) data.ptr; + + handleInputEvent(event); + } + } + + static void handleLocalConnectionClose(IoOp* op) { + //writeln("CLOSED"); + } + static void handleLocalConnectionComplete(IoOp* op) { + //writeln("COMPLETED"); + } + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + if(sock == -1) + throw new Exception("socket " ~ to!string(errno)); + + scope(failure) + close(sock); + + // add-on server processes are assumed to be local, and thus will + // use unix domain sockets. Besides, I want to pass sockets to them, + // so it basically must be local (except for the session server, but meh). + sockaddr_un addr; + addr.sun_family = AF_UNIX; + version(linux) { + // on linux, we will use the abstract namespace + addr.sun_path[0] = 0; + addr.sun_path[1 .. name.length + 1] = cast(typeof(addr.sun_path[])) name[]; + } else { + // but otherwise, just use a file cuz we must. + addr.sun_path[0 .. name.length] = cast(typeof(addr.sun_path[])) name[]; + } + + if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) + throw new Exception("bind " ~ to!string(errno)); + + if(listen(sock, 128) == -1) + throw new Exception("listen " ~ to!string(errno)); + + version(linux) { + + makeNonBlocking(sock); + + import core.sys.linux.epoll; + auto epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if(epoll_fd == -1) + throw new Exception("epoll_create1 " ~ to!string(errno)); + scope(failure) + close(epoll_fd); + + auto acceptOp = allocateIoOp(sock, IoOp.Read, 0, null); + scope(exit) + freeIoOp(acceptOp); + + epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = acceptOp; + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) + throw new Exception("epoll_ctl " ~ to!string(errno)); + + epoll_event[64] events; + + while(true) { + int timeout_milliseconds = -1; // infinite + //writeln("waiting for ", name); + auto nfds = epoll_wait(epoll_fd, events.ptr, events.length, timeout_milliseconds); + if(nfds == -1) { + if(errno == EINTR) + continue; + throw new Exception("epoll_wait " ~ to!string(errno)); + } + + foreach(idx; 0 .. nfds) { + auto flags = events[idx].events; + auto ioop = cast(IoOp*) events[idx].data.ptr; + + //writeln(flags, " ", ioop.fd); + + if(ioop.fd == sock && (flags & EPOLLIN)) { + // on edge triggering, it is important that we get it all + while(true) { + auto size = addr.sizeof; + auto ns = accept(sock, cast(sockaddr*) &addr, &size); + if(ns == -1) { + if(errno == EAGAIN || errno == EWOULDBLOCK) { + // all done, got it all + break; + } + throw new Exception("accept " ~ to!string(errno)); + } + + makeNonBlocking(ns); + epoll_event nev; + nev.events = EPOLLIN | EPOLLET; + auto niop = allocateIoOp(ns, IoOp.ReadSocketHandle, 4096, &handleLocalConnectionData); + niop.closeHandler = &handleLocalConnectionClose; + niop.completeHandler = &handleLocalConnectionComplete; + scope(failure) freeIoOp(niop); + nev.data.ptr = niop; + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ns, &nev) == -1) + throw new Exception("epoll_ctl " ~ to!string(errno)); + } + } else if(ioop.operation == IoOp.ReadSocketHandle) { + while(true) { + int in_fd; + auto got = read_fd(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length, &in_fd); + if(got == -1) { + if(errno == EAGAIN || errno == EWOULDBLOCK) { + // all done, got it all + if(ioop.completeHandler) + ioop.completeHandler(ioop); + break; + } + throw new Exception("recv " ~ to!string(errno)); + } + + if(got == 0) { + if(ioop.closeHandler) + ioop.closeHandler(ioop); + close(ioop.fd); + freeIoOp(ioop); + break; + } + + ioop.bufferLengthUsed = got; + ioop.handler(ioop, in_fd); + } + } else if(ioop.operation == IoOp.Read) { + while(true) { + auto got = recv(ioop.fd, ioop.allocatedBuffer.ptr, ioop.allocatedBuffer.length, 0); + if(got == -1) { + if(errno == EAGAIN || errno == EWOULDBLOCK) { + // all done, got it all + if(ioop.completeHandler) + ioop.completeHandler(ioop); + break; + } + throw new Exception("recv " ~ to!string(errno)); + } + + if(got == 0) { + if(ioop.closeHandler) + ioop.closeHandler(ioop); + close(ioop.fd); + freeIoOp(ioop); + break; + } + + ioop.bufferLengthUsed = got; + ioop.handler(ioop, -1); + } + } + + // EPOLLHUP? + } + } + } else { + // this isn't seriously implemented. + static assert(0); + } + + // then we need to run the event loop. a user-defined function may be called here to help + // the event loop needs to process the websocket messages + + + } else version(Windows) { + + // set up a named pipe + // https://msdn.microsoft.com/en-us/library/windows/desktop/ms724251(v=vs.85).aspx + // https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsaduplicatesocketw + // https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-getnamedpipeserverprocessid + + } else static assert(0); +} + + +version(Posix) +// copied from the web and ported from C +// see https://stackoverflow.com/questions/2358684/can-i-share-a-file-descriptor-to-another-process-on-linux-or-are-they-local-to-t +ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd) { + msghdr msg; + iovec[1] iov; + + union ControlUnion { + cmsghdr cm; + char[CMSG_SPACE(int.sizeof)] control; + } + + ControlUnion control_un; + cmsghdr* cmptr; + + msg.msg_control = control_un.control.ptr; + msg.msg_controllen = control_un.control.length; + + cmptr = CMSG_FIRSTHDR(&msg); + cmptr.cmsg_len = CMSG_LEN(int.sizeof); + cmptr.cmsg_level = SOL_SOCKET; + cmptr.cmsg_type = SCM_RIGHTS; + *(cast(int *) CMSG_DATA(cmptr)) = sendfd; + + msg.msg_name = null; + msg.msg_namelen = 0; + + iov[0].iov_base = ptr; + iov[0].iov_len = nbytes; + msg.msg_iov = iov.ptr; + msg.msg_iovlen = 1; + + return sendmsg(fd, &msg, 0); +} + +version(Posix) +// copied from the web and ported from C +ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd) { + msghdr msg; + iovec[1] iov; + ssize_t n; + int newfd; + + union ControlUnion { + cmsghdr cm; + char[CMSG_SPACE(int.sizeof)] control; + } + ControlUnion control_un; + cmsghdr* cmptr; + + msg.msg_control = control_un.control.ptr; + msg.msg_controllen = control_un.control.length; + + msg.msg_name = null; + msg.msg_namelen = 0; + + iov[0].iov_base = ptr; + iov[0].iov_len = nbytes; + msg.msg_iov = iov.ptr; + msg.msg_iovlen = 1; + + if ( (n = recvmsg(fd, &msg, 0)) <= 0) + return n; + + if ( (cmptr = CMSG_FIRSTHDR(&msg)) != null && + cmptr.cmsg_len == CMSG_LEN(int.sizeof)) { + if (cmptr.cmsg_level != SOL_SOCKET) + throw new Exception("control level != SOL_SOCKET"); + if (cmptr.cmsg_type != SCM_RIGHTS) + throw new Exception("control type != SCM_RIGHTS"); + *recvfd = *(cast(int *) CMSG_DATA(cmptr)); + } else + *recvfd = -1; /* descriptor was not passed */ + + return n; +} +/* end read_fd */ + + +/* + Event source stuff + + The api is: + + sendEvent(string url, string type, string data, int timeout = 60*10); + + attachEventListener(string url, int fd, lastId) + + + It just sends to all attached listeners, and stores it until the timeout + for replaying via lastEventId. +*/ + +/* + Session process stuff + + it stores it all. the cgi object has a session object that can grab it + + session may be done in the same process if possible, there is a version + switch to choose if you want to override. +*/ + +/* +Copyright: Adam D. Ruppe, 2008 - 2019 License: Boost License 1.0. Authors: Adam D. Ruppe - Copyright Adam D. Ruppe 2008 - 2016. + Copyright Adam D. Ruppe 2008 - 2019. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)