This commit is contained in:
Adam D. Ruppe 2023-04-16 08:44:26 -04:00
parent 2ff74269fe
commit 2ae4ac1b25
4 changed files with 543 additions and 47 deletions

1
cgi.d
View File

@ -599,6 +599,7 @@ version(Windows) {
}
}
// FIXME: can use the arsd.core function now but it is trivial anyway tbh
void cloexec(int fd) {
version(Posix) {
import core.sys.posix.fcntl;

585
core.d
View File

@ -73,6 +73,10 @@ version(Windows) {
version(Posix) {
import core.sys.posix.signal;
import core.sys.posix.unistd;
import core.sys.posix.sys.un;
import core.sys.posix.sys.socket;
import core.sys.posix.netinet.in_;
}
// FIXME: the exceptions should actually give some explanatory text too (at least sometimes)
@ -2110,13 +2114,13 @@ class AsyncOperationRequest {
/++
+/
abstract class AsyncOperationResponse {
interface AsyncOperationResponse {
/++
Returns true if the request completed successfully, finishing what it was supposed to.
Should be set to `false` if the request was cancelled before completing or encountered an error.
+/
abstract bool wasSuccessful();
bool wasSuccessful();
}
/++
@ -2254,9 +2258,9 @@ class AbstractFile {
/+
enum SpecialFlags {
randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off
randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off and posix_fadvise(POSIX_FADV_SEQUENTIAL)
skipCache, /// O_DSYNC, FILE_FLAG_NO_BUFFERING and maybe WRITE_THROUGH. note that metadata still goes through the cache, FlushFileBuffers and fsync can still do those
temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux
temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux. also FILE_FLAG_DELETE_ON_CLOSE can be combined to make a (almost) all memory file. kinda like a private anonymous mmap i believe.
deleteWhenClosed, /// Windows has a flag for this but idk if it is of any real use
async, /// open it in overlapped mode, all reads and writes must then provide an offset. Only implemented on Windows
}
@ -2658,10 +2662,6 @@ AsyncAnonymousPipe[2] anonymousPipePair(AsyncAnonymousPipe[2] preallocatedObject
/+
class NamedPipe : AsyncFile {
}
class WIPSocket : AsyncFile {
}
+/
@ -2681,19 +2681,461 @@ class NamedPipeServer {
// can be on a specific thread or on any thread
}
class WIPSocket {
// stream sockets: send/receive data
// datagram sockets: sendTo, receiveFrom
// unix domain sockets: send/receive fd, get peer credentials (not available on Windows)
/++
Looking these up might be done asynchronously. The objects both represent an async request and its result, which is the actual address the operating system uses.
// otherwise: accept, bind, connect, shutdown, close.
}
When you create an address, it holds a request. You can call `start` and `waitForCompletion` like with other async requests. The request may be run in a helper thread.
class WIPAddress {
Unlike most the async objects though, its methods will implicitly call `waitForCompletion`.
Note that The current implementation just blocks.
+/
class SocketAddress /* : AsyncOperationRequest, AsyncOperationResponse */ {
// maybe accept url?
// unix:///home/me/thing
// ip://0.0.0.0:4555
// ipv6://[00:00:00:00:00:00]
// address info
abstract int domain();
// FIXME: find all cases of this and make sure it is completed first
abstract sockaddr* rawAddr();
abstract socklen_t rawAddrLength();
/+
// request interface
abstract void start();
abstract SocketAddress waitForCompletion();
abstract bool isComplete();
// response interface
abstract bool wasSuccessful();
+/
}
/+
class BluetoothAddress : SocketAddress {
// FIXME it is AF_BLUETOOTH
// see: https://people.csail.mit.edu/albert/bluez-intro/x79.html
// see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets
}
+/
version(Posix) // FIXME: find the sockaddr_un definition for Windows too and add it in
final class UnixAddress : SocketAddress {
sockaddr_un address;
override int domain() {
return AF_UNIX;
}
override sockaddr* rawAddr() {
return cast(sockaddr*) &address;
}
override socklen_t rawAddrLength() {
return address.sizeof;
}
}
final class IpAddress : SocketAddress {
sockaddr_in address;
override int domain() {
return AF_INET;
}
override sockaddr* rawAddr() {
return cast(sockaddr*) &address;
}
override socklen_t rawAddrLength() {
return address.sizeof;
}
}
final class Ipv6Address : SocketAddress {
sockaddr_in6 address;
override int domain() {
return AF_INET6;
}
override sockaddr* rawAddr() {
return cast(sockaddr*) &address;
}
override socklen_t rawAddrLength() {
return address.sizeof;
}
}
/++
For functions that give you an unknown address, you can use this to hold it.
+/
struct SocketAddressBuffer {
sockaddr address;
socklen_t addrlen;
}
class AsyncSocket : AsyncFile {
// otherwise: accept, bind, connect, shutdown, close.
static auto lastError() {
version(Windows)
return WSAGetLastError();
else
return errno;
}
static bool wouldHaveBlocked() {
auto error = lastError;
version(Windows) {
return error == WSAEWOULDBLOCK || error == WSAETIMEDOUT;
} else {
return error == EAGAIN || error == EWOULDBLOCK;
}
}
version(Windows)
enum INVALID = INVALID_SOCKET;
else
enum INVALID = -1;
// type is mostly SOCK_STREAM or SOCK_DGRAM
/++
Creates a socket compatible with the given address. It does not actually connect or bind, nor store the address. You will want to pass it again to those functions:
---
auto socket = new Socket(address, Socket.Type.Stream);
socket.connect(address).waitForCompletion();
---
+/
this(SocketAddress address, int type, int protocol = 0) {
// need to look up these values for linux
// type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
handle_ = socket(address.domain(), type, protocol);
if(handle == INVALID)
throw new SystemApiException("socket", lastError());
super(cast(NativeFileHandle) handle); // I think that cast is ok on Windows... i think
version(Posix) {
makeNonBlocking(handle);
setCloExec(handle);
}
// FIXME: chekc for broadcast
// FIXME: REUSEADDR ?
// FIXME: also set NO_DELAY prolly
// int opt = 1;
// setsockopt(handle, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof);
}
/++
Enabling NODELAY can give latency improvements if you are managing buffers on your end
+/
void setNoDelay(bool enabled) {
}
/++
`allowQuickRestart` will set the SO_REUSEADDR on unix and SO_DONTLINGER on Windows,
allowing the application to be quickly restarted despite there still potentially being
pending data in the tcp stack.
See https://stackoverflow.com/questions/3229860/what-is-the-meaning-of-so-reuseaddr-setsockopt-option-linux for more information.
If you already set your appropriate socket options or value correctness and reliability of the network stream over restart speed, leave this at the default `false`.
+/
void bind(SocketAddress address, bool allowQuickRestart = false) {
if(allowQuickRestart) {
// FIXME
}
auto ret = .bind(handle, address.rawAddr, address.rawAddrLength);
if(ret == -1)
throw new SystemApiException("bind", lastError);
}
/++
You must call [bind] before this.
The backlog should be set to a value where your application can reliably catch up on the backlog in a reasonable amount of time under average load. It is meant to smooth over short duration bursts and making it too big will leave clients hanging - which might cause them to try to reconnect, thinking things got lost in transit, adding to your impossible backlog.
I personally tend to set this to be two per worker thread unless I have actual real world measurements saying to do something else. It is a bit arbitrary and not based on legitimate reasoning, it just seems to work for me (perhaps just because it has never really been put to the test).
+/
void listen(int backlog) {
auto ret = .listen(handle, backlog);
if(ret == -1)
throw new SystemApiException("listen", lastError);
}
/++
+/
void shutdown(int how) {
auto ret = .shutdown(handle, how);
if(ret == -1)
throw new SystemApiException("shutdown", lastError);
}
/++
+/
override void close() {
version(Windows)
closesocket(handle);
else
.close(handle);
handle_ = -1;
}
/++
You can also construct your own request externally to control the memory more.
+/
AsyncConnectRequest connect(SocketAddress address) {
return new AsyncConnectRequest(this, address);
}
/++
You can also construct your own request externally to control the memory more.
+/
AsyncAcceptRequest accept() {
return new AsyncAcceptRequest(this);
}
// note that send is just sendto w/ a null address
// and receive is just receivefrom w/ a null address
/++
You can also construct your own request externally to control the memory more.
+/
AsyncSendRequest send(const(ubyte)[] buffer, int flags = 0) {
return new AsyncSendRequest(this, buffer, null, flags);
}
/++
You can also construct your own request externally to control the memory more.
+/
AsyncReceiveRequest receive(ubyte[] buffer, int flags = 0) {
return new AsyncReceiveRequest(this, buffer, null, flags);
}
/++
You can also construct your own request externally to control the memory more.
+/
AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress address, int flags = 0) {
return new AsyncSendRequest(this, buffer, address, flags);
}
/++
You can also construct your own request externally to control the memory more.
+/
AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddressBuffer* address, int flags = 0) {
return new AsyncReceiveRequest(this, buffer, address, flags);
}
/++
+/
SocketAddress localAddress() {
return null; // FIXME
}
/++
+/
SocketAddress peerAddress() {
return null; // FIXME
}
// for unix sockets on unix only: send/receive fd, get peer creds
/++
+/
final NativeSocketHandle handle() {
return handle_;
}
private NativeSocketHandle handle_;
}
/++
+/
class AsyncConnectRequest : AsyncOperationRequest {
this(AsyncSocket socket, SocketAddress address) {
}
override void start() {}
override void cancel() {}
override bool isComplete() { return true; }
override AsyncConnectResponse waitForCompletion() { assert(0); }
}
/++
+/
class AsyncConnectResponse : AsyncOperationResponse {
const SystemErrorCode errorCode;
this(SystemErrorCode errorCode) {
this.errorCode = errorCode;
}
override bool wasSuccessful() {
return errorCode.wasSuccessful;
}
}
/++
+/
class AsyncAcceptRequest : AsyncOperationRequest {
this(AsyncSocket socket) {
}
override void start() {}
override void cancel() {}
override bool isComplete() { return true; }
override AsyncConnectResponse waitForCompletion() { assert(0); }
}
/++
+/
class AsyncAcceptResponse : AsyncOperationResponse {
AsyncSocket newSocket;
const SystemErrorCode errorCode;
this(AsyncSocket newSocket, SystemErrorCode errorCode) {
this.newSocket = newSocket;
this.errorCode = errorCode;
}
override bool wasSuccessful() {
return errorCode.wasSuccessful;
}
}
/++
+/
class AsyncReceiveRequest : AsyncOperationRequest {
struct LowLevelOperation {
AsyncSocket file;
ubyte[] buffer;
int flags;
SocketAddressBuffer* address;
this(typeof(this.tupleof) args) {
this.tupleof = args;
}
version(Windows) {
auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
WSABUF buf;
buf.len = buffer.length;
buf.buf = cast(typeof(buf.buf)) buffer.ptr;
uint flags = this.flags;
if(address is null)
return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
else {
return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
}
}
} else {
auto opCall() {
if(address is null)
return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
else
return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
}
}
string errorString() {
return "Receive";
}
}
mixin OverlappedIoRequest!(AsyncReceiveResponse, LowLevelOperation);
this(AsyncSocket socket, ubyte[] buffer, SocketAddressBuffer* address, int flags) {
llo = LowLevelOperation(socket, buffer, flags, address);
this.response = typeof(this.response).defaultConstructed;
}
}
/++
+/
class AsyncReceiveResponse : AsyncOperationResponse {
const ubyte[] bufferWritten;
const SystemErrorCode errorCode;
this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
this.errorCode = errorCode;
this.bufferWritten = bufferWritten;
}
override bool wasSuccessful() {
return errorCode.wasSuccessful;
}
}
/++
+/
class AsyncSendRequest : AsyncOperationRequest {
struct LowLevelOperation {
AsyncSocket file;
const(ubyte)[] buffer;
int flags;
SocketAddress address;
this(typeof(this.tupleof) args) {
this.tupleof = args;
}
version(Windows) {
auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
WSABUF buf;
buf.len = buffer.length;
buf.buf = cast(typeof(buf.buf)) buffer.ptr;
if(address is null)
return WSASend(file.handle, &buf, 1, null, flags, overlapped, ocr);
else {
return WSASendTo(file.handle, &buf, 1, null, flags, address.rawAddr, address.rawAddrLength, overlapped, ocr);
}
}
} else {
auto opCall() {
if(address is null)
return core.sys.posix.sys.socket.send(file.handle, buffer.ptr, buffer.length, flags);
else
return core.sys.posix.sys.socket.sendto(file.handle, buffer.ptr, buffer.length, flags, address.rawAddr, address.rawAddrLength);
}
}
string errorString() {
return "Send";
}
}
mixin OverlappedIoRequest!(AsyncSendResponse, LowLevelOperation);
this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress address, int flags) {
llo = LowLevelOperation(socket, buffer, flags, address);
this.response = typeof(this.response).defaultConstructed;
}
}
/++
+/
class AsyncSendResponse : AsyncOperationResponse {
const ubyte[] bufferWritten;
const SystemErrorCode errorCode;
this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) {
this.errorCode = errorCode;
this.bufferWritten = bufferWritten;
}
override bool wasSuccessful() {
return errorCode.wasSuccessful;
}
}
/++
@ -2702,7 +3144,7 @@ class WIPAddress {
Depending on the specified address, it can be tcp, tcpv6, or unix domain.
+/
class StreamServer {
this(WIPAddress listenTo) {
this(SocketAddress listenTo) {
}
// when a new connection arrives, it calls your callback
@ -3202,11 +3644,9 @@ void main() {
/++
Implementation details of some requests. You shouldn't need to know any of this, the interface is all public.
+/
mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
mixin template OverlappedIoRequest(Response, LowLevelOperation) {
private {
AsyncFile file;
ubyte[] buffer;
long offset;
LowLevelOperation llo;
OwnedClass!Response response;
@ -3217,7 +3657,7 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) {
typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof);
rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.buffer[0 .. dwNumberOfBytesTransferred]);
rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.llo.buffer[0 .. dwNumberOfBytesTransferred]);
rr.state_ = State.complete;
// FIXME: on complete?
@ -3238,7 +3678,7 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
final void cbImpl() {
// it is ready to complete, time to do it
auto ret = LowLevelOperation(file.handle, buffer.ptr, buffer.length);
auto ret = llo();
markCompleted(ret, errno);
}
@ -3247,7 +3687,7 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
if(ret == -1)
response = typeof(response)(SystemErrorCode(errno), null);
else
response = typeof(response)(SystemErrorCode(0), buffer[0 .. cast(size_t) ret]);
response = typeof(response)(SystemErrorCode(0), llo.buffer[0 .. cast(size_t) ret]);
state_ = State.complete;
}
}
@ -3267,31 +3707,28 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
state_ = State.started;
version(Windows) {
overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
if(LowLevelOperation(file.handle, buffer.ptr, cast(DWORD) buffer.length, &overlapped, &overlappedCompletionRoutine)) {
if(llo(&overlapped, &overlappedCompletionRoutine)) {
// all good, though GetLastError() might have some informative info
} else {
// operation failed, the operation is always ReadFileEx or WriteFileEx so it won't give the io pending thing here
// should i issue error async? idk
state_ = State.complete;
throw new SystemApiException(__traits(identifier, LowLevelOperation), GetLastError());
throw new SystemApiException(llo.errorString(), GetLastError());
}
// ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway.
} else version(Posix) {
// first try to just do it
auto ret = LowLevelOperation(file.handle, buffer.ptr, buffer.length);
auto ret = llo();
auto errno = errno;
if(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // unable to complete right now, register and try when it is ready
eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.file.handle, this.getCb);
eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.llo.file.handle, this.getCb);
} else {
// i could set errors sync or async and since it couldn't even start, i think a sync exception is the right way
if(ret == -1)
throw new SystemApiException(__traits(identifier, LowLevelOperation), errno);
throw new SystemApiException(llo.errorString(), errno);
markCompleted(ret, errno); // it completed synchronously (if it is an error nor not is handled by the completion handler)
}
}
@ -3303,7 +3740,7 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
return; // it has already finished, just leave it alone, no point discarding what is already done
version(Windows) {
if(state_ != State.unused)
Win32Enforce!CancelIoEx(file.handle, &overlapped);
Win32Enforce!CancelIoEx(llo.file.AbstractFile.handle, &overlapped);
// Windows will notify us when the cancellation is complete, so we need to wait for that before updating the state
} else version(Posix) {
if(state_ != State.unused)
@ -3353,16 +3790,35 @@ mixin template OverlappedIoRequest(Response, alias LowLevelOperation) {
You can write to a file asynchronously by creating one of these.
+/
final class AsyncWriteRequest : AsyncOperationRequest {
version(Windows)
private alias LowLevelOperation = WriteFileEx;
else
private alias LowLevelOperation = core.sys.posix.unistd.write;
struct LowLevelOperation {
AsyncFile file;
ubyte[] buffer;
long offset;
this(typeof(this.tupleof) args) {
this.tupleof = args;
}
version(Windows) {
auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
return WriteFileEx(file.handle, buffer.ptr, buffer.length, overlapped, ocr);
}
} else {
auto opCall() {
return core.sys.posix.unistd.write(file.handle, buffer.ptr, buffer.length);
}
}
string errorString() {
return "Write";
}
}
mixin OverlappedIoRequest!(AsyncWriteResponse, LowLevelOperation);
this(AsyncFile file, ubyte[] buffer, long offset) {
this.file = file;
this.buffer = buffer;
this.offset = offset;
this.llo = LowLevelOperation(file, buffer, offset);
response = typeof(response).defaultConstructed;
}
}
@ -3388,10 +3844,31 @@ class AsyncWriteResponse : AsyncOperationResponse {
+/
final class AsyncReadRequest : AsyncOperationRequest {
version(Windows)
private alias LowLevelOperation = ReadFileEx;
else
private alias LowLevelOperation = core.sys.posix.unistd.read;
struct LowLevelOperation {
AsyncFile file;
ubyte[] buffer;
long offset;
this(typeof(this.tupleof) args) {
this.tupleof = args;
}
version(Windows) {
auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff;
overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff;
return ReadFileEx(file.handle, buffer.ptr, buffer.length, overlapped, ocr);
}
} else {
auto opCall() {
return core.sys.posix.unistd.read(file.handle, buffer.ptr, buffer.length);
}
}
string errorString() {
return "Read";
}
}
mixin OverlappedIoRequest!(AsyncReadResponse, LowLevelOperation);
/++
@ -3402,9 +3879,7 @@ final class AsyncReadRequest : AsyncOperationRequest {
The offset is where to start reading a disk file. For all other types of files, pass 0.
+/
this(AsyncFile file, ubyte[] buffer, long offset) {
this.file = file;
this.buffer = buffer;
this.offset = offset;
this.llo = LowLevelOperation(file, buffer, offset);
response = typeof(response).defaultConstructed;
}
@ -5157,7 +5632,7 @@ instead of throwing, the prompt could change to indicate the binary data is expe
* read
* write
* seek
* sendfile on linux
* sendfile on linux, TransmitFile on Windows
* let completion handlers run in the io worker thread instead of signaling back
* pipe ops (anonymous or named)
* create
@ -5431,4 +5906,20 @@ so the end result is you keep the last ones. it wouldn't report errors if multip
private version(Windows) extern(Windows) {
BOOL CancelIoEx(HANDLE, LPOVERLAPPED);
struct WSABUF {
ULONG len;
ubyte* buf;
}
alias LPWSABUF = WSABUF*;
// https://learn.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped
// "The WSAOVERLAPPED structure is compatible with the Windows OVERLAPPED structure."
// so ima lie here in the bindings.
int WSASend(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, LPOVERLAPPED, LPOVERLAPPED_COMPLETION_ROUTINE);
int WSASendTo(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, const sockaddr*, int, LPOVERLAPPED, LPOVERLAPPED_COMPLETION_ROUTINE);
int WSARecv(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, LPOVERLAPPED, LPOVERLAPPED_COMPLETION_ROUTINE);
int WSARecvFrom(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, sockaddr*, LPINT, LPOVERLAPPED, LPOVERLAPPED_COMPLETION_ROUTINE);
}

View File

@ -14441,6 +14441,8 @@ struct FileName(alias storage = previousFileReferenced, string[] filters = null,
}
/++
Gets a file name for an open or save operation, calling your `onOK` function when the user has selected one. This function may or may not block depending on the operating system, you MUST assume it will complete asynchronously.
History:
onCancel was added November 6, 2021.

View File

@ -5403,8 +5403,10 @@ Pixmap transparencyMaskFromMemoryImage(MemoryImage i, Window window) {
destroy it when it is triggered) nor are there pause/resume functions -
the timer must again be destroyed and recreated if you want to pause it.
---
auto timer = new Timer(50, { it happened!; });
timer.destroy();
---
Timers can only be expected to fire when the event loop is running and only
once per iteration through the event loop.