Windows hybrid server support

This commit is contained in:
Adam D. Ruppe 2021-12-22 18:46:29 -05:00
parent 9b52121b32
commit 2fe3355560
1 changed files with 416 additions and 98 deletions

370
cgi.d
View File

@ -397,7 +397,7 @@ void cloexec(Socket s) {
version(embedded_httpd_hybrid) { version(embedded_httpd_hybrid) {
version=embedded_httpd_threads; version=embedded_httpd_threads;
version(cgi_no_fork) {} else version(cgi_no_fork) {} else version(Posix)
version=cgi_use_fork; version=cgi_use_fork;
version=cgi_use_fiber; version=cgi_use_fiber;
} }
@ -4195,10 +4195,239 @@ class CgiFiber : Fiber {
} }
} }
version(cgi_use_fiber)
version(Windows) {
extern(Windows) private {
import core.sys.windows.mswsock;
alias GROUP=uint;
alias LPWSAPROTOCOL_INFOW = void*;
SOCKET WSASocketW(int af, int type, int protocol, LPWSAPROTOCOL_INFOW lpProtocolInfo, GROUP g, DWORD dwFlags);
int WSASend(SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent, DWORD dwFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
int WSARecv(SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesRecvd, LPDWORD lpFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
struct WSABUF {
ULONG len;
CHAR *buf;
}
alias LPWSABUF = WSABUF*;
alias WSAOVERLAPPED = OVERLAPPED;
alias LPWSAOVERLAPPED = LPOVERLAPPED;
/+
alias LPFN_ACCEPTEX =
BOOL
function(
SOCKET sListenSocket,
SOCKET sAcceptSocket,
//_Out_writes_bytes_(dwReceiveDataLength+dwLocalAddressLength+dwRemoteAddressLength) PVOID lpOutputBuffer,
void* lpOutputBuffer,
WORD dwReceiveDataLength,
WORD dwLocalAddressLength,
WORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
);
enum WSAID_ACCEPTEX = GUID([0xb5367df1,0xcbac,0x11cf,[0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92]]);
+/
enum WSAID_GETACCEPTEXSOCKADDRS = GUID(0xb5367df2,0xcbac,0x11cf,[0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92]);
}
private class PseudoblockingOverlappedSocket : Socket {
SOCKET handle;
CgiFiber fiber;
this(AddressFamily af, SocketType st) {
auto handle = WSASocketW(af, st, 0, null, 0, 1 /*WSA_FLAG_OVERLAPPED*/);
if(!handle)
throw new Exception("WSASocketW");
this.handle = handle;
iocp = CreateIoCompletionPort(cast(HANDLE) handle, iocp, cast(ULONG_PTR) cast(void*) this, 0);
if(iocp is null) {
writeln(GetLastError());
throw new Exception("CreateIoCompletionPort");
}
super(cast(socket_t) handle, af);
}
this() pure nothrow @trusted { assert(0); }
override void blocking(bool) {} // meaningless to us, just ignore it.
protected override Socket accepting() pure nothrow {
assert(0);
}
bool addressesParsed;
Address la;
Address ra;
private void populateAddresses() {
if(addressesParsed)
return;
addressesParsed = true;
int lalen, ralen;
sockaddr_in* la;
sockaddr_in* ra;
lpfnGetAcceptExSockaddrs(
scratchBuffer.ptr,
0, // same as in the AcceptEx call!
sockaddr_in.sizeof + 16,
sockaddr_in.sizeof + 16,
cast(sockaddr**) &la,
&lalen,
cast(sockaddr**) &ra,
&ralen
);
if(la)
this.la = new InternetAddress(*la);
if(ra)
this.ra = new InternetAddress(*ra);
}
override @property @trusted Address localAddress() {
populateAddresses();
return la;
}
override @property @trusted Address remoteAddress() {
populateAddresses();
return ra;
}
PseudoblockingOverlappedSocket accepted;
__gshared static LPFN_ACCEPTEX lpfnAcceptEx;
__gshared static typeof(&GetAcceptExSockaddrs) lpfnGetAcceptExSockaddrs;
override Socket accept() @trusted {
__gshared static LPFN_ACCEPTEX lpfnAcceptEx;
if(lpfnAcceptEx is null) {
DWORD dwBytes;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
auto iResult = WSAIoctl(handle, 0xc8000006 /*SIO_GET_EXTENSION_FUNCTION_POINTER*/,
&GuidAcceptEx, GuidAcceptEx.sizeof,
&lpfnAcceptEx, lpfnAcceptEx.sizeof,
&dwBytes, null, null);
GuidAcceptEx = WSAID_GETACCEPTEXSOCKADDRS;
iResult = WSAIoctl(handle, 0xc8000006 /*SIO_GET_EXTENSION_FUNCTION_POINTER*/,
&GuidAcceptEx, GuidAcceptEx.sizeof,
&lpfnGetAcceptExSockaddrs, lpfnGetAcceptExSockaddrs.sizeof,
&dwBytes, null, null);
}
auto pfa = new PseudoblockingOverlappedSocket(AddressFamily.INET, SocketType.STREAM);
accepted = pfa;
SOCKET pendingForAccept = pfa.handle;
DWORD ignored;
auto ret = lpfnAcceptEx(handle,
pendingForAccept,
// buffer to receive up front
pfa.scratchBuffer.ptr,
0,
// size of local and remote addresses. normally + 16.
sockaddr_in.sizeof + 16,
sockaddr_in.sizeof + 16,
&ignored, // bytes would be given through the iocp instead but im not even requesting the thing
&overlapped
);
return pfa;
}
override void connect(Address to) { assert(0); }
DWORD lastAnswer;
ubyte[1024] scratchBuffer;
static assert(scratchBuffer.length > sockaddr_in.sizeof * 2 + 32);
WSABUF[1] buffer;
OVERLAPPED overlapped;
override ptrdiff_t send(const(void)[] buf, SocketFlags flags) @trusted {
overlapped = overlapped.init;
buffer[0].len = cast(DWORD) buf.length;
buffer[0].buf = cast(CHAR*) buf.ptr;
fiber.setPostYield( () {
if(!WSASend(handle, buffer.ptr, cast(DWORD) buffer.length, null, 0, &overlapped, null)) {
if(GetLastError() != 997) {
//throw new Exception("WSASend fail");
}
}
});
Fiber.yield();
return lastAnswer;
}
override ptrdiff_t receive(void[] buf, SocketFlags flags) @trusted {
overlapped = overlapped.init;
buffer[0].len = cast(DWORD) buf.length;
buffer[0].buf = cast(CHAR*) buf.ptr;
DWORD flags2 = 0;
fiber.setPostYield(() {
if(!WSARecv(handle, buffer.ptr, cast(DWORD) buffer.length, null, &flags2 /* flags */, &overlapped, null)) {
if(GetLastError() != 997) {
//writeln("WSARecv ", WSAGetLastError());
//throw new Exception("WSARecv fail");
}
}
});
Fiber.yield();
return lastAnswer;
}
// I might go back and implement these for udp things.
override ptrdiff_t receiveFrom(void[] buf, SocketFlags flags, ref Address from) @trusted {
assert(0);
}
override ptrdiff_t receiveFrom(void[] buf, SocketFlags flags) @trusted {
assert(0);
}
override ptrdiff_t sendTo(const(void)[] buf, SocketFlags flags, Address to) @trusted {
assert(0);
}
override ptrdiff_t sendTo(const(void)[] buf, SocketFlags flags) @trusted {
assert(0);
}
// lol overload sets
alias send = typeof(super).send;
alias receive = typeof(super).receive;
alias sendTo = typeof(super).sendTo;
alias receiveFrom = typeof(super).receiveFrom;
}
}
void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) {
assert(connection !is null); assert(connection !is null);
version(cgi_use_fiber) { version(cgi_use_fiber) {
auto fiber = new CgiFiber(&doThreadHttpConnectionGuts!(CustomCgi, fun)); auto fiber = new CgiFiber(&doThreadHttpConnectionGuts!(CustomCgi, fun));
version(Windows) {
(cast(PseudoblockingOverlappedSocket) connection).fiber = fiber;
}
import core.memory; import core.memory;
GC.addRoot(cast(void*) fiber); GC.addRoot(cast(void*) fiber);
fiber.connection = connection; fiber.connection = connection;
@ -4558,17 +4787,29 @@ import std.socket;
version(cgi_use_fiber) { version(cgi_use_fiber) {
import core.thread; import core.thread;
version(linux) {
import core.sys.linux.epoll; import core.sys.linux.epoll;
__gshared int epfd; int epfd = -1; // thread local because EPOLLEXCLUSIVE works much better this way... weirdly.
} else version(Windows) {
__gshared HANDLE iocp;
} else static assert(0, "The hybrid fiber server is not implemented on your OS.");
} }
version(cgi_use_fiber) version(cgi_use_fiber) {
version(linux)
private enum WakeupEvent { private enum WakeupEvent {
Read = EPOLLIN, Read = EPOLLIN,
Write = EPOLLOUT Write = EPOLLOUT
} }
else version(Windows)
private enum WakeupEvent {
Read, Write
}
else static assert(0);
}
version(cgi_use_fiber) version(cgi_use_fiber)
private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e) @nogc { private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e) @nogc {
@ -4576,6 +4817,7 @@ private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e)
// static cast since I know what i have in here and don't want to pay for dynamic cast // static cast since I know what i have in here and don't want to pay for dynamic cast
auto f = cast(CgiFiber) cast(void*) Fiber.getThis(); auto f = cast(CgiFiber) cast(void*) Fiber.getThis();
version(linux) {
f.setPostYield = () { f.setPostYield = () {
if(*registered) { if(*registered) {
// rearm // rearm
@ -4599,12 +4841,21 @@ private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e)
Fiber.yield(); Fiber.yield();
f.setPostYield(null); f.setPostYield(null);
} else version(Windows) {
Fiber.yield();
}
else static assert(0);
} }
version(cgi_use_fiber) version(cgi_use_fiber)
void unregisterSource(Socket s) { void unregisterSource(Socket s) {
version(linux) {
epoll_event evt; epoll_event evt;
epoll_ctl(epfd, EPOLL_CTL_DEL, s.handle(), &evt); epoll_ctl(epfd, EPOLL_CTL_DEL, s.handle(), &evt);
} else version(Windows) {
// intentionally blank
}
else static assert(0);
} }
// it is a class primarily for reference semantics // it is a class primarily for reference semantics
@ -4871,20 +5122,10 @@ class ListeningConnectionManager {
} }
version(cgi_use_fiber) { version(cgi_use_fiber) {
import core.sys.linux.epoll;
epfd = epoll_create1(EPOLL_CLOEXEC);
if(epfd == -1)
throw new Exception("epoll_create1 " ~ to!string(errno));
scope(exit) {
import core.sys.posix.unistd;
close(epfd);
}
epoll_event ev; version(Windows) {
ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough. listener.accept();
ev.data.fd = listener.handle; }
if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener.handle, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
WorkerThread[] threads = new WorkerThread[](totalCPUs * 1 + 1); WorkerThread[] threads = new WorkerThread[](totalCPUs * 1 + 1);
foreach(i, ref thread; threads) { foreach(i, ref thread; threads) {
@ -4919,9 +5160,8 @@ class ListeningConnectionManager {
thread = new ConnectionThread(this, handler, cast(int) i); thread = new ConnectionThread(this, handler, cast(int) i);
thread.start(); thread.start();
} }
}
while(!loopBroken && running) { while(!loopBroken && !globalStopFlag) {
Socket sn; Socket sn;
bool crash_check() { bool crash_check() {
@ -4937,7 +5177,8 @@ class ListeningConnectionManager {
void accept_new_connection() { void accept_new_connection() {
sn = listener.accept(); sn = acceptCancelable();
if(sn is null) return;
cloexec(sn); cloexec(sn);
if(tcp) { if(tcp) {
// disable Nagle's algorithm to avoid a 40ms delay when we send/recv // disable Nagle's algorithm to avoid a 40ms delay when we send/recv
@ -4967,11 +5208,19 @@ class ListeningConnectionManager {
accept_new_connection(); accept_new_connection();
if(sn !is null)
existing_connection_new_data(); existing_connection_new_data();
else if(sn is null && globalStopFlag) {
foreach(thread; threads) {
semaphore.notify();
}
Thread.sleep(50.msecs);
}
if(crash_check()) if(crash_check())
break; break;
} }
}
// FIXME: i typically stop this with ctrl+c which never // FIXME: i typically stop this with ctrl+c which never
// actually gets here. i need to do a sigint handler. // actually gets here. i need to do a sigint handler.
@ -5045,7 +5294,14 @@ Socket startListening(string host, ushort port, ref bool tcp, ref void delegate(
throw new Exception("abstract unix sockets not supported on this system"); throw new Exception("abstract unix sockets not supported on this system");
} }
} else { } else {
version(cgi_use_fiber) {
version(Windows)
listener = new PseudoblockingOverlappedSocket(AddressFamily.INET, SocketType.STREAM);
else
listener = new TcpSocket(); listener = new TcpSocket();
} else {
listener = new TcpSocket();
}
cloexec(listener); cloexec(listener);
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
listener.bind(host.length ? parseAddress(host, port) : new InternetAddress(port)); listener.bind(host.length ? parseAddress(host, port) : new InternetAddress(port));
@ -5213,8 +5469,69 @@ class WorkerThread : Thread {
super(&run); super(&run);
} }
version(Windows)
void run() { void run() {
while(lcm.running) { auto timeout = INFINITE;
PseudoblockingOverlappedSocket key;
OVERLAPPED* overlapped;
DWORD bytes;
while(!globalStopFlag && GetQueuedCompletionStatus(iocp, &bytes, cast(PULONG_PTR) &key, &overlapped, timeout)) {
if(key is null)
continue;
key.lastAnswer = bytes;
if(key.fiber) {
key.fiber.proceed();
} else {
// we have a new connection, issue the first receive on it and issue the next accept
auto sn = key.accepted;
key.accept();
cloexec(sn);
if(lcm.tcp) {
// disable Nagle's algorithm to avoid a 40ms delay when we send/recv
// on the socket because we do some buffering internally. I think this helps,
// certainly does for small requests, and I think it does for larger ones too
sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10));
}
dg(sn);
}
}
//SleepEx(INFINITE, TRUE);
}
version(linux)
void run() {
import core.sys.linux.epoll;
epfd = epoll_create1(EPOLL_CLOEXEC);
if(epfd == -1)
throw new Exception("epoll_create1 " ~ to!string(errno));
scope(exit) {
import core.sys.posix.unistd;
close(epfd);
}
{
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = cancelfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cancelfd, &ev);
}
epoll_event ev;
ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough.
ev.data.fd = lcm.listener.handle;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, lcm.listener.handle, &ev) == -1)
throw new Exception("epoll_ctl " ~ to!string(errno));
while(!globalStopFlag) {
Socket sn; Socket sn;
epoll_event[64] events; epoll_event[64] events;
@ -5228,18 +5545,19 @@ class WorkerThread : Thread {
foreach(idx; 0 .. nfds) { foreach(idx; 0 .. nfds) {
auto flags = events[idx].events; auto flags = events[idx].events;
if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) { if(cast(size_t) events[idx].data.ptr == cast(size_t) cancelfd) {
globalStopFlag = true;
//import std.stdio; writeln("exit heard");
break;
} else if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) {
//import std.stdio; writeln(myThreadNumber, " woken up ", flags);
// this try/catch is because it is set to non-blocking mode // this try/catch is because it is set to non-blocking mode
// and Phobos' stupid api throws an exception instead of returning // and Phobos' stupid api throws an exception instead of returning
// if it would block. Why would it block? because a forked process // if it would block. Why would it block? because a forked process
// might have beat us to it, but the wakeup event thundered our herds. // might have beat us to it, but the wakeup event thundered our herds.
version(cgi_use_fork) {
try try
sn = lcm.listener.accept(); sn = lcm.listener.accept(); // don't need to do the acceptCancelable here since the epoll checks it better
catch(SocketAcceptException e) { continue; } catch(SocketAcceptException e) { continue; }
} else {
sn = lcm.listener.accept();
}
cloexec(sn); cloexec(sn);
if(lcm.tcp) { if(lcm.tcp) {