mirror of https://github.com/adamdruppe/arsd.git
ExternalProcess overhaul to be better pipe citizens
This commit is contained in:
parent
d5a59f8a3b
commit
d246f4d744
302
core.d
302
core.d
|
@ -2858,6 +2858,21 @@ package(arsd) enum EventLoopType {
|
|||
Tasks are assigned to a worker thread and may share it with other tasks.
|
||||
+/
|
||||
|
||||
/+
|
||||
private ThreadLocalGcRoots gcRoots;
|
||||
|
||||
private struct ThreadLocalGcRoots {
|
||||
// it actually would be kinda cool if i could tell the GC
|
||||
// that only part of this array is actually used so it can skip
|
||||
// scanning the rest. but meh.
|
||||
const(void)*[] roots;
|
||||
|
||||
void* add(const(void)* what) {
|
||||
roots ~= what;
|
||||
return &roots[$-1];
|
||||
}
|
||||
}
|
||||
+/
|
||||
|
||||
// the GC may not be able to see this! remember, it can be hidden inside kernel buffers
|
||||
version(HasThread) package(arsd) class CallbackHelper {
|
||||
|
@ -5230,8 +5245,10 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
rr.response = typeof(rr.response)(SystemErrorCode(dwErrorCode), rr.llo.buffer[0 .. dwNumberOfBytesTransferred]);
|
||||
rr.state_ = State.complete;
|
||||
|
||||
// FIXME: on complete?
|
||||
if(rr.oncomplete)
|
||||
rr.oncomplete(rr);
|
||||
|
||||
// FIXME: on complete?
|
||||
// this will queue our CallbackHelper and that should be run at the end of the event loop after it is woken up by the APC run
|
||||
}
|
||||
}
|
||||
|
@ -5259,6 +5276,9 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
else
|
||||
response = typeof(response)(SystemErrorCode(0), llo.buffer[0 .. cast(size_t) ret]);
|
||||
state_ = State.complete;
|
||||
|
||||
if(oncomplete)
|
||||
oncomplete(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5279,6 +5299,7 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
version(Windows) {
|
||||
if(llo(&overlapped, &overlappedCompletionRoutine)) {
|
||||
// all good, though GetLastError() might have some informative info
|
||||
//writeln(GetLastError());
|
||||
} else {
|
||||
// operation failed, the operation is always ReadFileEx or WriteFileEx so it won't give the io pending thing here
|
||||
// should i issue error async? idk
|
||||
|
@ -5294,7 +5315,10 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
|
||||
auto errno = errno;
|
||||
if(ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // unable to complete right now, register and try when it is ready
|
||||
eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.llo.file.handle, this.getCb);
|
||||
if(eventRegistration is typeof(eventRegistration).init)
|
||||
eventRegistration = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(this.llo.file.handle, this.getCb);
|
||||
else
|
||||
eventRegistration.rearm();
|
||||
} else {
|
||||
// i could set errors sync or async and since it couldn't even start, i think a sync exception is the right way
|
||||
if(ret == -1)
|
||||
|
@ -5304,7 +5328,6 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
override void cancel() {
|
||||
if(state_ == State.complete)
|
||||
return; // it has already finished, just leave it alone, no point discarding what is already done
|
||||
|
@ -5354,6 +5377,20 @@ mixin template OverlappedIoRequest(Response, LowLevelOperation) {
|
|||
|
||||
return response;
|
||||
}
|
||||
|
||||
/++
|
||||
Repeats the operation, restarting the request.
|
||||
|
||||
This must only be called when the operation has already completed.
|
||||
+/
|
||||
void repeat() {
|
||||
if(state_ != State.complete)
|
||||
throw new Exception("wrong use, cannot repeat if not complete");
|
||||
state_ = State.unused;
|
||||
start();
|
||||
}
|
||||
|
||||
void delegate(typeof(this) t) oncomplete;
|
||||
}
|
||||
|
||||
/++
|
||||
|
@ -5410,6 +5447,13 @@ class AsyncWriteResponse : AsyncOperationResponse {
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: on Windows, you may want two operations outstanding at once
|
||||
// so there's no delay between sequential ops. this system currently makes that
|
||||
// impossible since epoll won't let you register twice...
|
||||
|
||||
// FIXME: if an op completes synchronously, and oncomplete calls repeat
|
||||
// you can get infinite recursion into the stack...
|
||||
|
||||
/++
|
||||
|
||||
+/
|
||||
|
@ -6864,17 +6908,26 @@ version(HasThread) class ReadableStream {
|
|||
|
||||
/// ditto
|
||||
final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
|
||||
if(byteOrder == ByteOrder.irrelevant && E.sizeof > 1)
|
||||
if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
|
||||
throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);
|
||||
|
||||
assert(0, "Not implemented");
|
||||
T ret;
|
||||
|
||||
do {
|
||||
try
|
||||
ret ~= get!E(elementByteOrder);
|
||||
catch(ArsdException!"is already closed" ae)
|
||||
return ret;
|
||||
} while(!isTerminatingSentinel(ret[$-1]));
|
||||
|
||||
return ret[0 .. $-1]; // cut off the terminating sentinel
|
||||
}
|
||||
|
||||
/++
|
||||
|
||||
+/
|
||||
bool isClosed() {
|
||||
return isClosed_;
|
||||
return isClosed_ && currentBuffer.length == 0 && leftoverBuffer.length == 0;
|
||||
}
|
||||
|
||||
// Control side of things
|
||||
|
@ -6905,6 +6958,9 @@ version(HasThread) class ReadableStream {
|
|||
You basically have to use this thing from a task
|
||||
+/
|
||||
protected void waitForAdditionalData() {
|
||||
if(isClosed_)
|
||||
throw ArsdException!("is already closed")();
|
||||
|
||||
Fiber task = Fiber.getThis;
|
||||
|
||||
assert(task !is null);
|
||||
|
@ -7002,9 +7058,6 @@ unittest {
|
|||
---
|
||||
|
||||
Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop. Of course, I might change my mind on this.
|
||||
|
||||
Bugs:
|
||||
Not implemented at all on Windows yet.
|
||||
+/
|
||||
class ExternalProcess /*: AsyncOperationRequest*/ {
|
||||
|
||||
|
@ -7021,8 +7074,7 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
synchronized(typeid(ExternalProcess)) {
|
||||
if(pid in activeChildren) {
|
||||
auto ac = activeChildren[pid];
|
||||
ac.completed = true;
|
||||
ac.status = status;
|
||||
ac.markComplete(status);
|
||||
activeChildren.remove(pid);
|
||||
}
|
||||
}
|
||||
|
@ -7034,13 +7086,18 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
/++
|
||||
This is the native version for Windows.
|
||||
+/
|
||||
this(string program, string commandLine) {
|
||||
version(Windows)
|
||||
this(FilePath program, string commandLine) {
|
||||
version(Posix) {
|
||||
assert(0, "not implemented command line to posix args yet");
|
||||
} else version(Windows) {
|
||||
this.program = program;
|
||||
this.commandLine = commandLine;
|
||||
}
|
||||
else throw new NotYetImplementedException();
|
||||
}
|
||||
|
||||
/+
|
||||
this(string commandLine) {
|
||||
version(Posix) {
|
||||
assert(0, "not implemented command line to posix args yet");
|
||||
|
@ -7055,10 +7112,12 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
}
|
||||
else throw new NotYetImplementedException();
|
||||
}
|
||||
+/
|
||||
|
||||
/++
|
||||
This is the native version for Posix.
|
||||
+/
|
||||
version(Posix)
|
||||
this(FilePath program, string[] args) {
|
||||
version(Posix) {
|
||||
this.program = program;
|
||||
|
@ -7067,12 +7126,13 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
else throw new NotYetImplementedException();
|
||||
}
|
||||
|
||||
// you can modify these before calling start
|
||||
int stdoutBufferSize = 32 * 1024;
|
||||
int stderrBufferSize = 8 * 1024;
|
||||
/++
|
||||
|
||||
+/
|
||||
void start() {
|
||||
version(Posix) {
|
||||
getThisThreadEventLoop(); // ensure it is initialized
|
||||
|
||||
int ret;
|
||||
|
||||
int[2] stdinPipes;
|
||||
|
@ -7085,7 +7145,7 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
close(stdinPipes[1]);
|
||||
}
|
||||
|
||||
stdinFd = stdinPipes[1];
|
||||
auto stdinFd = stdinPipes[1];
|
||||
|
||||
int[2] stdoutPipes;
|
||||
ret = pipe(stdoutPipes);
|
||||
|
@ -7097,7 +7157,7 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
close(stdoutPipes[1]);
|
||||
}
|
||||
|
||||
stdoutFd = stdoutPipes[0];
|
||||
auto stdoutFd = stdoutPipes[0];
|
||||
|
||||
int[2] stderrPipes;
|
||||
ret = pipe(stderrPipes);
|
||||
|
@ -7109,7 +7169,7 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
close(stderrPipes[1]);
|
||||
}
|
||||
|
||||
stderrFd = stderrPipes[0];
|
||||
auto stderrFd = stderrPipes[0];
|
||||
|
||||
|
||||
int[2] errorReportPipes;
|
||||
|
@ -7233,107 +7293,165 @@ class ExternalProcess /*: AsyncOperationRequest*/ {
|
|||
|
||||
ErrnoEnforce!close(errorReportPipes[0]);
|
||||
|
||||
// and now register the ones we need to read with the event loop so it can call the callbacks
|
||||
// also need to listen to SIGCHLD to queue up the terminated callback. FIXME
|
||||
makeNonBlocking(stdinFd);
|
||||
makeNonBlocking(stdoutFd);
|
||||
makeNonBlocking(stderrFd);
|
||||
|
||||
stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable));
|
||||
stderrUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stderrFd, new CallbackHelper(&stderrReadable));
|
||||
_stdin = new AsyncFile(stdinFd);
|
||||
_stdout = new AsyncFile(stdoutFd);
|
||||
_stderr = new AsyncFile(stderrFd);
|
||||
}
|
||||
} else version(Windows) {
|
||||
WCharzBuffer program = this.program.path;
|
||||
WCharzBuffer cmdLine = this.commandLine;
|
||||
|
||||
PROCESS_INFORMATION pi;
|
||||
STARTUPINFOW startupInfo;
|
||||
|
||||
SECURITY_ATTRIBUTES saAttr;
|
||||
saAttr.nLength = SECURITY_ATTRIBUTES.sizeof;
|
||||
saAttr.bInheritHandle = true;
|
||||
saAttr.lpSecurityDescriptor = null;
|
||||
|
||||
HANDLE inreadPipe;
|
||||
HANDLE inwritePipe;
|
||||
if(MyCreatePipeEx(&inreadPipe, &inwritePipe, &saAttr, 0, 0, FILE_FLAG_OVERLAPPED) == 0)
|
||||
throw new WindowsApiException("CreatePipe", GetLastError());
|
||||
if(!SetHandleInformation(inwritePipe, 1/*HANDLE_FLAG_INHERIT*/, 0))
|
||||
throw new WindowsApiException("SetHandleInformation", GetLastError());
|
||||
|
||||
HANDLE outreadPipe;
|
||||
HANDLE outwritePipe;
|
||||
if(MyCreatePipeEx(&outreadPipe, &outwritePipe, &saAttr, 0, FILE_FLAG_OVERLAPPED, 0) == 0)
|
||||
throw new WindowsApiException("CreatePipe", GetLastError());
|
||||
if(!SetHandleInformation(outreadPipe, 1/*HANDLE_FLAG_INHERIT*/, 0))
|
||||
throw new WindowsApiException("SetHandleInformation", GetLastError());
|
||||
|
||||
HANDLE errreadPipe;
|
||||
HANDLE errwritePipe;
|
||||
if(MyCreatePipeEx(&errreadPipe, &errwritePipe, &saAttr, 0, FILE_FLAG_OVERLAPPED, 0) == 0)
|
||||
throw new WindowsApiException("CreatePipe", GetLastError());
|
||||
if(!SetHandleInformation(errreadPipe, 1/*HANDLE_FLAG_INHERIT*/, 0))
|
||||
throw new WindowsApiException("SetHandleInformation", GetLastError());
|
||||
|
||||
startupInfo.cb = startupInfo.sizeof;
|
||||
startupInfo.dwFlags = STARTF_USESTDHANDLES;
|
||||
startupInfo.hStdInput = inreadPipe;
|
||||
startupInfo.hStdOutput = outwritePipe;
|
||||
startupInfo.hStdError = errwritePipe;
|
||||
|
||||
auto result = CreateProcessW(
|
||||
program.ptr,
|
||||
cmdLine.ptr,
|
||||
null, // process attributes
|
||||
null, // thread attributes
|
||||
true, // inherit handles; necessary for the std in/out/err ones to work
|
||||
0, // dwCreationFlags FIXME might be useful to change
|
||||
null, // environment, might be worth changing
|
||||
null, // current directory
|
||||
&startupInfo,
|
||||
&pi
|
||||
);
|
||||
|
||||
if(!result)
|
||||
throw new WindowsApiException("CreateProcessW", GetLastError());
|
||||
|
||||
_stdin = new AsyncFile(inwritePipe);
|
||||
_stdout = new AsyncFile(outreadPipe);
|
||||
_stderr = new AsyncFile(errreadPipe);
|
||||
|
||||
Win32Enforce!CloseHandle(inreadPipe);
|
||||
Win32Enforce!CloseHandle(outwritePipe);
|
||||
Win32Enforce!CloseHandle(errwritePipe);
|
||||
|
||||
Win32Enforce!CloseHandle(pi.hThread);
|
||||
|
||||
handle = pi.hProcess;
|
||||
|
||||
procRegistration = getThisThreadEventLoop.addCallbackOnHandleReady(handle, new CallbackHelper(&almostComplete));
|
||||
}
|
||||
}
|
||||
|
||||
private version(Posix) {
|
||||
version(Windows) {
|
||||
private HANDLE handle;
|
||||
private FilePath program;
|
||||
private string commandLine;
|
||||
private ICoreEventLoop.UnregisterToken procRegistration;
|
||||
|
||||
private final void almostComplete() {
|
||||
// GetProcessTimes lol
|
||||
Win32Enforce!GetExitCodeProcess(handle, cast(uint*) &_status);
|
||||
|
||||
markComplete(_status);
|
||||
|
||||
procRegistration.unregister();
|
||||
CloseHandle(handle);
|
||||
this.completed = true;
|
||||
}
|
||||
} else version(Posix) {
|
||||
import core.sys.posix.unistd;
|
||||
import core.sys.posix.fcntl;
|
||||
|
||||
int stdinFd = -1;
|
||||
int stdoutFd = -1;
|
||||
int stderrFd = -1;
|
||||
|
||||
ICoreEventLoop.UnregisterToken stdoutUnregisterToken;
|
||||
ICoreEventLoop.UnregisterToken stderrUnregisterToken;
|
||||
|
||||
pid_t pid = -1;
|
||||
private pid_t pid = -1;
|
||||
|
||||
public void delegate() beforeExec;
|
||||
|
||||
FilePath program;
|
||||
string[] args;
|
||||
|
||||
void stdoutReadable() {
|
||||
if(stdoutReadBuffer is null)
|
||||
stdoutReadBuffer = new ubyte[](stdoutBufferSize);
|
||||
auto ret = read(stdoutFd, stdoutReadBuffer.ptr, stdoutReadBuffer.length);
|
||||
if(ret == -1)
|
||||
throw new ErrnoApiException("read", errno);
|
||||
if(onStdoutAvailable) {
|
||||
onStdoutAvailable(stdoutReadBuffer[0 .. ret]);
|
||||
}
|
||||
|
||||
if(ret == 0) {
|
||||
stdoutUnregisterToken.unregister();
|
||||
|
||||
close(stdoutFd);
|
||||
stdoutFd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void stderrReadable() {
|
||||
if(stderrReadBuffer is null)
|
||||
stderrReadBuffer = new ubyte[](stderrBufferSize);
|
||||
auto ret = read(stderrFd, stderrReadBuffer.ptr, stderrReadBuffer.length);
|
||||
if(ret == -1)
|
||||
throw new ErrnoApiException("read", errno);
|
||||
if(onStderrAvailable) {
|
||||
onStderrAvailable(stderrReadBuffer[0 .. ret]);
|
||||
}
|
||||
|
||||
if(ret == 0) {
|
||||
stderrUnregisterToken.unregister();
|
||||
|
||||
close(stderrFd);
|
||||
stderrFd = -1;
|
||||
}
|
||||
}
|
||||
private FilePath program;
|
||||
private string[] args;
|
||||
}
|
||||
|
||||
private ubyte[] stdoutReadBuffer;
|
||||
private ubyte[] stderrReadBuffer;
|
||||
private final void markComplete(int status) {
|
||||
completed = true;
|
||||
_status = status;
|
||||
|
||||
if(oncomplete)
|
||||
oncomplete(this);
|
||||
}
|
||||
|
||||
|
||||
private AsyncFile _stdin;
|
||||
private AsyncFile _stdout;
|
||||
private AsyncFile _stderr;
|
||||
|
||||
/++
|
||||
|
||||
+/
|
||||
AsyncFile stdin() {
|
||||
return _stdin;
|
||||
}
|
||||
/// ditto
|
||||
AsyncFile stdout() {
|
||||
return _stdout;
|
||||
}
|
||||
/// ditto
|
||||
AsyncFile stderr() {
|
||||
return _stderr;
|
||||
}
|
||||
|
||||
/++
|
||||
+/
|
||||
void waitForCompletion() {
|
||||
getThisThreadEventLoop().run(&this.isComplete);
|
||||
}
|
||||
|
||||
/++
|
||||
+/
|
||||
bool isComplete() {
|
||||
return completed;
|
||||
}
|
||||
|
||||
bool completed;
|
||||
int status = int.min;
|
||||
private bool completed;
|
||||
private int _status = int.min;
|
||||
|
||||
/++
|
||||
If blocking, it will block the current task until the write succeeds.
|
||||
|
||||
Write `null` as data to close the pipe. Once the pipe is closed, you must not try to write to it again.
|
||||
+/
|
||||
void writeToStdin(in void[] data) {
|
||||
version(Posix) {
|
||||
if(data is null) {
|
||||
close(stdinFd);
|
||||
stdinFd = -1;
|
||||
} else {
|
||||
// FIXME: check the return value again and queue async writes
|
||||
auto ret = write(stdinFd, data.ptr, data.length);
|
||||
if(ret == -1)
|
||||
throw new ErrnoApiException("write", errno);
|
||||
}
|
||||
}
|
||||
|
||||
int status() {
|
||||
return _status;
|
||||
}
|
||||
|
||||
void delegate(ubyte[] got) onStdoutAvailable;
|
||||
void delegate(ubyte[] got) onStderrAvailable;
|
||||
void delegate(int code) onTermination;
|
||||
// void delegate(int code) onTermination;
|
||||
|
||||
void delegate(ExternalProcess) oncomplete;
|
||||
|
||||
// pty?
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue