diff --git a/rpc.d b/rpc.d index 0b82abb..ec500c8 100644 --- a/rpc.d +++ b/rpc.d @@ -1,6 +1,13 @@ module arsd.rpc; -/+ //example usage +/* + FIXME: + 1) integrate with arsd.eventloop + 2) make it easy to use with other processes; pipe to a process and talk to it that way. perhaps with shared memory too? + 3) extend the serialization capabilities +*/ + +///+ //example usage interface ExampleNetworkFunctions { string sayHello(string name); int add(int a, int b); @@ -52,12 +59,20 @@ void main(string[] args) { client.structTest(S1(20, "cool!"), (a) { writeln(a.name, " -- ", a.number); }, null); client.die(delegate () { writeln("shouldn't happen"); }, delegate(a) { writeln(a); }); client.eventLoop(); + + /* + auto client = makeNetworkClient!(ExampleNetworkFunctions, false)("localhost", 5005); + writeln(client.sayHello("whoa")); + writeln(client.add(1, 2)); + client.die(); + writeln(client.add(1, 2)); + */ } else { auto server = new ExampleServer(5005); server.eventLoop(); } } -+/ +//+/ mixin template NetworkServer(Interface) { import std.socket; @@ -260,22 +275,27 @@ final public inout(ubyte)[] deserializeInto(T)(inout(ubyte)[] buffer, ref T s) { return buffer; } -mixin template NetworkClient(Interface) { +mixin template NetworkClient(Interface, bool useAsync = true) { private static string createClass() { // this doesn't actually inherit from the interface because // the return value needs to be handled async string code;// = `final class Class /*: ` ~ Interface.stringof ~ `*/ {`; code ~= "\n\timport std.socket;"; code ~= "\n\tprivate Socket socket;"; - code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onSuccesses;"; - code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onErrors;"; + if(useAsync) { + code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onSuccesses;"; + code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onErrors;"; + } code ~= "\n\tprivate uint lastSequenceNumber;"; code ~= q{ private this(string host, ushort port) { this.socket = new TcpSocket(); this.socket.connect(new InternetAddress(host, port)); } + }; + if(useAsync) + code ~= q{ final public void eventLoop() { ubyte[4096] buffer; bool open = true; @@ -326,10 +346,19 @@ mixin template NetworkClient(Interface) { code ~= "\n\tpublic:\n"; foreach(memIdx, member; __traits(allMembers, Interface)) { - alias mem = PassThrough!(__traits(getMember, Interface, member)); - code ~= "\t\tfinal void " ~ member ~ "("; - bool hadArgument = false; import std.traits; + alias mem = PassThrough!(__traits(getMember, Interface, member)); + string type; + if(useAsync) + type = "void"; + else { + static if(is(ReturnType!mem == void)) + type = "void"; + else + type = (ReturnType!mem).stringof; + } + code ~= "\t\tfinal "~type~" " ~ member ~ "("; + bool hadArgument = false; import std.conv; // arguments foreach(i, arg; ParameterTypeTuple!mem) { @@ -340,19 +369,22 @@ mixin template NetworkClient(Interface) { hadArgument = true; } - if(hadArgument) - code ~= ", "; + if(useAsync) { + if(hadArgument) + code ~= ", "; - static if(is(ReturnType!mem == void)) - code ~= "void delegate() onSuccess"; - else - code ~= "void delegate("~(ReturnType!mem).stringof~") onSuccess"; - code ~= ", "; - code ~= "void delegate(Throwable) onError"; + static if(is(ReturnType!mem == void)) + code ~= "void delegate() onSuccess"; + else + code ~= "void delegate("~(ReturnType!mem).stringof~") onSuccess"; + code ~= ", "; + code ~= "void delegate(Throwable) onError"; + } code ~= ") {\n"; + code ~= "auto seq = ++lastSequenceNumber;"; + if(useAsync) code ~= q{ #line 252 - auto seq = ++lastSequenceNumber; onSuccesses[seq] = (const(ubyte)[] buffer) { onSuccesses.remove(seq); onErrors.remove(seq); @@ -381,7 +413,9 @@ mixin template NetworkClient(Interface) { if(onError !is null) onError(t); }; + }; + code ~= q{ #line 283 ubyte[4096] bufferBase; auto buffer = bufferBase[12 .. $]; // leaving room for size, func number, and seq number @@ -402,10 +436,77 @@ mixin template NetworkClient(Interface) { // FIXME: what if it doesn't all send at once? code ~= "\t\t\tsocket.send(bufferBase[0 .. 12 + used]);\n"; //code ~= `writeln("sending ", bufferBase[0 .. 12 + used]);`; + + if(!useAsync) + code ~= q{ + ubyte[4096] dbuffer; + bool open = true; + static if(is(typeof(return) == void)) { + + } else + typeof(return) returned; + + auto gotNum = socket.receive(dbuffer); + if(gotNum == 0) { + open = false; + throw new Exception("connection closed"); + } + while(gotNum < 9) { + auto g2 = socket.receive(dbuffer[gotNum .. $]); + if(g2 == 0) { + open = false; + break; + } + gotNum += g2; + } + + auto got = dbuffer[0 .. gotNum]; + another: + uint length; + uint success; + got = deserializeInto(got, length); + got = deserializeInto(got, seq); + got = deserializeInto(got, success); + auto more = got[length .. $]; + + if(got.length >= length) { + if(success) { + /* + auto s = (seq in onSuccesses); + if(s !is null && *s !is null) + (*s)(got); + */ + static if(is(typeof(return) == void)) { + } else { + got = deserializeInto(got, returned); + } + } else { + /* + auto s = (seq in onErrors); + if(s !is null && *s !is null) + (*s)(got); + */ + auto t = new Throwable(""); + got = deserializeInto(got, t.msg); + got = deserializeInto(got, t.file); + got = deserializeInto(got, t.line); + throw t; + } + } + + if(more.length) { + got = more; + goto another; + } + static if(is(typeof(return) == void)) { + + } else + return returned; + }; + code ~= "}\n"; code ~= "\n"; } - //code ~= `}`; return code; } @@ -414,6 +515,14 @@ mixin template NetworkClient(Interface) { mixin(createClass()); } +auto makeNetworkClient(Interface, bool useAsync = true)(string host, ushort port) { + class Thing { + mixin NetworkClient!(Interface, useAsync); + } + + return new Thing(host, port); +} + // the protocol is: /*