From 2502ee06e224dbdbb2d80e3c8bb2001d1efc6886 Mon Sep 17 00:00:00 2001 From: "Adam D. Ruppe" Date: Sun, 2 Apr 2023 07:21:44 -0400 Subject: [PATCH] issue #378 --- cgi.d | 8 +- core.d | 1018 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 878 insertions(+), 148 deletions(-) diff --git a/cgi.d b/cgi.d index bc25b50..ad40c8d 100644 --- a/cgi.d +++ b/cgi.d @@ -486,7 +486,9 @@ void main() { +/ module arsd.cgi; -import arsd.core; +static import arsd.core; +version(Posix) +import arsd.core : makeNonBlocking; // FIXME: Nullable!T can be a checkbox that enables/disables the T on the automatic form // and a SumType!(T, R) can be a radio box to pick between T and R to disclose the extra boxes on the automatic form @@ -7407,7 +7409,7 @@ mixin template ImplementRpcClientInterface(T, string serverPath, string cmdArg) // derivedMembers on an interface seems to give exactly what I want: the virtual functions we need to implement. so I am just going to use it directly without more filtering. static foreach(idx, member; __traits(derivedMembers, T)) { - static if(__traits(isVirtualFunction, __traits(getMember, T, member))) + static if(__traits(isVirtualMethod, __traits(getMember, T, member))) mixin( q{ std.traits.ReturnType!(__traits(getMember, T, member)) } ~ member ~ q{(std.traits.Parameters!(__traits(getMember, T, member)) params) @@ -7500,7 +7502,7 @@ void dispatchRpcServer(Interface, Class)(Class this_, ubyte[] data, int fd) if(i sw: switch(calledIdx) { foreach(idx, memberName; __traits(derivedMembers, Interface)) - static if(__traits(isVirtualFunction, __traits(getMember, Interface, memberName))) { + static if(__traits(isVirtualMethod, __traits(getMember, Interface, memberName))) { case idx: assert(calledFunction == __traits(getMember, Interface, memberName).mangleof); diff --git a/core.d b/core.d index d347a1c..c05e161 100644 --- a/core.d +++ b/core.d @@ -1,7 +1,7 @@ /++ Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and uda helper and some basic shared annotation types. - I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. + I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. If you use this directly outside the arsd library, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules! @@ -12,6 +12,8 @@ module arsd.core; // see for useful info: https://devblogs.microsoft.com/dotnet/how-async-await-really-works/ +// see: https://wiki.openssl.org/index.php/Simple_TLS_Server + import core.thread; import core.volatile; import core.atomic; @@ -29,8 +31,14 @@ static if(!__traits(hasMember, core.attribute, "mustuse")) version(Windows) { version=Arsd_core_windows; - import core.sys.windows.windows; + // import core.sys.windows.windows; + import core.sys.windows.winbase; import core.sys.windows.windef; + import core.sys.windows.winnls; + import core.sys.windows.winuser; + import core.sys.windows.winsock2; + + pragma(lib, "user32"); } else version(linux) { version=Arsd_core_epoll; @@ -39,6 +47,21 @@ version(Windows) { version=Arsd_core_kqueue; import core.sys.freebsd.sys.event; +} else version(DragonFlyBSD) { + // NOT ACTUALLY TESTED + version=Arsd_core_kqueue; + + import core.sys.dragonflybsd.sys.event; +} else version(NetBSD) { + // NOT ACTUALLY TESTED + version=Arsd_core_kqueue; + + import core.sys.netbsd.sys.event; +} else version(OpenBSD) { + version=Arsd_core_kqueue; + + // THIS FILE DOESN'T ACTUALLY EXIST, WE NEED TO MAKE IT + import core.sys.openbsd.sys.event; } else version(OSX) { version=Arsd_core_kqueue; @@ -47,6 +70,7 @@ version(Windows) { version(Posix) { import core.sys.posix.signal; + import core.sys.posix.unistd; } // FIXME: the exceptions should actually give some explanatory text too (at least sometimes) @@ -80,7 +104,7 @@ struct stringz { } /++ - Borrows a slice of the pointer up to the zero terminator. + Borrows a slice of the pointer up to (but not including) the zero terminator. +/ const(char)[] borrow() const { if(raw is null) @@ -1114,6 +1138,20 @@ struct SystemErrorCode { this.code = win32; } + /++ + Returns if the code indicated success. + + Please note that many calls do not actually set a code to success, but rather just don't touch it. Thus this may only be true on `init`. + +/ + bool wasSuccessful() const { + final switch(type) { + case Type.errno: + return this.code == 0; + case Type.win32: + return this.code == 0; + } + } + /++ Constructs a string containing both the code and the explanation string. +/ @@ -1270,8 +1308,6 @@ version(Windows) { } Return Win32Enforce(Params params, ArgSentinel sentinel = ArgSentinel.init, string file = __FILE__, size_t line = __LINE__) { - import core.sys.windows.winbase; - Return value = fn(params); if(value == errorValueToUse) { @@ -1449,7 +1485,7 @@ interface ICoreEventLoop { // to send messages between threads, i'll queue up a function that just call dispatchMessage. can embed the arg inside the callback helper prolly. // tho i might prefer to actually do messages w/ run payloads so it is easier to deduplicate i can still dedupe by insepcting the call args so idk - version(Arsd_core_epoll) { + version(Posix) { @mustuse static struct UnregisterToken { private CoreEventLoopImplementation impl; @@ -1463,7 +1499,14 @@ interface ICoreEventLoop { +/ void unregister() { assert(impl !is null, "Cannot reuse unregister token"); - impl.unregisterFd(fd); + + version(Arsd_core_epoll) { + impl.unregisterFd(fd); + } else version(Arsd_core_kqueue) { + // intentionally blank - all registrations are one-shot there + // FIXME: actually it might not have gone off yet, in that case we do need to delete the filter + } else static assert(0); + cb.release(); this = typeof(this).init; } @@ -1471,6 +1514,7 @@ interface ICoreEventLoop { @mustuse static struct RearmToken { + private bool readable; private CoreEventLoopImplementation impl; private int fd; private CallbackHelper cb; @@ -1481,7 +1525,14 @@ interface ICoreEventLoop { +/ void unregister() { assert(impl !is null, "cannot reuse rearm token after unregistering it"); - impl.unregisterFd(fd); + + version(Arsd_core_epoll) { + impl.unregisterFd(fd); + } else version(Arsd_core_kqueue) { + // intentionally blank - all registrations are one-shot there + // FIXME: actually it might not have gone off yet, in that case we do need to delete the filter + } else static assert(0); + cb.release(); this = typeof(this).init; } @@ -1497,16 +1548,7 @@ interface ICoreEventLoop { UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb); RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb); - } - - version(Arsd_core_kqueue) { - // can be a do-nothing here since the event is one off - @mustuse - static struct UnregisterToken { - void unregister() {} - } - - UnregisterToken addCallbackOnFdReadable(int fd, CallbackHelper cb); + RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb); } } @@ -1772,9 +1814,9 @@ version(Windows) { } /++ - + An `AbstractFile` represents a file handle on the operating system level. You cannot do much with it. +/ -final class SyncFile { +class AbstractFile { private { NativeFileHandle handle; } @@ -1795,47 +1837,20 @@ final class SyncFile { yes } - /++ - - +/ - ubyte[] read(scope ubyte[] buffer) { - return null; - } - - /++ - - +/ - void write(in void[] buffer) { - } - - enum Seek { - current, - fromBeginning, - fromEnd - } - - // Seeking/telling/sizing is not permitted when appending and some files don't support it - void seek(long where, Seek fromWhence) {} - - long tell() { return 0; } - - long size() { return 0; } - - // note that there is no fsync thing, instead use the special flag. - /+ enum SpecialFlags { randomAccessExpected, /// FILE_FLAG_SEQUENTIAL_SCAN is turned off skipCache, /// O_DSYNC, FILE_FLAG_NO_BUFFERING and maybe WRITE_THROUGH. note that metadata still goes through the cache, FlushFileBuffers and fsync can still do those temporary, /// FILE_ATTRIBUTE_TEMPORARY on Windows, idk how to specify on linux deleteWhenClosed, /// Windows has a flag for this but idk if it is of any real use + async, /// open it in overlapped mode, all reads and writes must then provide an offset. Only implemented on Windows } +/ /++ +/ - this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no) { + protected this(bool async, FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0) { version(Windows) { DWORD access; DWORD creation; @@ -1891,7 +1906,7 @@ final class SyncFile { FILE_SHARE_READ, null, creation, - FILE_ATTRIBUTE_NORMAL,/* | FILE_FLAG_OVERLAPPED,*/ + FILE_ATTRIBUTE_NORMAL | (async ? FILE_FLAG_OVERLAPPED : 0), null ); @@ -1916,6 +1931,9 @@ final class SyncFile { setCloExec(this.handle); } + if(async) + flags |= O_NONBLOCK; + final switch(mode) { case OpenMode.readOnly: flags |= O_RDONLY; @@ -1966,10 +1984,15 @@ final class SyncFile { /++ +/ - this(NativeFileHandle handleToWrap) { + private this(NativeFileHandle handleToWrap) { this.handle = handleToWrap; } + // only available on some types of file + long size() { return 0; } + + // note that there is no fsync thing, instead use the special flag. + /++ +/ @@ -1987,12 +2010,357 @@ final class SyncFile { } } +/++ + ++/ +class File : AbstractFile { + + /++ + Opens a file in synchronous access mode. + + The permission mask is on used on posix systems FIXME: implement it + +/ + this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permMask = 0) { + super(false, filename, mode, require, specialFlags); + } + + /++ + + +/ + ubyte[] read(scope ubyte[] buffer) { + return null; + } + + /++ + + +/ + void write(in void[] buffer) { + } + + enum Seek { + current, + fromBeginning, + fromEnd + } + + // Seeking/telling/sizing is not permitted when appending and some files don't support it + // also not permitted in async mode + void seek(long where, Seek fromWhence) {} + long tell() { return 0; } +} + +/++ + Only one operation can be pending at any time in the current implementation. ++/ +class AsyncFile : AbstractFile { + /++ + Opens a file in asynchronous access mode. + +/ + this(FilePath filename, OpenMode mode = OpenMode.readOnly, RequirePreexisting require = RequirePreexisting.no, uint specialFlags = 0, uint permissionMask = 0) { + // FIXME: implement permissionMask + super(true, filename, mode, require, specialFlags); + } + + package(arsd) this(NativeFileHandle adoptPreSetup) { + super(adoptPreSetup); + } + + /// + AsyncReadRequest read(ubyte[] buffer, long offset = 0) { + return new AsyncReadRequest(this, buffer, offset); + } + + /// + AsyncWriteRequest write(const(void)[] buffer, long offset = 0) { + return new AsyncWriteRequest(this, cast(ubyte[]) buffer, offset); + } + +} + +/+ +private Class recycleObject(Class, Args...)(Class objectToRecycle, Args args) { + if(objectToRecycle is null) + return new Class(args); + // destroy nulls out the vtable which is the first thing in the object + // so if it hasn't already been destroyed, we'll do it here + if((*cast(void**) objectToRecycle) !is null) { + assert(typeid(objectToRecycle) is typeid(Class)); // to make sure we're actually recycling the right kind of object + .destroy(objectToRecycle); + } + + // then go ahead and reinitialize it + ubyte[] rawData = (cast(ubyte*) cast(void*) objectToRecycle)[0 .. __traits(classInstanceSize, Class)]; + rawData[] = (cast(ubyte[]) typeid(Class).initializer)[]; + + objectToRecycle.__ctor(args); + + return objectToRecycle; +} ++/ + +/+ +/++ + Preallocates a class object without initializing it. + + This is suitable *only* for passing to one of the functions in here that takes a preallocated object for recycling. ++/ +Class preallocate(Class)() { + import core.memory; + // FIXME: can i pass NO_SCAN here? + return cast(Class) GC.calloc(__traits(classInstanceSize, Class), 0, typeid(Class)); +} + +OwnedClass!Class preallocateOnStack(Class)() { + +} ++/ + +// thanks for a random person on stack overflow for this function +version(Windows) +BOOL MyCreatePipeEx( + PHANDLE lpReadPipe, + PHANDLE lpWritePipe, + LPSECURITY_ATTRIBUTES lpPipeAttributes, + DWORD nSize, + DWORD dwReadMode, + DWORD dwWriteMode +) +{ + HANDLE ReadPipeHandle, WritePipeHandle; + DWORD dwError; + CHAR[MAX_PATH] PipeNameBuffer; + + if (nSize == 0) { + nSize = 4096; + } + + // FIXME: should be atomic op and gshared + static shared(int) PipeSerialNumber = 0; + + import core.stdc.string; + import core.stdc.stdio; + + sprintf(PipeNameBuffer.ptr, + "\\\\.\\Pipe\\ArsdCoreAnonymousPipe.%08x.%08x".ptr, + GetCurrentProcessId(), + atomicOp!"+="(PipeSerialNumber, 1) + ); + + ReadPipeHandle = CreateNamedPipeA( + PipeNameBuffer.ptr, + 1/*PIPE_ACCESS_INBOUND*/ | dwReadMode, + 0/*PIPE_TYPE_BYTE*/ | 0/*PIPE_WAIT*/, + 1, // Number of pipes + nSize, // Out buffer size + nSize, // In buffer size + 120 * 1000, // Timeout in ms + lpPipeAttributes + ); + + if (! ReadPipeHandle) { + return FALSE; + } + + WritePipeHandle = CreateFileA( + PipeNameBuffer.ptr, + GENERIC_WRITE, + 0, // No sharing + lpPipeAttributes, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL | dwWriteMode, + null // Template file + ); + + if (INVALID_HANDLE_VALUE == WritePipeHandle) { + dwError = GetLastError(); + CloseHandle( ReadPipeHandle ); + SetLastError(dwError); + return FALSE; + } + + *lpReadPipe = ReadPipeHandle; + *lpWritePipe = WritePipeHandle; + return( TRUE ); +} + + + +/+ + + // this is probably useless. + +/++ + Creates a pair of anonymous pipes ready for async operations. + + You can pass some preallocated objects to recycle if you like. ++/ +AsyncAnonymousPipe[2] anonymousPipePair(AsyncAnonymousPipe[2] preallocatedObjects = [null, null], bool inheritable = false) { + version(Posix) { + int[2] fds; + auto ret = pipe(fds); + + if(ret == -1) + throw new SystemApiException("pipe", errno); + + // FIXME: do we want them inheritable? and do we want both sides to be async? + if(!inheritable) { + setCloExec(fds[0]); + setCloExec(fds[1]); + } + // if it is inherited, do we actually want it non-blocking? + makeNonBlocking(fds[0]); + makeNonBlocking(fds[1]); + + return [ + recycleObject(preallocatedObjects[0], fds[0]), + recycleObject(preallocatedObjects[1], fds[1]), + ]; + } else version(Windows) { + HANDLE rp, wp; + // FIXME: do we want them inheritable? and do we want both sides to be async? + if(!MyCreatePipeEx(&rp, &wp, null, 0, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED)) + throw new SystemApiException("MyCreatePipeEx", GetLastError()); + return [ + recycleObject(preallocatedObjects[0], rp), + recycleObject(preallocatedObjects[1], wp), + ]; + } else throw ArsdException!"NotYetImplemented"(); +} + // on posix, just do pipe() w/ non block + // on windows, do an overlapped named pipe server, connect, stop listening, return pair. ++/ + +/+ +class NamedPipe : AsyncFile { + +} + +class WIPSocket : AsyncFile { + +} ++/ + +/++ + A named pipe ready to accept connections. + + A Windows named pipe is an IPC mechanism usable on local machines or across a Windows network. ++/ +version(Windows) +class NamedPipeServer { + // unix domain socket or windows named pipe + + // Promise!AsyncAnonymousPipe connect; + // Promise!AsyncAnonymousPipe accept; + + // when a new connection arrives, it calls your callback + // can be on a specific thread or on any thread +} + +class WIPSocket { + // stream sockets: send/receive data + // datagram sockets: sendTo, receiveFrom + // unix domain sockets: send/receive fd, get peer credentials (not available on Windows) + + // otherwise: accept, bind, connect, shutdown, close. +} + +class WIPAddress { + // maybe accept url? + // unix:///home/me/thing + // ip://0.0.0.0:4555 + // ipv6://[00:00:00:00:00:00] +} + +/++ + A socket bound and ready to accept connections. + + Depending on the specified address, it can be tcp, tcpv6, or unix domain. ++/ +class StreamServer { + this(WIPAddress listenTo) { + + } + // when a new connection arrives, it calls your callback + // can be on a specific thread or on any thread +} + +/++ + A socket bound and ready to use receiveFrom + + Depending on the address, it can be udp or unix domain. ++/ +class DatagramListener { + // whenever a udp message arrives, it calls your callback + // can be on a specific thread or on any thread + + // UDP is realistically just an async read on the bound socket + // just it can get the "from" data out and might need the "more in packet" flag +} + +/++ + Just in case I decide to change the implementation some day. ++/ +alias AsyncAnonymousPipe = AsyncFile; + + +// AsyncAnonymousPipe connectNamedPipe(AsyncAnonymousPipe preallocated, string name) + +// unix fifos are considered just non-seekable files and have no special support in the lib; open them as a regular file w/ the async flag. + +// DIRECTORY LISTINGS + // not async, so if you want that, do it in a helper thread + // just a convenient function to have (tho phobos has a decent one too, importing it expensive af) + +// FIXME + +// FILE/DIR WATCHES + // linux does it by name, windows and bsd do it by handle/descriptor + // dispatches change event to either your thread or maybe the any task` queue. + +class DirectoryWatcher { + // I guess I can have one inotify instance associated with the event loop + + private { + version(Arsd_core_windows) { + + } else version(Arsd_core_epoll) { + } else version(Arsd_core_kqueue) { + } + } + + /++ + Windows and Linux work best when you watch directories. The operating system tells you the name of files as they change. + + BSD doesn't support this. You can only get names by watching specific files. (Windows, by contrast, can only watch directories, but since it gives you a filename, it is easy to filter.) + + inotify is kinda clearly the best of the bunch, with Windows in second place, and kqueue dead last. + +/ + this(FilePath directoryToWatch) { + version(Arsd_core_windows) { + assert(0); + } else version(Arsd_core_epoll) { + auto el = getThisThreadEventLoop(); + // el.addCallbackOnFdReadable + } else version(Arsd_core_kqueue) { + /+ + auto el = cast(CoreEventLoopImplementation) getThisThreadEventLoop(); + + kevent_t ev; + + EV_SET(&ev, SIGINT, EVFILT_VNODE, EV_ADD | EV_ENABLE, 0, 0, null); + ErrnoEnforce!kevent(el.kqueuefd, &ev, 1, null, 0, null); + +/ + } else assert(0, "Not yet implemented for this platform"); + } +} + version(none) void main() { - auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.write, AsyncFile.PreserveContents.truncateIfWriting, AsyncFile.RequirePreexisting.yes); + + auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation, AsyncFile.RequirePreexisting.yes); auto buffer = cast(ubyte[]) "hello"; - auto wr = new AsyncWriteRequest(file, buffer); + auto wr = new AsyncWriteRequest(file, buffer, 0); wr.start(); wr.waitForCompletion(); @@ -2000,10 +2368,14 @@ void main() { file.close(); } -mixin template OverlappedIoRequest(Response) { +/++ + Implementation details of some requests. You shouldn't need to know any of this, the interface is all public. ++/ +mixin template OverlappedIoRequest(Response, alias LowLevelOperation) { private { - SyncFile file; + AsyncFile file; ubyte[] buffer; + long offset; OwnedClass!Response response; @@ -2014,42 +2386,133 @@ mixin template OverlappedIoRequest(Response) { static void overlappedCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, LPOVERLAPPED lpOverlapped) { typeof(this) rr = cast(typeof(this)) (cast(void*) lpOverlapped - typeof(this).overlapped.offsetof); + rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.buffer[0 .. dwNumberOfBytesTransferred]); + rr.state_ = State.complete; + + // FIXME: on complete? + // this will queue our CallbackHelper and that should be run at the end of the event loop after it is woken up by the APC run } } - bool started; + version(Posix) { + ICoreEventLoop.RearmToken eventRegistration; + CallbackHelper cb; + + final CallbackHelper getCb() { + if(cb is null) + cb = new CallbackHelper(&cbImpl); + return cb; + } + + final void cbImpl() { + // it is ready to complete, time to do it + auto ret = LowLevelOperation(file.handle, buffer.ptr, buffer.length); + markCompleted(ret, errno); + } + + void markCompleted(long ret, int errno) { + // maybe i should queue an apc to actually do it, to ensure the event loop has cycled... FIXME + if(ret == -1) + response = typeof(response)(SystemErrorCode(errno), null); + else + response = typeof(response)(SystemErrorCode(0), buffer[0 .. cast(size_t) ret]); + state_ = State.complete; + } + } } - override void cancel() { + enum State { + unused, + started, + inProgress, + complete + } + private State state_; + + override void start() { + assert(state_ == State.unused); + + state_ = State.started; + version(Windows) { - Win32Enforce!CancelIoEx(file.handle, &overlapped); + overlapped.Offset = (cast(ulong) offset) & 0xffff_ffff; + overlapped.OffsetHigh = ((cast(ulong) offset) >> 32) & 0xffff_ffff; + + if(LowLevelOperation(file.handle, buffer.ptr, cast(DWORD) buffer.length, &overlapped, &overlappedCompletionRoutine)) { + // all good, though GetLastError() might have some informative info + } else { + // operation failed, the operation is always ReadFileEx or WriteFileEx so it won't give the io pending thing here + // should i issue error async? idk + state_ = State.complete; + throw new SystemApiException(__traits(identifier, LowLevelOperation), GetLastError()); + } + + // ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway. } else version(Posix) { - // FIXME + + // first try to just do it + auto ret = LowLevelOperation(file.handle, buffer.ptr, buffer.length); + + auto errno = errno; + if(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // unable to complete right now, register and try when it is ready + eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.file.handle, this.getCb); + } else { + // i could set errors sync or async and since it couldn't even start, i think a sync exception is the right way + if(ret == -1) + throw new SystemApiException(__traits(identifier, LowLevelOperation), errno); + markCompleted(ret, errno); // it completed synchronously (if it is an error nor not is handled by the completion handler) + } + } + } + + + override void cancel() { + if(state_ == State.complete) + return; // it has already finished, just leave it alone, no point discarding what is already done + version(Windows) { + if(state_ != State.unused) + Win32Enforce!CancelIoEx(file.handle, &overlapped); + // Windows will notify us when the cancellation is complete, so we need to wait for that before updating the state + } else version(Posix) { + if(state_ != State.unused) + eventRegistration.unregister(); + markCompleted(-1, ECANCELED); } } override bool isComplete() { + // just always let the event loop do it instead + return state_ == State.complete; + + /+ version(Windows) { return HasOverlappedIoCompleted(&overlapped); } else version(Posix) { - return true; + return state_ == State.complete; } + +/ } override Response waitForCompletion() { - if(!started) + if(state_ == State.unused) start(); + // FIXME: if we are inside a fiber, we can set a oncomplete callback and then yield instead... + if(state_ != State.complete) + getThisThreadEventLoop().run(&isComplete); + + /+ version(Windows) { SleepEx(INFINITE, true); //DWORD numberTransferred; //Win32Enforce!GetOverlappedResult(file.handle, &overlapped, &numberTransferred, true); } else version(Posix) { - + getThisThreadEventLoop().run(&isComplete); } + +/ return response; } @@ -2059,105 +2522,81 @@ mixin template OverlappedIoRequest(Response) { You can write to a file asynchronously by creating one of these. +/ final class AsyncWriteRequest : AsyncOperationRequest { - mixin OverlappedIoRequest!AsyncWriteResponse; + version(Windows) + private alias LowLevelOperation = WriteFileEx; + else + private alias LowLevelOperation = core.sys.posix.unistd.write; + mixin OverlappedIoRequest!(AsyncWriteResponse, LowLevelOperation); - this() { + this(AsyncFile file, ubyte[] buffer, long offset) { + this.file = file; + this.buffer = buffer; + this.offset = offset; response = typeof(response).defaultConstructed; } - - override void start() { - } - - /++ - - +/ - void repeat() { - // FIXME - } } +/++ + ++/ class AsyncWriteResponse : AsyncOperationResponse { + const ubyte[] bufferWritten; + const SystemErrorCode errorCode; + + this(SystemErrorCode errorCode, const(ubyte)[] bufferWritten) { + this.errorCode = errorCode; + this.bufferWritten = bufferWritten; + } + override bool wasSuccessful() { - return false; + return errorCode.wasSuccessful; } } +/++ ++/ final class AsyncReadRequest : AsyncOperationRequest { - mixin OverlappedIoRequest!AsyncReadResponse; + version(Windows) + private alias LowLevelOperation = ReadFileEx; + else + private alias LowLevelOperation = core.sys.posix.unistd.read; + mixin OverlappedIoRequest!(AsyncReadResponse, LowLevelOperation); /++ The file must have the overlapped flag enabled on Windows and the nonblock flag set on Posix. The buffer MUST NOT be touched by you - not used by another request, modified, read, or freed, including letting a static array going out of scope - until this request's `isComplete` returns `true`. - The offset pointer is shared between this and other requests. + The offset is where to start reading a disk file. For all other types of files, pass 0. +/ - this(NativeFileHandle file, ubyte[] buffer, shared(long)* offset) { + this(AsyncFile file, ubyte[] buffer, long offset) { + this.file = file; + this.buffer = buffer; + this.offset = offset; response = typeof(response).defaultConstructed; } - override void start() { - version(Windows) { - auto ret = ReadFileEx(file.handle, buffer.ptr, cast(DWORD) buffer.length, &overlapped, &overlappedCompletionRoutine); - // need to check GetLastError - - // ReadFileEx always queues, even if it completed synchronously. I *could* check the get overlapped result and sleepex here but i'm prolly better off just letting the event loop do its thing anyway. - } else version(Posix) { - import core.sys.posix.unistd; - - // first try to just do it - auto ret = read(file.handle, buffer.ptr, buffer.length); - - // then if it doesn't complete synchronously, need to event loop register - - // if we are inside a fiber task, it can simply yield and call the fiber in the callback - // when we return here, it tries to read again - - // if not inside, we need to ensure the buffer remains valid and set a callback... and return. - // the callback must retry the read - - // generally, the callback must satisfy the read somehow they set the callback to trigger the result object's completion handler - } - } - /++ - Cancels the request. This will cause `isComplete` to return true once the cancellation has been processed, but [AsyncOperationResponse.wasSuccessful] will return `false` (unless it completed before the cancellation was processed, in which case it is still allowed to finish successfully). - - After cancelling a request, you should still wait for it to complete to ensure that the task has actually released its resources before doing anything else on it. - - Once a cancellation request has been sent, it cannot be undone. - +/ - override void cancel() { - - } - - /++ - Returns `true` if the operation has been completed. It may be completed successfully, cancelled, or have errored out - to check this, call [waitForCompletion] and check the members on the response object. - +/ - override bool isComplete() { - return false; - } - /++ - Waits until the request has completed - successfully or otherwise - and returns the response object. - - The response object may be embedded in the request object - do not reuse the request until you are finished with the response and do not keep the response around longer than you keep the request. - - - Note to implementers: all subclasses should override this and return their specific response object. You can use the top-level `waitForFirstToCompleteByIndex` function with a single-element static array to help with the implementation. - +/ - override AsyncOperationResponse waitForCompletion() { - return response; - } - /++ +/ // abstract void repeat(); } +/++ + ++/ class AsyncReadResponse : AsyncOperationResponse { + const ubyte[] bufferRead; + const SystemErrorCode errorCode; + + this(SystemErrorCode errorCode, const(ubyte)[] bufferRead) { + this.errorCode = errorCode; + this.bufferRead = bufferRead; + } + override bool wasSuccessful() { - return false; + return errorCode.wasSuccessful; } } @@ -2168,9 +2607,151 @@ class AsyncReadResponse : AsyncOperationResponse { runHelperFunction() - whomever it reports to is the parent +/ +/+ +class Task : Fiber { + +} ++/ + +private class CoreWorkerThread : Thread { + this(EventLoopType type) { + this.type = type; + + // task runners are supposed to have smallish stacks since they either just run a single callback or call into fibers + // the helper runners might be a bit bigger tho + super(&run); + } + void run() { + eventLoop = getThisThreadEventLoop(this.type); + atomicOp!"+="(startedCount, 1); + atomicOp!"+="(runningCount, 1); + scope(exit) { + atomicOp!"-="(runningCount, 1); + } + + eventLoop.run(() => true); + } + + EventLoopType type; + ICoreEventLoop eventLoop; + + __gshared static { + CoreWorkerThread[] taskRunners; + CoreWorkerThread[] helperRunners; + ICoreEventLoop mainThreadLoop; + + // for the helper function thing on the bsds i could have my own little circular buffer of availability + + shared(int) startedCount; + shared(int) runningCount; + + bool started; + + void setup(int numberOfTaskRunners, int numberOfHelpers) { + assert(!started); + synchronized { + mainThreadLoop = getThisThreadEventLoop(); + + foreach(i; 0 .. numberOfTaskRunners) { + auto nt = new CoreWorkerThread(EventLoopType.TaskRunner); + taskRunners ~= nt; + nt.start(); + } + foreach(i; 0 .. numberOfHelpers) { + auto nt = new CoreWorkerThread(EventLoopType.HelperWorker); + helperRunners ~= nt; + nt.start(); + } + + const expectedCount = numberOfHelpers + numberOfTaskRunners; + + while(startedCount < expectedCount) { + Thread.yield(); + } + + started = true; + } + } + } +} + +private int numberOfCpus() { + return 4; // FIXME +} + +/++ + To opt in to the full functionality of this module with customization opportunity, create one and only one of these objects that is valid for exactly the lifetime of the application. + + Normally, this means writing a main like this: + + --- + import arsd.core; + void main() { + ArsdCoreApplication app = ArsdCoreApplication("Your app name"); + + // do your setup here + + // the rest of your code here + } + --- + + Its destructor runs the event loop then waits to for the workers to finish to clean them up. ++/ +struct ArsdCoreApplication { + private ICoreEventLoop impl; + + /++ + default number of threads is to split your cpus between blocking function runners and task runners + +/ + this(string applicationName) { + auto num = numberOfCpus(); + num /= 2; + if(num <= 0) + num = 1; + this(applicationName, num, num); + } + + /++ + + +/ + this(string applicationName, int numberOfTaskRunners, int numberOfHelpers) { + impl = getThisThreadEventLoop(EventLoopType.Explicit); + CoreWorkerThread.setup(numberOfTaskRunners, numberOfHelpers); + } + + @disable this(); + @disable this(this); + /++ + This must be deterministically destroyed. + +/ + @disable new(); + + ~this() { + run(); + exitApplication(); + waitForWorkersToExit(3000); + } + + void exitApplication() { + + } + + void waitForWorkersToExit(int timeoutMilliseconds) { + + } + + void run() { + impl.run(() => true); + } +} + + private class CoreEventLoopImplementation : ICoreEventLoop { version(Arsd_core_kqueue) { + // this thread apc dispatches go as a custom event to the queue + // the other queues go through one byte at a time pipes (barf). freebsd 13 and newest nbsd have eventfd too tho so maybe i can use them but the other kqueue systems don't. + void runOnce() { kevent_t[16] ev; //timespec tout = timespec(1, 0); @@ -2206,7 +2787,34 @@ private class CoreEventLoopImplementation : ICoreEventLoop { ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); - return UnregisterToken(); + return UnregisterToken(this, fd, cb); + } + + RearmToken addCallbackOnFdReadableOneShot(int fd, CallbackHelper cb) { + kevent_t ev; + + EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); + + ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); + + return RearmToken(true, this, fd, cb, 0); + } + + RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) { + kevent_t ev; + + EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE/* | EV_ONESHOT*/, 0, 0, cast(void*) cb); + + ErrnoEnforce!kevent(kqueuefd, &ev, 1, null, 0, null); + + return RearmToken(false, this, fd, cb, 0); + } + + private void rearmFd(RearmToken token) { + if(token.readable) + cast(void) addCallbackOnFdReadableOneShot(token.fd, token.cb); + else + cast(void) addCallbackOnFdWritableOneShot(token.fd, token.cb); } private void triggerGlobalEvent() { @@ -2222,6 +2830,8 @@ private class CoreEventLoopImplementation : ICoreEventLoop { if(kqueueGlobalFd[0] == 0) { import core.sys.posix.unistd; pipe(kqueueGlobalFd); + setCloExec(kqueueGlobalFd[0]); + setCloExec(kqueueGlobalFd[1]); signal(SIGINT, SIG_IGN); // FIXME } @@ -2280,6 +2890,9 @@ private class CoreEventLoopImplementation : ICoreEventLoop { // manages how to do it __gshared HANDLE iocpTaskRunners; __gshared HANDLE iocpWorkers; + + HANDLE[] handles; + // i think to terminate i just have to post the message at least once for every thread i know about, maybe a few more times for threads i don't know about. bool isWorker; // if it is a worker we wait on the iocp, if not we wait on msg @@ -2289,13 +2902,50 @@ private class CoreEventLoopImplementation : ICoreEventLoop { // this function is only supported on Windows Vista and up, so using this // means dropping support for XP. //GetQueuedCompletionStatusEx(); + assert(0); // FIXME } else { - //MsgWaitForMultipleObjectsEx(); - if(true) { - // handle: timeout - // HANDLE ready, forward message - // window messages - // also sleepex if needed + auto wto = 0; + + auto waitResult = MsgWaitForMultipleObjectsEx( + cast(int) handles.length, handles.ptr, + (wto == 0 ? INFINITE : wto), /* timeout */ + 0x04FF, /* QS_ALLINPUT */ + 0x0002 /* MWMO_ALERTABLE */ | 0x0004 /* MWMO_INPUTAVAILABLE */); + + enum WAIT_OBJECT_0 = 0; + if(waitResult >= WAIT_OBJECT_0 && waitResult < handles.length + WAIT_OBJECT_0) { + auto h = handles[waitResult - WAIT_OBJECT_0]; + // FIXME: run the handle ready callback + } else if(waitResult == handles.length + WAIT_OBJECT_0) { + // message ready + int count; + MSG message; + while(PeekMessage(&message, null, 0, 0, PM_NOREMOVE)) { // need to peek since sometimes MsgWaitForMultipleObjectsEx returns even though GetMessage can block. tbh i don't fully understand it but the docs say it is foreground activation + auto ret = GetMessage(&message, null, 0, 0); + if(ret == -1) + throw new WindowsApiException("GetMessage", GetLastError()); + TranslateMessage(&message); + DispatchMessage(&message); + + count++; + if(count > 10) + break; // take the opportunity to catch up on other events + + if(ret == 0) { // WM_QUIT + // EventLoop.quitApplication(); + assert(0); // FIXME + //break; + } + } + } else if(waitResult == 0x000000C0L /* WAIT_IO_COMPLETION */) { + SleepEx(0, true); // I call this to give it a chance to do stuff like async io + } else if(waitResult == 258L /* WAIT_TIMEOUT */) { + // timeout, should never happen since we aren't using it + } else if(waitResult == 0xFFFFFFFF) { + // failed + throw new WindowsApiException("MsgWaitForMultipleObjectsEx", GetLastError()); + } else { + // idk.... } } } @@ -2622,7 +3272,20 @@ private class CoreEventLoopImplementation : ICoreEventLoop { if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) throw new ErrnoApiException("epoll_ctl", errno); - return RearmToken(this, fd, cb, EPOLLIN | EPOLLONESHOT); + return RearmToken(true, this, fd, cb, EPOLLIN | EPOLLONESHOT); + } + + /++ + Adds a one-off callback that you can optionally rearm when it happens. + +/ + RearmToken addCallbackOnFdWritableOneShot(int fd, CallbackHelper cb) { + epoll_event event; + event.data.ptr = cast(void*) cb; + event.events = EPOLLOUT | EPOLLONESHOT; + if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) + throw new ErrnoApiException("epoll_ctl", errno); + + return RearmToken(false, this, fd, cb, EPOLLOUT | EPOLLONESHOT); } private void unregisterFd(int fd) { @@ -2659,6 +3322,14 @@ bool postMessage(ThreadToRunIn destination, Object message) { return false; } +/+ +void main() { + // FIXME: the offset doesn't seem to be done right + auto file = new AsyncFile(FilePath("test.txt"), AsyncFile.OpenMode.writeWithTruncation); + file.write("hello", 10).waitForCompletion(); +} ++/ + // to test the mailboxes /+ void main() { @@ -3136,13 +3807,15 @@ class ExternalProcess { This is the native version for Windows. +/ this(string program, string commandLine) { + version(Posix) { + assert(0, "not implemented command line to posix args yet"); + } } this(string commandLine) { version(Posix) { assert(0, "not implemented command line to posix args yet"); } - } this(string[] args) { @@ -3411,7 +4084,7 @@ class ExternalProcess { } // FIXME: comment this out -///+ +/+ unittest { auto proc = new ExternalProcess(FilePath("/bin/cat"), ["/bin/cat"]); @@ -3464,7 +4137,7 @@ unittest { assert(c == 2); } -//+/ ++/ // to test the thundering herd on signal handling version(none) @@ -3490,6 +4163,64 @@ unittest { } } +/+ + ================= + STDIO REPLACEMENT + ================= ++/ + +/++ + A `writeln` that actually works. + + It works correctly on Windows, using the correct functions to write unicode to the console. even allocating a console if needed. If the output has been redirected to a file or pipe, it writes UTF-8. + + This always does text. See also WritableStream and WritableTextStream ++/ +void writeln(T...)(T t) { + char[256] bufferBacking; + char[] buffer = bufferBacking[]; + int pos; + foreach(arg; t) { + static if(is(typeof(arg) : const char[])) { + buffer[pos .. pos + arg.length] = arg[]; + pos += arg.length; + } else static if(is(typeof(arg) : stringz)) { + auto b = arg.borrow; + buffer[pos .. pos + b.length] = b[]; + pos += b.length; + } else static if(is(typeof(arg) : long)) { + auto sliced = intToString(arg, buffer[pos .. $]); + pos += sliced.length; + } else static assert(0, "Unsupported type: " ~ T.stringof); + } + + buffer[pos++] = '\n'; + + version(Windows) { + import core.sys.windows.wincon; + + auto hStdOut = GetStdHandle(STD_OUTPUT_HANDLE); + if(hStdOut == null || hStdOut == INVALID_HANDLE_VALUE) { + AllocConsole(); + hStdOut = GetStdHandle(STD_OUTPUT_HANDLE); + } + + if(GetFileType(hStdOut) == FILE_TYPE_CHAR) { + wchar[256] wbuffer; + auto toWrite = makeWindowsString(buffer[0 .. pos], wbuffer, WindowsStringConversionFlags.convertNewLines); + + DWORD written; + WriteConsoleW(hStdOut, toWrite.ptr, cast(DWORD) toWrite.length, &written, null); + } else { + DWORD written; + WriteFile(hStdOut, buffer.ptr, pos, &written, null); + } + } else { + import unix = core.sys.posix.unistd; + unix.write(1, buffer.ptr, pos); + } +} + /+ STDIO @@ -3497,9 +4228,6 @@ STDIO /++ Please note using this will create a compile-time dependency on [arsd.terminal] - It works correctly on Windows, using the correct functions to write unicode to the console. - even allocating a console if needed. If the output has been redirected to a file or pipe, it - writes UTF-8. so my writeln replacement: