diff --git a/cgi.d b/cgi.d index 4cab7c4..2c2086d 100644 --- a/cgi.d +++ b/cgi.d @@ -287,28 +287,28 @@ class Cgi { onRequestBodyDataReceived(amountReceived, originalContentLength); } - else { - // we have a custom data source.. - auto chunk = readdata(); - while(chunk.length) { - // FIXME: DRY - if(chunk.length > contentLength) { - handleIncomingDataChunk(chunk[0..contentLength]); - amountReceived += contentLength; - contentLength = 0; - break; - } else { - handleIncomingDataChunk(chunk); - contentLength -= chunk.length; - amountReceived += chunk.length; - } - if(contentLength == 0) - break; - - chunk = readdata(); - onRequestBodyDataReceived(amountReceived, originalContentLength); + else { + // we have a custom data source.. + auto chunk = readdata(); + while(chunk.length) { + // FIXME: DRY + if(chunk.length > contentLength) { + handleIncomingDataChunk(chunk[0..contentLength]); + amountReceived += contentLength; + contentLength = 0; + break; + } else { + handleIncomingDataChunk(chunk); + contentLength -= chunk.length; + amountReceived += chunk.length; } + if(contentLength == 0) + break; + + chunk = readdata(); + onRequestBodyDataReceived(amountReceived, originalContentLength); } + } onRequestBodyDataReceived(amountReceived, originalContentLength); postArray = assumeUnique(pps._post); @@ -903,7 +903,16 @@ class Cgi { } /// This represents a file the user uploaded via a POST request. - struct UploadedFile { + static struct UploadedFile { + /// If you want to create one of these structs for yourself from some data, + /// use this function. + static UploadedFile fromData(immutable(void)[] data) { + Cgi.UploadedFile f; + f.content = cast(immutable(ubyte)[]) data; + f.contentInMemory = true; + return f; + } + string name; /// The name of the form element. string filename; /// The filename the user set. string contentType; /// The MIME type the user's browser reported. (Not reliable.) @@ -1564,6 +1573,109 @@ version(embedded_httpd) return; } + version(scgi) { + import std.exception; + import al = std.algorithm; + auto manager = new ListeningConnectionManager(4000); + + // this threads... + foreach(connection; manager) { + // and now we can buffer + + int state = 0; + size_t size; + size_t contentLength; + + string[string] headers; + + auto range = new BufferedInputRange(connection); + more_data: + auto chunk = range.front(); + switch(state) { + default: assert(0); + case 0: // waiting for colon for header length + auto idx = al.indexOf(chunk, ':'); + if(idx == -1) { + range.popFront(); + goto more_data; + } + + size = to!size_t(cast(string) chunk[0 .. idx]); + chunk = range.consume(idx + 1); + state = 1; + goto case; + case 1: // reading headers + if(chunk.length < size) + range.popFront(0, size + 1); + // we are now guaranteed to have enough + chunk = range.front(); + assert(chunk.length > size); + + int idx = 0; + string key; + string value; + foreach(part; al.splitter(chunk, '\0')) { + if(idx & 1) { // odd is value + value = cast(string)(part.idup); + headers[key] = value; // commit + if(key == "CONTENT_LENGTH") + contentLength = to!int(value); + } else + key = cast(string)(part.idup); + idx++; + } + + enforce(chunk[size] == ','); // the terminator + + range.consume(size + 1); + state = 2; + goto case; + case 2: // reading data + // this will be done by Cgi + } + + + // if we are here, we're set. + + const(ubyte)[] getScgiChunk() { + // we are already primed + auto data = range.front(); + if(data.length == 0 && !range.sourceClosed) { + range.popFront(0); + data = range.front(); + } + + return data; + } + + void writeScgi(const(ubyte)[] data) { + sendAll(connection, data); + } + + void flushScgi() { + // I don't *think* I have to do anything.... + } + + auto cgi = new CustomCgi(5_000_000, headers, &getScgiChunk, &writeScgi, &flushScgi); + try { + fun(cgi); + cgi.close(); + cgi.dispose(); + } catch(Throwable t) { + // no std err + auto msg = "Status: 500 Internal Server Error\n"; + msg ~= "Content-Type: text/plain\n\n"; + debug msg ~= t.toString; + else msg ~= "An unexpected error has occurred."; + + sendAll(connection, msg); + connection.close(); + + // FIXME: what about cgi.dispose? + } + } + } + version(fastcgi) { FCGX_Stream* input, output, error; FCGX_ParamArray env; @@ -1763,6 +1875,194 @@ version(fastcgi) { } +/* This might go int a separate module eventually. It is a network input helper class. */ + +import std.socket; + +// it is a class primarily for reference semantics +class BufferedInputRange { + this(Socket source, ubyte[] buffer = null) { + this.source = source; + if(buffer is null) { + underlyingBuffer = new ubyte[4096]; + allowGrowth = true; + } else { + underlyingBuffer = buffer; + } + + assert(underlyingBuffer.length); + + // we assume view.ptr is always inside underlyingBuffer + view = underlyingBuffer[0 .. 0]; + + popFront(); // prime + } + + /** + A slight difference from regular ranges is you can give it the maximum + number of bytes to consume. + + IMPORTANT NOTE: the default is to consume nothing, so if you don't call + consume() yourself and use a regular foreach, it will infinitely loop! + + The default is to do what a normal range does, and consume the whole buffer + and wait for additional input. + + You can also specify 0, to append to the buffer, or any other number + to remove the front n bytes and wait for more. + */ + void popFront(size_t maxBytesToConsume = 0 /*size_t.max*/, size_t minBytesToSettleFor = 0) { + if(sourceClosed) + throw new Exception("can't get any more data from a closed source"); + consume(maxBytesToConsume); + + // we might have to grow the buffer + if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) { + if(allowGrowth) { + auto viewStart = view.ptr - underlyingBuffer.ptr; + auto growth = 4096; + // make sure we have enough for what we're being asked for + if(minBytesToSettleFor - underlyingBuffer.length > growth) + growth = minBytesToSettleFor - underlyingBuffer.length; + underlyingBuffer.length += growth; + view = underlyingBuffer[viewStart .. view.length]; + } else + throw new Exception("No room left in the buffer"); + } + + do { + auto freeSpace = underlyingBuffer[underlyingBuffer.ptr - view.ptr + view.length .. $]; + + auto ret = source.receive(freeSpace); + if(ret == Socket.ERROR) + throw new Exception("uh oh"); // FIXME + if(ret == 0) { + sourceClosed = true; + return; + } + + view = underlyingBuffer[underlyingBuffer.ptr - view.ptr .. view.length + ret]; + } while(view.length < minBytesToSettleFor); + } + + /// Removes n bytes from the front of the buffer, and returns the new buffer slice. + /// You might want to idup the data you are consuming if you store it, since it may + /// be overwritten on the new popFront. + /// + /// You do not need to call this if you always want to wait for more data when you + /// consume some. + ubyte[] consume(size_t bytes) { + view = view[bytes > $ ? $ : bytes .. $]; + if(view.length == 0) + view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning + return front; + } + + bool empty() { + return sourceClosed && view.length == 0; + } + + ubyte[] front() { + return view; + } + + invariant() { + assert(view.ptr >= underlyingBuffer.ptr); + // it should never be equal, since if that happens view ought to be empty, and thus reusing the buffer + assert(view.ptr < underlyingBuffer.ptr + underlyingBuffer.length); + } + + ubyte[] underlyingBuffer; + bool allowGrowth; + ubyte[] view; + Socket source; + bool sourceClosed; +} + +/** + To use this thing: + + auto manager = new ListeningConnectionManager(80); + foreach(connection; manager) { + // work with connection + // note: each connection may get its own thread, so this is a kind of concurrent foreach. + + // this can have implications if you access local variables in the function, as they are + // implicitly shared! + + // FIXME: break does not work + } + + + I suggest you use BufferedInputRange(connection) to handle the input. As a packet + comes in, you will get control. You can just continue; though to fetch more. + + + FIXME: should I offer an event based async thing like netman did too? Yeah, probably. +*/ +class ListeningConnectionManager { + this(ushort port) { + listener = new TcpSocket(); + listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); + listener.bind(new InternetAddress(port)); + listener.listen(10); + } + + Socket listener; + + int opApply(CMT dg) { + shared(int) broken; + + while(!broken) { + auto sn = listener.accept(); + auto thread = new ConnectionThread(sn, &broken, dg); + thread.start(); + } + + return broken; + } +} + +/// helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something. +void sendAll(Socket s, const(void)[] data) { + if(data.length == 0) return; + ptrdiff_t amount; + do { + amount = s.send(data); + data = data[amount .. $]; + } while(data.length); +} + +alias int delegate(Socket) CMT; + +import core.thread; +class ConnectionThread : Thread { + this(Socket s, shared(int)* breakSignifier, CMT dg) { + this.s = s; + this.breakSignifier = breakSignifier; + this.dg = dg; + super(&run); + } + + void run() { + scope(exit) { + // I don't want to double close it, and it does this on close() according to source + // might be fragile, but meh + if(s.handle() != socket_t.init) + s.close(); + } + if(auto result = dg(s)) { + *breakSignifier = result; + } + } + + Socket s; + shared(int)* breakSignifier; + CMT dg; +} + +/* Done with network helper */ + /* Helpers for doing temporary files. Used both here and in web.d */ version(Windows) {