out of fd fix maybe

Adam D. Ruppe 2022-08-14 07:34:12 -04:00
parent 08e8e6e037
commit 91db768130
1 changed file with 124 additions and 6 deletions

http2.d

@@ -1333,6 +1333,36 @@ class HttpRequest {
version(arsd_http_internal_implementation) {
/++
Changes the limit on the number of open, inactive sockets. Reusing connections can provide a significant
performance improvement, but the operating system can also impose a global limit on the number of open
sockets and/or files that you don't want to run into. This lets you choose the balance that is right for you.

When the total number of cached, inactive sockets approaches this maximum, it will check for ones closed by the
server first. If there are none already closed by the server, it will select sockets at random from its connection
cache and close them to make room for the new ones.

Please note:

$(LIST
* there is always a limit of six open sockets per domain, per the common practice suggested by the http standard
* the limit given here is thread-local. If you run multiple http clients/requests from multiple threads, don't set this too high or you might bump into the global limit from the OS.
* setting this too high can waste connections, because the server might close them, but they will never be garbage collected, since my current implementation won't check for dead connections except when it thinks it is running close to the limit.
)

Setting it just right for your use case may provide up to a 10x performance boost.

This implementation is subject to change. If it does, I'll document it, but may not bump the version number.

History:
Added August 10, 2022 (dub v10.9)
+/
static void setConnectionCacheSize(int max = 32) {
connectionCacheSize = max;
}
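// A minimal usage sketch (illustrative, not part of this commit): raise the
// cache limit before issuing many requests, assuming the process's fd limit
// comfortably exceeds the chosen value.
unittest {
	import arsd.http2;
	HttpRequest.setConnectionCacheSize(64); // trade fds for more connection reuse
	auto client = new HttpClient();
	auto request = client.navigateTo(Uri("http://example.com/"));
	auto response = request.waitForCompletion();
}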
private static {
// we manage the actual connections. When a request is made on a particular
// host, we try to reuse connections. We may open more than one connection per
@@ -1341,6 +1371,86 @@ class HttpRequest {
// The key is the *domain name* and the port. Multiple domains on the same address will have separate connections.
Socket[][string] socketsPerHost;
// only one request can be active on a given socket (at least HTTP < 2.0) so this is that
HttpRequest[Socket] activeRequestOnSocket;
HttpRequest[] pending; // and these are the requests that are waiting
int cachedSockets;
int connectionCacheSize = 32;
/+
This is a somewhat expensive, but essential operation. If it isn't used in a heavy
application, you'll risk running out of file descriptors.
+/
void cleanOldSockets() {
static struct CloseCandidate {
string key;
Socket socket;
}
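// fixed-size scratch buffer: each pass examines at most 36 idle sockets
// without allocating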
CloseCandidate[36] closeCandidates;
int closeCandidatesPosition;
outer: foreach(key, sockets; socketsPerHost) {
foreach(socket; sockets) {
if(socket in activeRequestOnSocket)
continue; // it is still in use; we can't close it
closeCandidates[closeCandidatesPosition++] = CloseCandidate(key, socket);
if(closeCandidatesPosition == closeCandidates.length)
break outer;
}
}
auto cc = closeCandidates[0 .. closeCandidatesPosition];
if(cc.length == 0)
return; // no candidates to even examine
// has the server closed any of these? if so, we also close and drop them
static SocketSet readSet = null;
if(readSet is null)
readSet = new SocketSet();
readSet.reset();
foreach(candidate; cc) {
readSet.add(candidate.socket);
}
int closeCount;
auto got = Socket.select(readSet, null, null, 0.msecs /* timeout, want it small since we're just checking for eof */);
if(got > 0) {
foreach(ref candidate; cc) {
if(readSet.isSet(candidate.socket)) {
// if we can read when it isn't in use, that means eof; the
// server closed it.
candidate.socket.close();
loseSocketByKey(candidate.key, candidate.socket);
closeCount++;
}
}
debug(arsd_http2) writeln(closeCount, " from inactivity");
} else {
// and if not, of the remaining ones, close a few just at random to bring us back beneath the arbitrary limit.
while(cc.length > 0 && (cachedSockets - closeCount) > connectionCacheSize) {
import std.random;
auto idx = uniform(0, cc.length);
cc[idx].socket.close();
loseSocketByKey(cc[idx].key, cc[idx].socket);
cc[idx] = cc[$ - 1];
cc = cc[0 .. $-1];
closeCount++;
}
debug(arsd_http2) writeln(closeCount, " from randomness");
}
cachedSockets -= closeCount;
}
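// Standalone sketch of the eof-detection idiom used above (hypothetical
// helper, not part of this commit): an idle keep-alive socket should have
// nothing to read, so readability on a zero-timeout select means the server
// closed it (or sent stray data); either way the cached socket is unusable.
bool looksClosedByServer(Socket idleSocket) {
	import core.time : msecs;
	auto set = new SocketSet();
	set.add(idleSocket);
	auto got = Socket.select(set, null, null, 0.msecs); // poll, don't block
	return got > 0 && set.isSet(idleSocket);
}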
void loseSocketByKey(string key, Socket s) {
if(auto list = key in socketsPerHost) {
for(int a = 0; a < (*list).length; a++) {
@@ -1407,6 +1517,8 @@ class HttpRequest {
socket.Socket.connect(pa);
}
socket.blocking = true; // FIXME total hack to simplify the code here since it isn't really using the event loop yet
string message;
if(ssl) {
auto hostName = host ~ ":" ~ to!string(port);
@@ -1461,6 +1573,10 @@ class HttpRequest {
return socket;
}
// import std.stdio; writeln(cachedSockets);
if(cachedSockets > connectionCacheSize)
cleanOldSockets();
import std.string;
auto key = format("http%s://%s:%s", ssl ? "s" : "", host, port);
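// e.g. `https://example.com:443`; keying on scheme, host, and port keeps
// tls and plaintext connections to the same server separate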
@@ -1478,7 +1594,7 @@ class HttpRequest {
assert(socket !is null);
assert(socket.handle() !is socket_t.init, socket is null ? "null" : socket.toString());
readSet.add(socket);
-auto got = Socket.select(readSet, null, null, 5.msecs /* timeout */);
+auto got = Socket.select(readSet, null, null, 0.msecs /* timeout, want it small since we're just checking for eof */);
if(got > 0) {
// we can read something off this... but there aren't
// any active requests. Assume it is EOF and open a new one
@@ -1487,6 +1603,7 @@ class HttpRequest {
loseSocket(host, port, ssl, socket);
goto openNew;
}
cachedSockets--;
return socket;
}
}
@@ -1507,10 +1624,6 @@ class HttpRequest {
return socket;
}
-// only one request can be active on a given socket (at least HTTP < 2.0) so this is that
-HttpRequest[Socket] activeRequestOnSocket;
-HttpRequest[] pending; // and these are the requests that are waiting
SocketSet readSet;
SocketSet writeSet;
@@ -1658,6 +1771,7 @@ class HttpRequest {
foreach(s; inactive[0 .. inactiveCount]) {
debug(arsd_http2) writeln("removing socket from active list ", cast(void*) s);
activeRequestOnSocket.remove(s);
cachedSockets++;
}
}
@@ -5003,12 +5117,16 @@ class WebSocket {
}
}
private template imported(string mod) {
mixin(`import imported = ` ~ mod ~ `;`);
}
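// An illustration of the idiom (hypothetical declaration, not part of this
// commit): imported!"mod".Symbol names a type from another module right in a
// signature, so the import is only compiled in if the declaration is used.
void printTime(imported!"std.datetime".SysTime t) {
	import std.stdio : writeln;
	writeln(t);
}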
/++
Warning: you should call this AFTER websocket.connect, or else it might throw on connect. This function sets nonblocking mode, and the connect function doesn't handle that well (it throws on the "would block" condition). It is easier to just connect first.
+/
template addToSimpledisplayEventLoop() {
import arsd.simpledisplay;
-void addToSimpledisplayEventLoop(WebSocket ws, SimpleWindow window) {
+void addToSimpledisplayEventLoop(WebSocket ws, imported!"arsd.simpledisplay".SimpleWindow window) {
void midprocess() {
if(!ws.lowLevelReceive()) {
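// Hedged usage sketch of the ordering the warning above describes
// (illustrative values, not part of this commit): connect first, then add to
// the event loop, since adding switches the socket to nonblocking mode.
//
//	import arsd.http2;
//	import arsd.simpledisplay;
//
//	auto ws = new WebSocket(Uri("wss://example.com/socket"));
//	ws.connect(); // connect while the socket is still blocking
//	auto window = new SimpleWindow(640, 480);
//	ws.addToSimpledisplayEventLoop(window); // nonblocking is set up here
//	window.eventLoop(0);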