mirror of https://github.com/adamdruppe/arsd.git
add ipv6 only option for more predictable behavior
This commit is contained in:
parent
682ddd0563
commit
7854767903
2
cgi.d
2
cgi.d
|
@ -6030,6 +6030,8 @@ Socket startListening(string host, ushort port, ref bool tcp, ref void delegate(
|
||||||
}
|
}
|
||||||
cloexec(listener);
|
cloexec(listener);
|
||||||
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
|
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
|
||||||
|
if(cast(Internet6Address) address)
|
||||||
|
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.IPV6_V6ONLY, true);
|
||||||
listener.bind(address);
|
listener.bind(address);
|
||||||
cleanup = delegate() {
|
cleanup = delegate() {
|
||||||
listener.close();
|
listener.close();
|
||||||
|
|
187
core.d
187
core.d
|
@ -61,7 +61,9 @@ version(Windows) {
|
||||||
} else version(linux) {
|
} else version(linux) {
|
||||||
version=Arsd_core_epoll;
|
version=Arsd_core_epoll;
|
||||||
|
|
||||||
version=Arsd_core_has_cloexec;
|
static if(__VERSION__ >= 2098) {
|
||||||
|
version=Arsd_core_has_cloexec;
|
||||||
|
}
|
||||||
} else version(FreeBSD) {
|
} else version(FreeBSD) {
|
||||||
version=Arsd_core_kqueue;
|
version=Arsd_core_kqueue;
|
||||||
|
|
||||||
|
@ -2561,6 +2563,8 @@ class AsyncFile : AbstractFile {
|
||||||
Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation.
|
Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation.
|
||||||
|
|
||||||
Tip: prefer the callback ones. If settings where async is possible, it will do async, and if not, it will sync.
|
Tip: prefer the callback ones. If settings where async is possible, it will do async, and if not, it will sync.
|
||||||
|
|
||||||
|
NOT IMPLEMENTED
|
||||||
+/
|
+/
|
||||||
void writeFile(string filename, const(void)[] contents) {
|
void writeFile(string filename, const(void)[] contents) {
|
||||||
|
|
||||||
|
@ -3123,6 +3127,9 @@ class AsyncSocket : AsyncFile {
|
||||||
setCloExec(handle);
|
setCloExec(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int opt = 1;
|
||||||
|
setsockopt(handle, SOL_SOCKET, IPV6_V6ONLY, &opt, opt.sizeof);
|
||||||
|
|
||||||
// FIXME: chekc for broadcast
|
// FIXME: chekc for broadcast
|
||||||
|
|
||||||
// FIXME: REUSEADDR ?
|
// FIXME: REUSEADDR ?
|
||||||
|
@ -4316,11 +4323,158 @@ class AsyncReadResponse : AsyncOperationResponse {
|
||||||
runHelperFunction() - whomever it reports to is the parent
|
runHelperFunction() - whomever it reports to is the parent
|
||||||
+/
|
+/
|
||||||
|
|
||||||
/+
|
class ScheduableTask : Fiber {
|
||||||
class Task : Fiber {
|
private void delegate() dg;
|
||||||
|
|
||||||
|
// linked list stuff
|
||||||
|
private static ScheduableTask taskRoot;
|
||||||
|
private ScheduableTask previous;
|
||||||
|
private ScheduableTask next;
|
||||||
|
|
||||||
|
// need the controlling thread to know how to wake it up if it receives a message
|
||||||
|
private Thread controllingThread;
|
||||||
|
|
||||||
|
// the api
|
||||||
|
|
||||||
|
this(void delegate() dg) {
|
||||||
|
assert(dg !is null);
|
||||||
|
|
||||||
|
this.dg = dg;
|
||||||
|
super(&taskRunner);
|
||||||
|
|
||||||
|
if(taskRoot !is null) {
|
||||||
|
this.next = taskRoot;
|
||||||
|
taskRoot.previous = this;
|
||||||
|
}
|
||||||
|
taskRoot = this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/+
|
||||||
|
enum BehaviorOnCtrlC {
|
||||||
|
ignore,
|
||||||
|
cancel,
|
||||||
|
deliverMessage
|
||||||
|
}
|
||||||
|
+/
|
||||||
|
|
||||||
|
private bool cancelled;
|
||||||
|
|
||||||
|
public void cancel() {
|
||||||
|
this.cancelled = true;
|
||||||
|
// if this is running, we can throw immediately
|
||||||
|
// otherwise if we're calling from an appropriate thread, we can call it immediately
|
||||||
|
// otherwise we need to queue a wakeup to its own thread.
|
||||||
|
// tbh we should prolly just queue it every time
|
||||||
|
}
|
||||||
|
|
||||||
|
private void taskRunner() {
|
||||||
|
try {
|
||||||
|
dg();
|
||||||
|
} catch(TaskCancelledException tce) {
|
||||||
|
// this space intentionally left blank;
|
||||||
|
// the purpose of this exception is to just
|
||||||
|
// let the fiber's destructors run before we
|
||||||
|
// let it die.
|
||||||
|
} catch(Throwable t) {
|
||||||
|
if(taskUncaughtException is null) {
|
||||||
|
throw t;
|
||||||
|
} else {
|
||||||
|
taskUncaughtException(t);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if(this is taskRoot) {
|
||||||
|
taskRoot = taskRoot.next;
|
||||||
|
if(taskRoot !is null)
|
||||||
|
taskRoot.previous = null;
|
||||||
|
} else {
|
||||||
|
assert(this.previous !is null);
|
||||||
|
assert(this.previous.next is this);
|
||||||
|
this.previous.next = this.next;
|
||||||
|
if(this.next !is null)
|
||||||
|
this.next.previous = this.previous;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/++
|
||||||
|
|
||||||
+/
|
+/
|
||||||
|
void delegate(Throwable t) taskUncaughtException;
|
||||||
|
|
||||||
|
/++
|
||||||
|
Gets an object that lets you control a schedulable task (which is a specialization of a fiber) and can be used in an `if` statement.
|
||||||
|
|
||||||
|
---
|
||||||
|
if(auto controller = inSchedulableTask()) {
|
||||||
|
controller.yieldUntilReadable(...);
|
||||||
|
}
|
||||||
|
---
|
||||||
|
|
||||||
|
History:
|
||||||
|
Added August 11, 2023 (dub v11.1)
|
||||||
|
+/
|
||||||
|
SchedulableTaskController inSchedulableTask() {
|
||||||
|
import core.thread.fiber;
|
||||||
|
|
||||||
|
if(auto fiber = Fiber.getThis) {
|
||||||
|
return SchedulableTaskController(cast(ScheduableTask) fiber);
|
||||||
|
}
|
||||||
|
|
||||||
|
return SchedulableTaskController(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ditto
|
||||||
|
struct SchedulableTaskController {
|
||||||
|
private this(ScheduableTask fiber) {
|
||||||
|
this.fiber = fiber;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ScheduableTask fiber;
|
||||||
|
|
||||||
|
/++
|
||||||
|
|
||||||
|
+/
|
||||||
|
bool opCast(T : bool)() {
|
||||||
|
return fiber !is null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/++
|
||||||
|
|
||||||
|
+/
|
||||||
|
version(Posix)
|
||||||
|
void yieldUntilReadable(NativeFileHandle handle) {
|
||||||
|
assert(fiber !is null);
|
||||||
|
|
||||||
|
auto cb = new CallbackHelper(() { fiber.call(); });
|
||||||
|
|
||||||
|
// FIXME: if the fd is already registered in this thread it can throw...
|
||||||
|
version(Windows)
|
||||||
|
auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb);
|
||||||
|
else
|
||||||
|
auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb);
|
||||||
|
|
||||||
|
// FIXME: this is only valid if the fiber is only ever going to run in this thread!
|
||||||
|
fiber.yield();
|
||||||
|
|
||||||
|
rearmToken.unregister();
|
||||||
|
|
||||||
|
// what if there are other messages, like a ctrl+c?
|
||||||
|
if(fiber.cancelled)
|
||||||
|
throw new TaskCancelledException();
|
||||||
|
}
|
||||||
|
|
||||||
|
version(Windows)
|
||||||
|
void yieldUntilSignaled(NativeFileHandle handle) {
|
||||||
|
// add it to the WaitForMultipleObjects thing w/ a cb
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TaskCancelledException : object.Exception {
|
||||||
|
this() {
|
||||||
|
super("Task cancelled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class CoreWorkerThread : Thread {
|
private class CoreWorkerThread : Thread {
|
||||||
this(EventLoopType type) {
|
this(EventLoopType type) {
|
||||||
|
@ -4338,7 +4492,13 @@ private class CoreWorkerThread : Thread {
|
||||||
atomicOp!"-="(runningCount, 1);
|
atomicOp!"-="(runningCount, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
eventLoop.run(() => true);
|
eventLoop.run(() => cancelled);
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool cancelled;
|
||||||
|
|
||||||
|
void cancel() {
|
||||||
|
cancelled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
EventLoopType type;
|
EventLoopType type;
|
||||||
|
@ -4381,6 +4541,14 @@ private class CoreWorkerThread : Thread {
|
||||||
started = true;
|
started = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cancelAll() {
|
||||||
|
foreach(runner; taskRunners)
|
||||||
|
runner.cancel();
|
||||||
|
foreach(runner; helperRunners)
|
||||||
|
runner.cancel();
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4406,6 +4574,7 @@ private int numberOfCpus() {
|
||||||
|
|
||||||
Its destructor runs the event loop then waits to for the workers to finish to clean them up.
|
Its destructor runs the event loop then waits to for the workers to finish to clean them up.
|
||||||
+/
|
+/
|
||||||
|
// FIXME: single instance?
|
||||||
struct ArsdCoreApplication {
|
struct ArsdCoreApplication {
|
||||||
private ICoreEventLoop impl;
|
private ICoreEventLoop impl;
|
||||||
|
|
||||||
|
@ -4436,21 +4605,25 @@ struct ArsdCoreApplication {
|
||||||
@disable new();
|
@disable new();
|
||||||
|
|
||||||
~this() {
|
~this() {
|
||||||
run();
|
if(!alreadyRun)
|
||||||
|
run();
|
||||||
exitApplication();
|
exitApplication();
|
||||||
waitForWorkersToExit(3000);
|
waitForWorkersToExit(3000);
|
||||||
}
|
}
|
||||||
|
|
||||||
void exitApplication() {
|
void exitApplication() {
|
||||||
|
CoreWorkerThread.cancelAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
void waitForWorkersToExit(int timeoutMilliseconds) {
|
void waitForWorkersToExit(int timeoutMilliseconds) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private bool alreadyRun;
|
||||||
|
|
||||||
void run() {
|
void run() {
|
||||||
impl.run(() => true);
|
impl.run(() => false);
|
||||||
|
alreadyRun = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue