rework background process handling

This commit is contained in:
Vadim Lopatin 2015-01-29 11:20:40 +03:00
parent 52040619c9
commit 25301d8c4e
2 changed files with 227 additions and 133 deletions

View File

@ -8,10 +8,11 @@ import std.algorithm;
import std.string;
import std.conv;
class Builder : BackgroundOperationWatcher, ProcessOutputTarget {
class Builder : BackgroundOperationWatcher {
protected Project _project;
protected ExternalProcess _extprocess;
protected OutputPanel _log;
protected ProtectedTextStorage _box;
@property Project project() { return _project; }
@property void project(Project p) { _project = p; }
@ -21,9 +22,11 @@ class Builder : BackgroundOperationWatcher, ProcessOutputTarget {
_project = project;
_log = log;
_extprocess = new ExternalProcess();
_box = new ProtectedTextStorage();
}
/// log lines
override void onText(dstring text) {
void pollText() {
dstring text = _box.readText();
dstring[] lines = text.split('\n');
_log.addLogLines(null, lines);
}
@ -32,11 +35,18 @@ class Builder : BackgroundOperationWatcher, ProcessOutputTarget {
override @property string icon() { return "folder"; }
/// update background operation status
override void update() {
scope(exit)pollText();
if (_extprocess.state == ExternalProcessState.None) {
onText("Running dub\n"d);
_extprocess.run(cast(char[])"dub", cast(char[][])["build"], cast(char[])_project.dir, this, null);
_box.writeText("Running dub\n"d);
char[] program = "dub".dup;
char[][] params;
char[] dir = _project.dir.dup;
params ~= "build".dup;
params ~= "-v".dup;
params ~= "--force".dup;
_extprocess.run(program, params, dir, _box, null);
if (_extprocess.state != ExternalProcessState.Running) {
onText("Failed to run builder tool");
_box.writeText("Failed to run builder tool\n"d);
_finished = true;
destroy(_extprocess);
_extprocess = null;
@ -45,7 +55,7 @@ class Builder : BackgroundOperationWatcher, ProcessOutputTarget {
}
ExternalProcessState state = _extprocess.poll();
if (state == ExternalProcessState.Stopped) {
onText("Builder finished with result "d ~ to!dstring(_extprocess.result) ~ "\n"d);
_box.writeText("Builder finished with result "d ~ to!dstring(_extprocess.result) ~ "\n"d);
_finished = true;
return;
}

View File

@ -5,11 +5,69 @@ import dlangui.core.logger;
import std.process;
import std.file;
import std.utf;
import std.stdio;
import core.thread;
import core.sync.mutex;
/// interface to forward process output to
interface ProcessOutputTarget {
interface TextWriter {
/// log lines
void onText(dstring text);
void writeText(dstring text);
}
/// interface to read text
interface TextReader {
/// log lines
dstring readText();
}
/// protected text storage box to read and write text from different threads
class ProtectedTextStorage : TextReader, TextWriter {
private Mutex _mutex;
private shared bool _closed;
private dchar[] _buffer;
this() {
_mutex = new Mutex();
}
@property bool closed() { return _closed; }
void close() {
if (_closed)
return;
_closed = true;
_buffer = null;
}
/// log lines
override void writeText(dstring text) {
if (!_closed) {
// if not closed
_mutex.lock();
scope(exit) _mutex.unlock();
// append text
_buffer ~= text;
}
}
/// log lines
override dstring readText() {
if (!_closed) {
// if not closed
_mutex.lock();
scope(exit) _mutex.unlock();
if (!_buffer.length)
return null;
dstring res = _buffer.dup;
_buffer = null;
return res;
} else {
// reading from closed
return null;
}
}
}
enum ExternalProcessState : uint {
@ -25,6 +83,135 @@ enum ExternalProcessState : uint {
Error
}
/// base class for text reading from std.stdio.File in background thread
class BackgroundReaderBase : Thread {
private std.stdio.File _file;
private shared bool _finished;
private ubyte[1] _byteBuffer;
private ubyte[] _bytes;
dchar[] _textbuffer;
private int _len;
private bool _utfError;
this(std.stdio.File f) {
super(&run);
assert(f.isOpen());
_file = f;
_len = 0;
_finished = false;
}
@property bool finished() {
return _finished;
}
void addByte(ubyte data) {
if (_bytes.length < _len + 1)
_bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
ubyte prevchar = _len > 0 ? _bytes[_len - 1] : 0;
_bytes[_len++] = data;
bool eolchar = (data == '\r' || data == '\n');
bool preveol = (prevchar == '\r' || prevchar == '\n');
if (eolchar || (!eolchar && preveol))
flush(_len);
}
void flush(int pos) {
if (!_len)
return;
if (_textbuffer.length < _len)
_textbuffer.length = _len + 256;
size_t count = 0;
for(size_t i = 0; i < _len;) {
dchar ch = 0;
if (_utfError) {
ch = _bytes[i++];
} else {
try {
ch = decode(cast(string)_bytes, i);
} catch (UTFException e) {
_utfError = true;
ch = _bytes[i++];
Log.d("non-unicode characters found in output of process");
}
}
_textbuffer[count++] = ch;
}
_len = 0;
if (!count)
return;
// fix line endings - must be '\n'
count = convertLineEndings(_textbuffer[0..count]);
// data is ready to send
if (count)
sendResult(_textbuffer[0..count].dup);
}
/// inplace convert line endings to unix format (\n)
size_t convertLineEndings(dchar[] text) {
size_t src = 0;
size_t dst = 0;
for(;src < text.length;) {
dchar ch = text[src++];
if (ch == '\n') {
if (src < text.length && text[src] == '\r')
src++;
text[dst++] = ch;
} else if (ch == '\r') {
if (src < text.length && text[src] == '\n')
src++;
text[dst++] = '\n';
} else {
text[dst++] = ch;
}
}
return dst;
}
protected void sendResult(dstring text) {
// override to deal with ready data
}
protected void handleFinish() {
// override to do something when thread is finishing
}
private void run() {
// read file by bytes
try {
for (;;) {
ubyte[] r = _file.rawRead(_byteBuffer);
if (!r.length)
break;
addByte(r[0]);
}
_file.close();
} catch (Exception e) {
Log.e("Exception occured while reading stream: ", e);
}
handleFinish();
_finished = true;
}
}
/// reader which sends output text to TextWriter (warning: call will be made from background thread)
class BackgroundReader : BackgroundReaderBase {
protected TextWriter _destination;
this(std.stdio.File f, TextWriter destination) {
super(f);
assert(destination);
_destination = destination;
}
override protected void sendResult(dstring text) {
// override to deal with ready data
_destination.writeText(text);
}
override protected void handleFinish() {
// remove link to destination to help GC
_destination = null;
}
}
/// runs external process, catches output, allows to stop
class ExternalProcess {
@ -32,8 +219,10 @@ class ExternalProcess {
protected char[] _workDir;
protected char[] _program;
protected string[string] _env;
protected ProcessOutputTarget _stdout;
protected ProcessOutputTarget _stderr;
protected TextWriter _stdout;
protected TextWriter _stderr;
protected BackgroundReader _stdoutReader;
protected BackgroundReader _stderrReader;
protected ProcessPipes _pipes;
protected ExternalProcessState _state;
@ -46,17 +235,15 @@ class ExternalProcess {
this() {
}
ExternalProcessState run(char[] program, char[][]args, char[] dir, ProcessOutputTarget stdoutTarget, ProcessOutputTarget stderrTarget = null) {
ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
_state = ExternalProcessState.None;
_program = program;
_args = args;
_workDir = dir;
_stdout = stdoutTarget;
_stdoutBuffer.clear();
_stderrBuffer.clear();
_stderr = stderrTarget;
_result = 0;
assert(_stdout);
_stderr = stderrTarget;
Redirect redirect;
char[][] params;
params ~= _program;
@ -69,9 +256,13 @@ class ExternalProcess {
try {
_pipes = pipeProcess(params, redirect, _env, Config.none, _workDir);
_state = ExternalProcessState.Running;
_stdoutBuffer.init();
if (_stderr)
_stderrBuffer.init();
// start readers
_stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
_stdoutReader.start();
if (_stderr) {
_stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
_stderrReader.start();
}
} catch (ProcessException e) {
Log.e("Cannot run program ", _program, " ", e);
} catch (std.stdio.StdioException e) {
@ -80,118 +271,13 @@ class ExternalProcess {
return _state;
}
static immutable READ_BUFFER_SIZE = 4096;
static class Buffer {
ubyte[] buffer;
ubyte[] bytes;
dchar[] textbuffer;
bool utfError;
size_t len;
void init() {
buffer = new ubyte[READ_BUFFER_SIZE];
bytes = new ubyte[READ_BUFFER_SIZE * 2];
textbuffer = new dchar[textbuffer.length];
utfError = false;
len = 0;
}
void addBytes(ubyte[] data) {
if (bytes.length < len + data.length)
bytes.length = bytes.length * 2 + len + data.length;
for(size_t i = 0; i < data.length; i++)
bytes[len++] = data[i];
}
size_t read(std.file.File file) {
size_t bytesRead = 0;
for (;;) {
ubyte[] readData = file.rawRead(buffer);
if (!readData.length)
break;
bytesRead += readData.length;
addBytes(readData);
bytes ~= readData;
}
return bytesRead;
}
/// inplace convert line endings to unix format (\n)
size_t convertLineEndings(dchar[] text) {
size_t src = 0;
size_t dst = 0;
for(;src < text.length;) {
dchar ch = text[src++];
if (ch == '\n') {
if (src < text.length && text[src] == '\r')
src++;
text[dst++] = ch;
} else if (ch == '\r') {
if (src < text.length && text[src] == '\n')
src++;
text[dst++] = '\n';
} else {
text[dst++] = ch;
}
}
return dst;
}
dstring text() {
if (textbuffer.length < len)
textbuffer.length = len;
size_t count = 0;
for(size_t i = 0; i < len;) {
dchar ch = 0;
if (utfError) {
ch = bytes[i++];
} else {
try {
ch = decode(cast(string)bytes, i);
} catch (UTFException e) {
utfError = true;
ch = bytes[i++];
Log.d("non-unicode characters found in output of process");
}
}
textbuffer[count++] = ch;
}
len = 0;
if (!count)
return null;
count = convertLineEndings(textbuffer[0..count]);
return textbuffer[0 .. count].dup;
}
void clear() {
buffer = null;
bytes = null;
textbuffer = null;
len = 0;
}
}
protected Buffer _stdoutBuffer;
protected Buffer _stderrBuffer;
protected bool poll(ProcessOutputTarget dst, std.file.File src, ref Buffer buffer) {
if (src.isOpen) {
buffer.read(src);
dstring s = buffer.text;
if (s)
dst.onText(s);
return true;
} else {
return false;
}
}
protected bool pollStreams() {
bool res = true;
try {
res = poll(_stdout, _pipes.stdout, _stdoutBuffer) && res;
if (_stderr)
res = poll(_stderr, _pipes.stderr, _stderrBuffer) && res;
} catch (Error e) {
Log.e("error occued while trying to poll streams for process ", _program);
res = false;
}
return res;
protected void waitForReadingCompletion() {
if (_stdoutReader && !_stdoutReader.finished)
_stdoutReader.join(false);
if (_stderrReader && !_stderrReader.finished)
_stderrReader.join(false);
_stdoutReader = null;
_stderrReader = null;
}
/// polls all available output from process streams
@ -199,16 +285,13 @@ class ExternalProcess {
bool res = true;
if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
return _state;
if (_state == ExternalProcessState.Running) {
res = pollStreams();
}
// check for process finishing
try {
auto pstate = std.process.tryWait(_pipes.pid);
if (pstate.terminated) {
pollStreams();
_state = ExternalProcessState.Stopped;
_result = pstate.status;
waitForReadingCompletion();
}
} catch (Exception e) {
Log.e("Exception while waiting for process ", _program);
@ -224,6 +307,7 @@ class ExternalProcess {
try {
_result = std.process.wait(_pipes.pid);
_state = ExternalProcessState.Stopped;
waitForReadingCompletion();
} catch (Exception e) {
Log.e("Exception while waiting for process ", _program);
_state = ExternalProcessState.Error;