From 644c1869a180a6b20e33f6c5ad6cf951a61d7271 Mon Sep 17 00:00:00 2001 From: "Adam D. Ruppe" Date: Thu, 13 Feb 2025 09:15:06 -0500 Subject: [PATCH] logger framework almost actually usable --- core.d | 393 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 349 insertions(+), 44 deletions(-) diff --git a/core.d b/core.d index f1f3bc4..52f71ba 100644 --- a/core.d +++ b/core.d @@ -511,7 +511,7 @@ unittest { } /++ - Basically a Phobos SysTime but standing alone as a simple 6 4 bit integer (but wrapped) for compatibility with LimitedVariant. + Basically a Phobos SysTime but standing alone as a simple 64 bit integer (but wrapped) for compatibility with LimitedVariant. +/ struct SimplifiedUtcTimestamp { long timestamp; @@ -1579,8 +1579,6 @@ string toStringInternal(T)(T t) { char[32] buffer; static if(is(T : string)) return t; - else static if(is(T : long)) - return intToString(t, buffer[]).idup; else static if(is(T == enum)) { switch(t) { foreach(memberName; __traits(allMembers, T)) { @@ -1590,6 +1588,8 @@ string toStringInternal(T)(T t) { default: return ""; } + } else static if(is(T : long)) { + return intToString(t, buffer[]).idup; } else static if(is(T : const E[], E)) { string ret = "["; foreach(idx, e; t) { @@ -8053,11 +8053,11 @@ unittest { DO NOT USE THIS YET IT IS NOT FUNCTIONAL NOR STABLE - The arsd.core logger works differently than many in that it works as a ring buffer of objects that are consumed (or missed; buffer overruns are possible) by a different thread instead of as strings written to some file. + The arsd.core logger works differently than many in that it works as a ring buffer of objects that are consumed (or missed; buffer overruns are possible) by a different thread instead of strings written to some file. A library (or an application) defines a log source. They write to this source. - Applications then define log sinks, zero or more, which reads from various sources and does something with them. + Applications then define log listeners, zero or more, which reads from various sources and does something with them. Log calls, in this sense, are quite similar to asynchronous events that can be subscribed to by event handlers. The difference is events are generally not dropped - they might coalesce but are usually not just plain dropped in a buffer overrun - whereas logs can be. If the log consumer can't keep up, the details are just lost. The log producer will not wait for the consumer to catch up. @@ -8084,54 +8084,339 @@ unittest { +/ class LoggerOf(T, size_t bufferSize = 16) { private LoggedMessage!T[bufferSize] ring; - private uint writeBufferPosition; + private ulong writeBufferPosition; + + import core.sync.mutex; + import core.sync.condition; + + private Mutex mutex; + private Condition condition; + private bool active; + private int listenerCount; + + this() shared { + mutex = new shared Mutex(cast(LoggerOf) this); + condition = new shared Condition(mutex); + active = true; + } + + /++ + Closes the log channel and waits for all listeners to finish pending work before returning. + + Once the logger is closed, it cannot be used again. + + You should close any logger you attached listeners to before returning from `main()`. + +/ + void close() shared { + synchronized(this) { + active = false; + condition.notifyAll(); + + while(listenerCount > 0) { + condition.wait(); + } + } + } + + /++ + + Examples: + + --- + // to write all messages to the console + logger.addListener((message, missedMessageCount) { + writeln(message); + }); + --- + + --- + // to only write warnings and errors + logger.addListener((message, missedMessageCount) { + if(message.level >= LogLevel.warn) + writeln(message); + }); + --- + + --- + // to ignore messages from arsd.core + logger.addListener((message, missedMessageCount) { + if(message.sourceLocation.moduleName != "arsd.core") + writeln(message); + }); + --- + +/ + LogListenerController addListener(void delegate(LoggedMessage!T message, int missedMessages) dg) shared { + static class Listener : Thread, LogListenerController { + shared LoggerOf logger; + ulong readBufferPosition; + void delegate(LoggedMessage!T, int) dg; + + bool connected; + + import core.sync.event; + Event event; + + this(shared LoggerOf logger, void delegate(LoggedMessage!T msg, int) dg) { + this.dg = dg; + this.logger = logger; + this.connected = true; + this.isDaemon = true; + + auto us = cast(LoggerOf) logger; + synchronized(logger) + us.listenerCount++; + + event.initialize(true, false); + super(&run); + } + + void disconnect() { + this.connected = false; + } + + void run() { + auto us = cast(LoggerOf) logger; + /+ + // can't do this due to https://github.com/ldc-developers/ldc/issues/4837 + // so doing the try/catch below and putting this under it + scope(exit) { + synchronized(logger) { + us.listenerCount--; + logger.condition.notifyAll(); + } + // mark us as complete for other listeners waiting as well + event.set(); + } + +/ + + try { + + LoggedMessage!T[bufferSize] buffer; + do { + int missedMessages = 0; + long n; + synchronized(logger) { + while(logger.active && connected && logger.writeBufferPosition < readBufferPosition) { + logger.condition.wait(); + } + + n = us.writeBufferPosition - readBufferPosition; + if(n > bufferSize) { + // we missed something... + missedMessages = cast(int) (n - bufferSize); + readBufferPosition = us.writeBufferPosition - bufferSize; + n = bufferSize; + } + auto startPos = readBufferPosition % bufferSize; + auto endPos = us.writeBufferPosition % bufferSize; + if(endPos > startPos) { + buffer[0 .. n] = us.ring[startPos .. endPos]; + } else { + auto ourSplit = us.ring.length - startPos; + buffer[0 .. ourSplit] = us.ring[startPos .. $]; + buffer[ourSplit .. ourSplit + endPos] = us.ring[0 .. endPos]; + } + readBufferPosition = us.writeBufferPosition; + } + foreach(item; buffer[0 .. n]) { + if(!connected) + break; + dg(item, missedMessages); + missedMessages = 0; + } + } while(logger.active && connected); + + } catch(Throwable t) { + // i guess i could try to log the exception for other listeners to pick up... + + } + + synchronized(logger) { + us.listenerCount--; + logger.condition.notifyAll(); + } + // mark us as complete for other listeners waiting as well + event.set(); + + } + + void waitForCompletion() { + event.wait(); + } + } + + auto listener = new Listener(this, dg); + listener.start(); + + return listener; + } void log(LoggedMessage!T message) shared { synchronized(this) { auto unshared = cast() this; - unshared.ring[writeBufferPosition] = message; + unshared.ring[writeBufferPosition % bufferSize] = message; unshared.writeBufferPosition += 1; // import std.stdio; std.stdio.writeln(message); + condition.notifyAll(); } } - void log(LogLevel level, T message, SourceLocation sourceLocation = SourceLocation(__FILE__, __LINE__)) shared { - log(LoggedMessage!T(LogLevel.Info, sourceLocation, 0, message)); + /// ditto + void log(LogLevel level, T message, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + import core.stdc.time; + log(LoggedMessage!T(level, sourceLocation, SimplifiedUtcTimestamp.fromUnixTime(time(null)), Thread.getThis(), message)); } - void info(T message, SourceLocation sourceLocation = SourceLocation(__FILE__, __LINE__)) shared { - log(LogLevel.Info, message, sourceLocation); + /// ditto + void info(T message, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.info, message, sourceLocation); + } + /// ditto + void trace(T message, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.trace, message, sourceLocation); + } + /// ditto + void warn(T message, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.warn, message, sourceLocation); + } + /// ditto + void error(T message, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.error, message, sourceLocation); + } + + static if(is(T == GenericEmbeddableInterpolatedSequence)) { + pragma(inline, true) + final void info(T...)(InterpolationHeader header, T message, InterpolationFooter footer, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.info, GenericEmbeddableInterpolatedSequence(header, message, footer), sourceLocation); + } + pragma(inline, true) + final void trace(T...)(InterpolationHeader header, T message, InterpolationFooter footer, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.trace, GenericEmbeddableInterpolatedSequence(header, message, footer), sourceLocation); + } + pragma(inline, true) + final void warn(T...)(InterpolationHeader header, T message, InterpolationFooter footer, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.warn, GenericEmbeddableInterpolatedSequence(header, message, footer), sourceLocation); + } + pragma(inline, true) + final void error(T...)(InterpolationHeader header, T message, InterpolationFooter footer, SourceLocation sourceLocation = SourceLocation(__MODULE__, __LINE__)) shared { + log(LogLevel.error, GenericEmbeddableInterpolatedSequence(header, message, footer), sourceLocation); + } } } +/// ditto +interface LogListenerController { + /++ + Disconnects from the log producer as soon as possible, possibly leaving messages + behind in the log buffer. Once disconnected, the log listener will terminate + asynchronously and cannot be reused. Use [waitForCompletion] to block your thread + until the termination is complete. + +/ + void disconnect(); + + /++ + Waits for the listener to finish its pending work and terminate. You should call + [disconnect] first to make it start to exit. + +/ + void waitForCompletion(); +} + +/// ditto struct SourceLocation { - string file; + string moduleName; size_t line; } +/// ditto struct LoggedMessage(T) { LogLevel level; SourceLocation sourceLocation; - ulong timestamp; + SimplifiedUtcTimestamp timestamp; + Thread originatingThread; T message; - // process id? - // thread id? + // process id can be assumed by the listener, + // since it is always the same; logs are sent and received by the same process. + + string toString() { + string ret; + + ret ~= sourceLocation.moduleName; + ret ~= ":"; + ret ~= toStringInternal(sourceLocation.line); + ret ~= " "; + if(originatingThread) { + char[16] buffer; + ret ~= originatingThread.name.length ? originatingThread.name : intToString(cast(long) originatingThread.id, buffer, IntToStringArgs().withRadix(16)); + } + ret ~= "["; + ret ~= toStringInternal(level); + ret ~= "] "; + ret ~= timestamp.toString(); + ret ~= " "; + ret ~= message.toString(); + + return ret; + } // callstack? } -//mixin LoggerOf!GenericEmbeddableInterpolatedSequence GeisLogger; - +/// ditto enum LogLevel { - Info + trace, + info, + warn, + error, } -unittest { - auto logger = new shared LoggerOf!GenericEmbeddableInterpolatedSequence; - logger.info(GenericEmbeddableInterpolatedSequence(i"hello world")); +private shared(LoggerOf!GenericEmbeddableInterpolatedSequence) _commonLogger; +shared(LoggerOf!GenericEmbeddableInterpolatedSequence) logger() { + if(_commonLogger is null) { + synchronized { + if(_commonLogger is null) + _commonLogger = new shared LoggerOf!GenericEmbeddableInterpolatedSequence; + } + } + + return _commonLogger; } +/+ +// using this requires a newish compiler so we just uncomment when necessary +unittest { + void main() { + auto logger = logger;// new shared LoggerOf!GenericEmbeddableInterpolatedSequence; + LogListenerController l1; + l1 = logger.addListener((msg, missedCount) { + if(missedCount) + writeln("T1: missed ", missedCount); + writeln("T1:" ~msg.toString()); + //Thread.sleep(500.msecs); + //l1.disconnect(); + Thread.sleep(1.msecs); + }); + foreach(n; 0 .. 200) { + logger.info(i"hello world $n"); + if(n % 6 == 0) + Thread.sleep(1.msecs); + } + + logger.addListener((msg, missedCount) { + if(missedCount) writeln("T2 missed ", missedCount); + writeln("T2:" ~msg.toString()); + }); + + Thread.sleep(500.msecs); + l1.disconnect; + l1.waitForCompletion; + + logger.close(); + } + //main; +} ++/ + /+ ===================== TRANSLATION FRAMEWORK @@ -8349,27 +8634,6 @@ struct GenericEmbeddableInterpolatedSequence { } } -private struct LoggedElement(T) { - LogLevel level; // ? - MonoTime timestamp; - void*[16] stack; // ? - string originComponent; - string originFile; - size_t originLine; - - T message; -} - -private class TypeErasedLogger { - ubyte[] buffer; - - void*[] messagePointers; - size_t position; -} - - - - /+ ================= STDIO REPLACEMENT @@ -8398,20 +8662,57 @@ private void appendToBuffer(ref char[] buffer, ref int pos, long what) { This always does text. See also WritableStream and WritableTextStream when they are implemented. +/ -void writeln(T...)(T t) { +void writeln(bool printInterpolatedCode = false, T...)(T t) { char[256] bufferBacking; char[] buffer = bufferBacking[]; int pos; foreach(arg; t) { - static if(is(typeof(arg) : const char[])) { + static if(is(typeof(arg) Base == enum)) { + appendToBuffer(buffer, pos, typeof(arg).stringof); + appendToBuffer(buffer, pos, "."); + appendToBuffer(buffer, pos, toStringInternal(arg)); + appendToBuffer(buffer, pos, "("); + appendToBuffer(buffer, pos, cast(Base) arg); + appendToBuffer(buffer, pos, ")"); + } else static if(is(typeof(arg) : const char[])) { appendToBuffer(buffer, pos, arg); } else static if(is(typeof(arg) : stringz)) { appendToBuffer(buffer, pos, arg.borrow); } else static if(is(typeof(arg) : long)) { appendToBuffer(buffer, pos, arg); + } else static if(is(typeof(arg) : double)) { + import core.stdc.stdio; + char[128] fb; + auto count = snprintf(fb.ptr, fb.length, "%.4lf", arg); + + appendToBuffer(buffer, pos, fb[0 .. count]); + } else static if(is(typeof(arg) == InterpolatedExpression!code, string code)) { + static if(printInterpolatedCode) { + appendToBuffer(buffer, pos, code); + appendToBuffer(buffer, pos, " = "); + } } else static if(is(typeof(arg.toString()) : const char[])) { appendToBuffer(buffer, pos, arg.toString()); + } else static if(is(typeof(arg) A == struct)) { + appendToBuffer(buffer, pos, A.stringof); + appendToBuffer(buffer, pos, "("); + foreach(idx, item; arg.tupleof) { + if(idx) + appendToBuffer(buffer, pos, ", "); + appendToBuffer(buffer, pos, __traits(identifier, arg.tupleof[idx])); + appendToBuffer(buffer, pos, ": "); + appendToBuffer(buffer, pos, item); + } + appendToBuffer(buffer, pos, ")"); + } else static if(is(typeof(arg) == E[], E)) { + appendToBuffer(buffer, pos, "["); + foreach(idx, item; arg) { + if(idx) + appendToBuffer(buffer, pos, ", "); + appendToBuffer(buffer, pos, item); + } + appendToBuffer(buffer, pos, "]"); } else { appendToBuffer(buffer, pos, "<" ~ typeof(arg).stringof ~ ">"); } @@ -8422,6 +8723,10 @@ void writeln(T...)(T t) { actuallyWriteToStdout(buffer[0 .. pos]); } +debug void dump(T...)(T t, string file = __FILE__, size_t line = __LINE__) { + writeln!true(file, ":", line, ": ", t); +} + private void actuallyWriteToStdout(scope char[] buffer) @trusted { version(UseStdioWriteln)