SCGI support and associated network helpers

This commit is contained in:
Adam D. Ruppe 2012-03-23 23:57:27 -04:00
parent 5f5c157ded
commit ea00b0db7a
1 changed files with 321 additions and 21 deletions

342
cgi.d
View File

@ -287,28 +287,28 @@ class Cgi {
onRequestBodyDataReceived(amountReceived, originalContentLength);
}
else {
// we have a custom data source..
auto chunk = readdata();
while(chunk.length) {
// FIXME: DRY
if(chunk.length > contentLength) {
handleIncomingDataChunk(chunk[0..contentLength]);
amountReceived += contentLength;
contentLength = 0;
break;
} else {
handleIncomingDataChunk(chunk);
contentLength -= chunk.length;
amountReceived += chunk.length;
}
if(contentLength == 0)
break;
chunk = readdata();
onRequestBodyDataReceived(amountReceived, originalContentLength);
else {
// we have a custom data source..
auto chunk = readdata();
while(chunk.length) {
// FIXME: DRY
if(chunk.length > contentLength) {
handleIncomingDataChunk(chunk[0..contentLength]);
amountReceived += contentLength;
contentLength = 0;
break;
} else {
handleIncomingDataChunk(chunk);
contentLength -= chunk.length;
amountReceived += chunk.length;
}
if(contentLength == 0)
break;
chunk = readdata();
onRequestBodyDataReceived(amountReceived, originalContentLength);
}
}
onRequestBodyDataReceived(amountReceived, originalContentLength);
postArray = assumeUnique(pps._post);
@ -903,7 +903,16 @@ class Cgi {
}
/// This represents a file the user uploaded via a POST request.
struct UploadedFile {
static struct UploadedFile {
/// If you want to create one of these structs for yourself from some data,
/// use this function.
static UploadedFile fromData(immutable(void)[] data) {
Cgi.UploadedFile f;
f.content = cast(immutable(ubyte)[]) data;
f.contentInMemory = true;
return f;
}
string name; /// The name of the form element.
string filename; /// The filename the user set.
string contentType; /// The MIME type the user's browser reported. (Not reliable.)
@ -1564,6 +1573,109 @@ version(embedded_httpd)
return;
}
version(scgi) {
import std.exception;
import al = std.algorithm;
auto manager = new ListeningConnectionManager(4000);
// this threads...
foreach(connection; manager) {
// and now we can buffer
int state = 0;
size_t size;
size_t contentLength;
string[string] headers;
auto range = new BufferedInputRange(connection);
more_data:
auto chunk = range.front();
switch(state) {
default: assert(0);
case 0: // waiting for colon for header length
auto idx = al.indexOf(chunk, ':');
if(idx == -1) {
range.popFront();
goto more_data;
}
size = to!size_t(cast(string) chunk[0 .. idx]);
chunk = range.consume(idx + 1);
state = 1;
goto case;
case 1: // reading headers
if(chunk.length < size)
range.popFront(0, size + 1);
// we are now guaranteed to have enough
chunk = range.front();
assert(chunk.length > size);
int idx = 0;
string key;
string value;
foreach(part; al.splitter(chunk, '\0')) {
if(idx & 1) { // odd is value
value = cast(string)(part.idup);
headers[key] = value; // commit
if(key == "CONTENT_LENGTH")
contentLength = to!int(value);
} else
key = cast(string)(part.idup);
idx++;
}
enforce(chunk[size] == ','); // the terminator
range.consume(size + 1);
state = 2;
goto case;
case 2: // reading data
// this will be done by Cgi
}
// if we are here, we're set.
const(ubyte)[] getScgiChunk() {
// we are already primed
auto data = range.front();
if(data.length == 0 && !range.sourceClosed) {
range.popFront(0);
data = range.front();
}
return data;
}
void writeScgi(const(ubyte)[] data) {
sendAll(connection, data);
}
void flushScgi() {
// I don't *think* I have to do anything....
}
auto cgi = new CustomCgi(5_000_000, headers, &getScgiChunk, &writeScgi, &flushScgi);
try {
fun(cgi);
cgi.close();
cgi.dispose();
} catch(Throwable t) {
// no std err
auto msg = "Status: 500 Internal Server Error\n";
msg ~= "Content-Type: text/plain\n\n";
debug msg ~= t.toString;
else msg ~= "An unexpected error has occurred.";
sendAll(connection, msg);
connection.close();
// FIXME: what about cgi.dispose?
}
}
}
version(fastcgi) {
FCGX_Stream* input, output, error;
FCGX_ParamArray env;
@ -1763,6 +1875,194 @@ version(fastcgi) {
}
/* This might go int a separate module eventually. It is a network input helper class. */
import std.socket;
// it is a class primarily for reference semantics
class BufferedInputRange {
this(Socket source, ubyte[] buffer = null) {
this.source = source;
if(buffer is null) {
underlyingBuffer = new ubyte[4096];
allowGrowth = true;
} else {
underlyingBuffer = buffer;
}
assert(underlyingBuffer.length);
// we assume view.ptr is always inside underlyingBuffer
view = underlyingBuffer[0 .. 0];
popFront(); // prime
}
/**
A slight difference from regular ranges is you can give it the maximum
number of bytes to consume.
IMPORTANT NOTE: the default is to consume nothing, so if you don't call
consume() yourself and use a regular foreach, it will infinitely loop!
The default is to do what a normal range does, and consume the whole buffer
and wait for additional input.
You can also specify 0, to append to the buffer, or any other number
to remove the front n bytes and wait for more.
*/
void popFront(size_t maxBytesToConsume = 0 /*size_t.max*/, size_t minBytesToSettleFor = 0) {
if(sourceClosed)
throw new Exception("can't get any more data from a closed source");
consume(maxBytesToConsume);
// we might have to grow the buffer
if(minBytesToSettleFor > underlyingBuffer.length || view.length == underlyingBuffer.length) {
if(allowGrowth) {
auto viewStart = view.ptr - underlyingBuffer.ptr;
auto growth = 4096;
// make sure we have enough for what we're being asked for
if(minBytesToSettleFor - underlyingBuffer.length > growth)
growth = minBytesToSettleFor - underlyingBuffer.length;
underlyingBuffer.length += growth;
view = underlyingBuffer[viewStart .. view.length];
} else
throw new Exception("No room left in the buffer");
}
do {
auto freeSpace = underlyingBuffer[underlyingBuffer.ptr - view.ptr + view.length .. $];
auto ret = source.receive(freeSpace);
if(ret == Socket.ERROR)
throw new Exception("uh oh"); // FIXME
if(ret == 0) {
sourceClosed = true;
return;
}
view = underlyingBuffer[underlyingBuffer.ptr - view.ptr .. view.length + ret];
} while(view.length < minBytesToSettleFor);
}
/// Removes n bytes from the front of the buffer, and returns the new buffer slice.
/// You might want to idup the data you are consuming if you store it, since it may
/// be overwritten on the new popFront.
///
/// You do not need to call this if you always want to wait for more data when you
/// consume some.
ubyte[] consume(size_t bytes) {
view = view[bytes > $ ? $ : bytes .. $];
if(view.length == 0)
view = underlyingBuffer[0 .. 0]; // go ahead and reuse the beginning
return front;
}
bool empty() {
return sourceClosed && view.length == 0;
}
ubyte[] front() {
return view;
}
invariant() {
assert(view.ptr >= underlyingBuffer.ptr);
// it should never be equal, since if that happens view ought to be empty, and thus reusing the buffer
assert(view.ptr < underlyingBuffer.ptr + underlyingBuffer.length);
}
ubyte[] underlyingBuffer;
bool allowGrowth;
ubyte[] view;
Socket source;
bool sourceClosed;
}
/**
To use this thing:
auto manager = new ListeningConnectionManager(80);
foreach(connection; manager) {
// work with connection
// note: each connection may get its own thread, so this is a kind of concurrent foreach.
// this can have implications if you access local variables in the function, as they are
// implicitly shared!
// FIXME: break does not work
}
I suggest you use BufferedInputRange(connection) to handle the input. As a packet
comes in, you will get control. You can just continue; though to fetch more.
FIXME: should I offer an event based async thing like netman did too? Yeah, probably.
*/
class ListeningConnectionManager {
this(ushort port) {
listener = new TcpSocket();
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
listener.bind(new InternetAddress(port));
listener.listen(10);
}
Socket listener;
int opApply(CMT dg) {
shared(int) broken;
while(!broken) {
auto sn = listener.accept();
auto thread = new ConnectionThread(sn, &broken, dg);
thread.start();
}
return broken;
}
}
/// helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something.
void sendAll(Socket s, const(void)[] data) {
if(data.length == 0) return;
ptrdiff_t amount;
do {
amount = s.send(data);
data = data[amount .. $];
} while(data.length);
}
alias int delegate(Socket) CMT;
import core.thread;
class ConnectionThread : Thread {
this(Socket s, shared(int)* breakSignifier, CMT dg) {
this.s = s;
this.breakSignifier = breakSignifier;
this.dg = dg;
super(&run);
}
void run() {
scope(exit) {
// I don't want to double close it, and it does this on close() according to source
// might be fragile, but meh
if(s.handle() != socket_t.init)
s.close();
}
if(auto result = dg(s)) {
*breakSignifier = result;
}
}
Socket s;
shared(int)* breakSignifier;
CMT dg;
}
/* Done with network helper */
/* Helpers for doing temporary files. Used both here and in web.d */
version(Windows) {