sync stuff

This commit is contained in:
Adam D. Ruppe 2013-11-11 19:23:34 -05:00
parent 94c474597d
commit 88c19d7355
1 changed files with 127 additions and 18 deletions

145
rpc.d
View File

@ -1,6 +1,13 @@
module arsd.rpc;
/+ //example usage
/*
FIXME:
1) integrate with arsd.eventloop
2) make it easy to use with other processes; pipe to a process and talk to it that way. perhaps with shared memory too?
3) extend the serialization capabilities
*/
///+ //example usage
interface ExampleNetworkFunctions {
string sayHello(string name);
int add(int a, int b);
@ -52,12 +59,20 @@ void main(string[] args) {
client.structTest(S1(20, "cool!"), (a) { writeln(a.name, " -- ", a.number); }, null);
client.die(delegate () { writeln("shouldn't happen"); }, delegate(a) { writeln(a); });
client.eventLoop();
/*
auto client = makeNetworkClient!(ExampleNetworkFunctions, false)("localhost", 5005);
writeln(client.sayHello("whoa"));
writeln(client.add(1, 2));
client.die();
writeln(client.add(1, 2));
*/
} else {
auto server = new ExampleServer(5005);
server.eventLoop();
}
}
+/
//+/
mixin template NetworkServer(Interface) {
import std.socket;
@ -260,22 +275,27 @@ final public inout(ubyte)[] deserializeInto(T)(inout(ubyte)[] buffer, ref T s) {
return buffer;
}
mixin template NetworkClient(Interface) {
mixin template NetworkClient(Interface, bool useAsync = true) {
private static string createClass() {
// this doesn't actually inherit from the interface because
// the return value needs to be handled async
string code;// = `final class Class /*: ` ~ Interface.stringof ~ `*/ {`;
code ~= "\n\timport std.socket;";
code ~= "\n\tprivate Socket socket;";
code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onSuccesses;";
code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onErrors;";
if(useAsync) {
code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onSuccesses;";
code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onErrors;";
}
code ~= "\n\tprivate uint lastSequenceNumber;";
code ~= q{
private this(string host, ushort port) {
this.socket = new TcpSocket();
this.socket.connect(new InternetAddress(host, port));
}
};
if(useAsync)
code ~= q{
final public void eventLoop() {
ubyte[4096] buffer;
bool open = true;
@ -326,10 +346,19 @@ mixin template NetworkClient(Interface) {
code ~= "\n\tpublic:\n";
foreach(memIdx, member; __traits(allMembers, Interface)) {
alias mem = PassThrough!(__traits(getMember, Interface, member));
code ~= "\t\tfinal void " ~ member ~ "(";
bool hadArgument = false;
import std.traits;
alias mem = PassThrough!(__traits(getMember, Interface, member));
string type;
if(useAsync)
type = "void";
else {
static if(is(ReturnType!mem == void))
type = "void";
else
type = (ReturnType!mem).stringof;
}
code ~= "\t\tfinal "~type~" " ~ member ~ "(";
bool hadArgument = false;
import std.conv;
// arguments
foreach(i, arg; ParameterTypeTuple!mem) {
@ -340,19 +369,22 @@ mixin template NetworkClient(Interface) {
hadArgument = true;
}
if(hadArgument)
code ~= ", ";
if(useAsync) {
if(hadArgument)
code ~= ", ";
static if(is(ReturnType!mem == void))
code ~= "void delegate() onSuccess";
else
code ~= "void delegate("~(ReturnType!mem).stringof~") onSuccess";
code ~= ", ";
code ~= "void delegate(Throwable) onError";
static if(is(ReturnType!mem == void))
code ~= "void delegate() onSuccess";
else
code ~= "void delegate("~(ReturnType!mem).stringof~") onSuccess";
code ~= ", ";
code ~= "void delegate(Throwable) onError";
}
code ~= ") {\n";
code ~= "auto seq = ++lastSequenceNumber;";
if(useAsync)
code ~= q{
#line 252
auto seq = ++lastSequenceNumber;
onSuccesses[seq] = (const(ubyte)[] buffer) {
onSuccesses.remove(seq);
onErrors.remove(seq);
@ -381,7 +413,9 @@ mixin template NetworkClient(Interface) {
if(onError !is null)
onError(t);
};
};
code ~= q{
#line 283
ubyte[4096] bufferBase;
auto buffer = bufferBase[12 .. $]; // leaving room for size, func number, and seq number
@ -402,10 +436,77 @@ mixin template NetworkClient(Interface) {
// FIXME: what if it doesn't all send at once?
code ~= "\t\t\tsocket.send(bufferBase[0 .. 12 + used]);\n";
//code ~= `writeln("sending ", bufferBase[0 .. 12 + used]);`;
if(!useAsync)
code ~= q{
ubyte[4096] dbuffer;
bool open = true;
static if(is(typeof(return) == void)) {
} else
typeof(return) returned;
auto gotNum = socket.receive(dbuffer);
if(gotNum == 0) {
open = false;
throw new Exception("connection closed");
}
while(gotNum < 9) {
auto g2 = socket.receive(dbuffer[gotNum .. $]);
if(g2 == 0) {
open = false;
break;
}
gotNum += g2;
}
auto got = dbuffer[0 .. gotNum];
another:
uint length;
uint success;
got = deserializeInto(got, length);
got = deserializeInto(got, seq);
got = deserializeInto(got, success);
auto more = got[length .. $];
if(got.length >= length) {
if(success) {
/*
auto s = (seq in onSuccesses);
if(s !is null && *s !is null)
(*s)(got);
*/
static if(is(typeof(return) == void)) {
} else {
got = deserializeInto(got, returned);
}
} else {
/*
auto s = (seq in onErrors);
if(s !is null && *s !is null)
(*s)(got);
*/
auto t = new Throwable("");
got = deserializeInto(got, t.msg);
got = deserializeInto(got, t.file);
got = deserializeInto(got, t.line);
throw t;
}
}
if(more.length) {
got = more;
goto another;
}
static if(is(typeof(return) == void)) {
} else
return returned;
};
code ~= "}\n";
code ~= "\n";
}
//code ~= `}`;
return code;
}
@ -414,6 +515,14 @@ mixin template NetworkClient(Interface) {
mixin(createClass());
}
auto makeNetworkClient(Interface, bool useAsync = true)(string host, ushort port) {
class Thing {
mixin NetworkClient!(Interface, useAsync);
}
return new Thing(host, port);
}
// the protocol is:
/*