execute module

This commit is contained in:
Alexander Zhirov 2025-08-23 01:08:58 +03:00
parent 4dace7c53c
commit 6619bbffec
Signed by: alexander
GPG key ID: C8D8BE544A27C511

View file

@ -1,6 +1,6 @@
// Импортируем необходимые модули:
// - std.stdio: для ввода/вывода, включая writeln
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait)
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute)
// - core.thread: для работы с потоками (Thread, sleep)
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
// - core.sync.condition: для условия (Condition) для уведомлений между потоками
@ -15,6 +15,7 @@ import std.file;
import std.path;
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D.
// T - это шаблонный тип элементов очереди (в нашем случае string).
class Queue(T)
{
@ -25,17 +26,23 @@ class Queue(T)
private Mutex mutex;
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
private Condition cond;
// - done: флаг завершения добавления элементов в очередь
// Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено,
// и позволяет избежать использования специального значения (как пустая строка) для остановки.
private bool done;
// Конструктор класса.
// Инициализирует мьютекс и условие.
// Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно).
this()
{
mutex = new Mutex();
cond = new Condition(mutex);
done = false;
}
// Метод для добавления элемента в очередь (enqueue).
// Синхронизирован с помощью мьютекса для потокобезопасности.
// Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки.
void enqueue(T item)
{
// Захватываем мьютекс для эксклюзивного доступа.
@ -49,24 +56,45 @@ class Queue(T)
}
// Метод для извлечения элемента из очереди (dequeue).
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент.
T dequeue()
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done.
// Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста).
// Параметр item передается по out, чтобы вернуть извлеченный элемент.
bool dequeue(out T item)
{
// Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex)
{
// Ждем, пока очередь не станет не пустой.
while (elements.length == 0)
// Ждем, пока очередь не станет не пустой или не будет установлен done.
// Это предотвращает бесконечное ожидание, если больше элементов не добавляется.
while (elements.length == 0 && !done)
{
// Ожидаем уведомления от cond (от enqueue).
// Ожидаем уведомления от cond (от enqueue или finish).
cond.wait();
}
// Если очередь пуста и done установлен, возвращаем false.
// Это сигнал, что обработка завершена.
if (elements.length == 0 && done)
{
return false;
}
// Извлекаем первый элемент (FIFO).
T item = elements[0];
item = elements[0];
// Удаляем его из массива, сдвигая остальные.
elements = elements[1 .. $];
// Возвращаем извлеченный элемент.
return item;
// Возвращаем true, указывая на успешное извлечение.
return true;
}
}
// Метод для сигнала завершения добавления элементов.
// Устанавливает флаг done и уведомляет ожидающие потоки.
// Это позволяет потребителю выйти из цикла без специального значения в очереди.
void finish()
{
synchronized (mutex)
{
done = true;
cond.notify();
}
}
@ -87,6 +115,7 @@ class Queue(T)
void main()
{
// Определяем массив слов для обработки.
// Эти слова будут добавлены в очередь для последовательной обработки.
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
// Получаем директорию исполняемого файла.
string exeDir = thisExePath.dirName;
@ -99,40 +128,49 @@ void main()
// Создаем поток-потребитель (consumerThread).
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
auto consumerThread = new Thread({
// Бесконечный цикл, пока не получим сигнал завершения (пустая строка).
while (true)
// Локальная переменная для хранения извлеченного слова.
string word;
// Цикл, продолжающийся до тех пор, пока dequeue возвращает true.
// Когда dequeue вернет false (очередь пуста и done=true), цикл завершится.
while (q.dequeue(word))
{
// Извлекаем слово из очереди (блокируется, если очередь пуста).
string word = q.dequeue();
// Если слово пустое, это сигнал завершения - выходим из цикла.
if (word == "")
// Запускаем и ждем завершения дочернего процесса с помощью execute.
// execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr).
// В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом.
// Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output.
// Это упрощает код, так как не нужно отдельно вызывать wait.
auto res = execute([script, word]);
// Опционально: проверка на ошибку
if (res.status != 0)
{
break;
writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output);
}
else
{
res.output.write;
}
// Запускаем дочерний процесс: выполнение скрипта с аргументом word.
// Это выполняется последовательно, один за другим, без параллелизма дочерних процессов.
auto pid = spawnProcess([script, word]);
// Ждем завершения процесса (wait), чтобы следующий не запускался параллельно.
wait(pid);
}
});
// Запускаем поток-потребитель.
consumerThread.start();
// Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд.
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем.
// Основной цикл: добавляем слова в очередь с задержкой в 500 мс.
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем,
// демонстрируя асинхронное добавление и последовательную обработку.
foreach (word; words)
{
// Выводим слово для логирования.
writeln(word);
// Добавляем слово в очередь.
q.enqueue(word);
// Задержка 500 миллисекунд.
// Задержка между добавлениями.
Thread.sleep(dur!"msecs"(500));
}
// После добавления всех слов отправляем сигнал завершения (пустая строка).
q.enqueue("");
// После добавления всех слов отправляем сигнал завершения через finish.
// Это устанавливает флаг done и уведомляет потребителя.
q.finish();
// Ждем завершения потока-потребителя.
consumerThread.join();