From ea00b0db7ac00b9a34f181c1a89d9cf82896de39 Mon Sep 17 00:00:00 2001
From: "Adam D. Ruppe" <destructionator@gmail.com>
Date: Fri, 23 Mar 2012 23:57:27 -0400
Subject: [PATCH] SCGI support and associated network helpers

---
 cgi.d | 342 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 321 insertions(+), 21 deletions(-)

diff --git a/cgi.d b/cgi.d
index 4cab7c4..2c2086d 100644
--- a/cgi.d
+++ b/cgi.d
@@ -287,28 +287,28 @@ class Cgi {
 
 					onRequestBodyDataReceived(amountReceived, originalContentLength);
 				}
-				else {
-					// we have a custom data source..
-					auto chunk = readdata();
-					while(chunk.length) {
-						// FIXME: DRY
-						if(chunk.length > contentLength) {
-							handleIncomingDataChunk(chunk[0..contentLength]);
-							amountReceived += contentLength;
-							contentLength = 0;
-							break;
-						} else {
-							handleIncomingDataChunk(chunk);
-							contentLength -= chunk.length;
-							amountReceived += chunk.length;
-						}
-						if(contentLength == 0)
-							break;
-
-						chunk = readdata();
-						onRequestBodyDataReceived(amountReceived, originalContentLength);
+			else {
+				// we have a custom data source..
+				auto chunk = readdata();
+				while(chunk.length) {
+					// FIXME: DRY
+					if(chunk.length > contentLength) {
+						handleIncomingDataChunk(chunk[0..contentLength]);
+						amountReceived += contentLength;
+						contentLength = 0;
+						break;
+					} else {
+						handleIncomingDataChunk(chunk);
+						contentLength -= chunk.length;
+						amountReceived += chunk.length;
 					}
+					if(contentLength == 0)
+						break;
+
+					chunk = readdata();
+					onRequestBodyDataReceived(amountReceived, originalContentLength);
 				}
+			}
 
 				onRequestBodyDataReceived(amountReceived, originalContentLength);
 				postArray = assumeUnique(pps._post);
@@ -903,7 +903,16 @@ class Cgi {
 	}
 
 	/// This represents a file the user uploaded via a POST request.
-	struct UploadedFile {
+	static struct UploadedFile {
+		/// If you want to create one of these structs for yourself from some data,
+		/// use this function.
+		static UploadedFile fromData(immutable(void)[] data) {
+			Cgi.UploadedFile f;
+			f.content = cast(immutable(ubyte)[]) data;
+			f.contentInMemory = true;
+			return f;
+		}
+
 		string name; 		/// The name of the form element.
 		string filename; 	/// The filename the user set.
 		string contentType; 	/// The MIME type the user's browser reported. (Not reliable.)
@@ -1564,6 +1573,109 @@ version(embedded_httpd)
 			return;
 		}
 
+		version(scgi) {
+			import std.exception;
+			import al = std.algorithm;
+			auto manager = new ListeningConnectionManager(4000);
+
+			// this threads...
+			foreach(connection; manager) {
+				// and now we can buffer
+
+				int state = 0;
+				size_t size;
+				size_t contentLength;
+
+				string[string] headers;
+
+				auto range = new BufferedInputRange(connection);
+				more_data:
+				auto chunk = range.front();
+				switch(state) {
+					default: assert(0);
+					case 0: // waiting for colon for header length
+						auto idx = al.indexOf(chunk, ':');
+						if(idx == -1) {
+							range.popFront();
+							goto more_data;
+						}
+
+						size = to!size_t(cast(string) chunk[0 .. idx]);
+						chunk = range.consume(idx + 1);
+						state = 1;
+						goto case;
+					case 1: // reading headers
+						if(chunk.length < size)
+							range.popFront(0, size + 1);
+						// we are now guaranteed to have enough
+						chunk = range.front();
+						assert(chunk.length > size);
+
+						int idx = 0;
+						string key;
+						string value;
+						foreach(part; al.splitter(chunk, '\0')) {
+							if(idx & 1) { // odd is value
+								value = cast(string)(part.idup);
+								headers[key] = value; // commit
+								if(key == "CONTENT_LENGTH")
+									contentLength = to!int(value);
+							} else
+								key = cast(string)(part.idup);
+							idx++;
+						}
+
+						enforce(chunk[size] == ','); // the terminator
+
+						range.consume(size + 1);
+						state = 2;
+						goto case;
+					case 2:	// reading data
+						// this will be done by Cgi
+				}
+
+
+				// if we are here, we're set.
+
+				const(ubyte)[] getScgiChunk() {
+					// we are already primed
+					auto data = range.front();
+					if(data.length == 0 && !range.sourceClosed) {
+						range.popFront(0);
+						data = range.front();
+					}
+
+					return data;
+				}
+
+				void writeScgi(const(ubyte)[] data) {
+					sendAll(connection, data);
+				}
+
+				void flushScgi() {
+					// I don't *think* I have to do anything....
+				}
+
+				auto cgi = new CustomCgi(5_000_000, headers, &getScgiChunk, &writeScgi, &flushScgi);
+				try {
+					fun(cgi);
+					cgi.close();
+					cgi.dispose();
+				} catch(Throwable t) {
+					// no std err
+					auto msg = "Status: 500 Internal Server Error\n";
+					msg ~= "Content-Type: text/plain\n\n";
+					debug msg ~= t.toString;
+					else  msg ~= "An unexpected error has occurred.";
+
+					sendAll(connection, msg);
+					connection.close();
+
+					// FIXME: what about cgi.dispose?
+				}
+			}
+		}
+
 		version(fastcgi) {
 			FCGX_Stream* input, output, error;
 			FCGX_ParamArray env;
@@ -1763,6 +1875,194 @@ version(fastcgi) {
 }
 
 
+/* This might go int a separate module eventually. It is a network input helper class. */
+
+import std.socket;
+
+// it is a class primarily for reference semantics
+class BufferedInputRange {
+	this(Socket source, ubyte[] buffer = null) {
+		this.source = source;
+		if(buffer is null) {
+			underlyingBuffer = new ubyte[4096];
+			allowGrowth = true;
+		} else {
+			underlyingBuffer = buffer;
+		}
+
+		assert(underlyingBuffer.length);
+
+		// we assume view.ptr is always inside underlyingBuffer
+		view = underlyingBuffer[0 .. 0];
+
+		popFront(); // prime
+	}
+
+	/**
+		A slight difference from regular ranges is you can give it the maximum
+		number of bytes to consume.
+
+		IMPORTANT NOTE: the default is to consume nothing, so if you don't call
+		consume() yourself and use a regular foreach, it will infinitely loop!
+
+		The default is to do what a normal range does, and consume the whole buffer
+		and wait for additional input.
+
+		You can also specify 0, to append to the buffer, or any other number
+		to remove the front n bytes and wait for more.
+	*/
+	void popFront(size_t maxBytesToConsume = 0 /*size_t.max*/, size_t minBytesToSettleFor = 0) {
+		if(sourceClosed)
+			throw new Exception("can't get any more data from a closed source");
+		consume(maxBytesToConsume);
+
+		// we might have to grow the buffer
+		if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) {
+			if(allowGrowth) {
+				auto viewStart = view.ptr - underlyingBuffer.ptr;
+				auto growth = 4096;
+				// make sure we have enough for what we're being asked for
+				if(minBytesToSettleFor - underlyingBuffer.length > growth)
+					growth = minBytesToSettleFor - underlyingBuffer.length;
+				underlyingBuffer.length += growth;
+				view = underlyingBuffer[viewStart .. view.length];
+			} else
+				throw new Exception("No room left in the buffer");
+		}
+
+		do {
+			auto freeSpace = underlyingBuffer[underlyingBuffer.ptr - view.ptr + view.length .. $];
+
+			auto ret = source.receive(freeSpace);
+			if(ret == Socket.ERROR)
+				throw new Exception("uh oh"); // FIXME
+			if(ret == 0) {
+				sourceClosed = true;
+				return;
+			}
+
+			view = underlyingBuffer[underlyingBuffer.ptr - view.ptr .. view.length + ret];
+		} while(view.length < minBytesToSettleFor);
+	}
+
+	/// Removes n bytes from the front of the buffer, and returns the new buffer slice.
+	/// You might want to idup the data you are consuming if you store it, since it may
+	/// be overwritten on the new popFront.
+	///
+	/// You do not need to call this if you always want to wait for more data when you
+	/// consume some.
+	ubyte[] consume(size_t bytes) {
+		view = view[bytes > $ ? $ : bytes .. $];
+		if(view.length == 0)
+			view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning
+		return front;
+	}
+
+	bool empty() {
+		return sourceClosed && view.length == 0;
+	}
+
+	ubyte[] front() {
+		return view;
+	}
+
+	invariant() {
+		assert(view.ptr >= underlyingBuffer.ptr);
+		// it should never be equal, since if that happens view ought to be empty, and thus reusing the buffer
+		assert(view.ptr < underlyingBuffer.ptr + underlyingBuffer.length);
+	}
+
+	ubyte[] underlyingBuffer;
+	bool allowGrowth;
+	ubyte[] view;
+	Socket source;
+	bool sourceClosed;
+}
+
+/**
+	To use this thing:
+
+	auto manager = new ListeningConnectionManager(80);
+	foreach(connection; manager) {
+		// work with connection
+		// note: each connection may get its own thread, so this is a kind of concurrent foreach.
+
+		// this can have implications if you access local variables in the function, as they are
+		// implicitly shared!
+
+		// FIXME: break does not work
+	}
+
+
+	I suggest you use BufferedInputRange(connection) to handle the input. As a packet
+	comes in, you will get control. You can just continue; though to fetch more.
+
+
+	FIXME: should I offer an event based async thing like netman did too? Yeah, probably.
+*/
+class ListeningConnectionManager {
+	this(ushort port) {
+		listener = new TcpSocket();
+		listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
+		listener.bind(new InternetAddress(port));
+		listener.listen(10);
+	}
+
+	Socket listener;
+
+	int opApply(CMT dg) {
+		shared(int) broken;
+
+		while(!broken) {
+			auto sn = listener.accept();
+			auto thread = new ConnectionThread(sn, &broken, dg);
+			thread.start();
+		}
+
+		return broken;
+	}
+}
+
+/// helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something.
+void sendAll(Socket s, const(void)[] data) {
+	if(data.length == 0) return;
+	ptrdiff_t amount;
+	do {
+		amount = s.send(data);
+		data = data[amount .. $];
+	} while(data.length);
+}
+
+alias int delegate(Socket) CMT;
+
+import core.thread;
+class ConnectionThread : Thread {
+	this(Socket s, shared(int)* breakSignifier, CMT dg) {
+		this.s = s;
+	 	this.breakSignifier = breakSignifier;
+		this.dg = dg;
+		super(&run);
+	}
+
+	void run() {
+		scope(exit) {
+			// I don't want to double close it, and it does this on close() according to source
+			// might be fragile, but meh
+			if(s.handle() != socket_t.init)
+				s.close();
+		}
+		if(auto result = dg(s)) {
+			*breakSignifier = result;
+		}
+	}
+
+	Socket s;
+	shared(int)* breakSignifier;
+	CMT dg;
+}
+
+/* Done with network helper */
+
 /* Helpers for doing temporary files. Used both here and in web.d */
 
 version(Windows) {