Compare commits

..

1 commit

Author SHA1 Message Date
6619bbffec
execute module 2025-08-23 01:08:58 +03:00

View file

@ -1,6 +1,6 @@
// Импортируем необходимые модули: // Импортируем необходимые модули:
// - std.stdio: для ввода/вывода, включая writeln // - std.stdio: для ввода/вывода, включая writeln
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait) // - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute)
// - core.thread: для работы с потоками (Thread, sleep) // - core.thread: для работы с потоками (Thread, sleep)
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам // - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
// - core.sync.condition: для условия (Condition) для уведомлений между потоками // - core.sync.condition: для условия (Condition) для уведомлений между потоками
@ -15,6 +15,7 @@ import std.file;
import std.path; import std.path;
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди. // Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D.
// T - это шаблонный тип элементов очереди (в нашем случае string). // T - это шаблонный тип элементов очереди (в нашем случае string).
class Queue(T) class Queue(T)
{ {
@ -25,17 +26,23 @@ class Queue(T)
private Mutex mutex; private Mutex mutex;
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
private Condition cond; private Condition cond;
// - done: флаг завершения добавления элементов в очередь
// Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено,
// и позволяет избежать использования специального значения (как пустая строка) для остановки.
private bool done;
// Конструктор класса. // Конструктор класса.
// Инициализирует мьютекс и условие. // Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно).
this() this()
{ {
mutex = new Mutex(); mutex = new Mutex();
cond = new Condition(mutex); cond = new Condition(mutex);
done = false;
} }
// Метод для добавления элемента в очередь (enqueue). // Метод для добавления элемента в очередь (enqueue).
// Синхронизирован с помощью мьютекса для потокобезопасности. // Синхронизирован с помощью мьютекса для потокобезопасности.
// Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки.
void enqueue(T item) void enqueue(T item)
{ {
// Захватываем мьютекс для эксклюзивного доступа. // Захватываем мьютекс для эксклюзивного доступа.
@ -49,24 +56,45 @@ class Queue(T)
} }
// Метод для извлечения элемента из очереди (dequeue). // Метод для извлечения элемента из очереди (dequeue).
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент. // Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done.
T dequeue() // Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста).
// Параметр item передается по out, чтобы вернуть извлеченный элемент.
bool dequeue(out T item)
{ {
// Захватываем мьютекс для эксклюзивного доступа. // Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex) synchronized (mutex)
{ {
// Ждем, пока очередь не станет не пустой. // Ждем, пока очередь не станет не пустой или не будет установлен done.
while (elements.length == 0) // Это предотвращает бесконечное ожидание, если больше элементов не добавляется.
while (elements.length == 0 && !done)
{ {
// Ожидаем уведомления от cond (от enqueue). // Ожидаем уведомления от cond (от enqueue или finish).
cond.wait(); cond.wait();
} }
// Если очередь пуста и done установлен, возвращаем false.
// Это сигнал, что обработка завершена.
if (elements.length == 0 && done)
{
return false;
}
// Извлекаем первый элемент (FIFO). // Извлекаем первый элемент (FIFO).
T item = elements[0]; item = elements[0];
// Удаляем его из массива, сдвигая остальные. // Удаляем его из массива, сдвигая остальные.
elements = elements[1 .. $]; elements = elements[1 .. $];
// Возвращаем извлеченный элемент. // Возвращаем true, указывая на успешное извлечение.
return item; return true;
}
}
// Метод для сигнала завершения добавления элементов.
// Устанавливает флаг done и уведомляет ожидающие потоки.
// Это позволяет потребителю выйти из цикла без специального значения в очереди.
void finish()
{
synchronized (mutex)
{
done = true;
cond.notify();
} }
} }
@ -87,6 +115,7 @@ class Queue(T)
void main() void main()
{ {
// Определяем массив слов для обработки. // Определяем массив слов для обработки.
// Эти слова будут добавлены в очередь для последовательной обработки.
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
// Получаем директорию исполняемого файла. // Получаем директорию исполняемого файла.
string exeDir = thisExePath.dirName; string exeDir = thisExePath.dirName;
@ -99,40 +128,49 @@ void main()
// Создаем поток-потребитель (consumerThread). // Создаем поток-потребитель (consumerThread).
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
auto consumerThread = new Thread({ auto consumerThread = new Thread({
// Бесконечный цикл, пока не получим сигнал завершения (пустая строка). // Локальная переменная для хранения извлеченного слова.
while (true) string word;
// Цикл, продолжающийся до тех пор, пока dequeue возвращает true.
// Когда dequeue вернет false (очередь пуста и done=true), цикл завершится.
while (q.dequeue(word))
{ {
// Извлекаем слово из очереди (блокируется, если очередь пуста). // Запускаем и ждем завершения дочернего процесса с помощью execute.
string word = q.dequeue(); // execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr).
// Если слово пустое, это сигнал завершения - выходим из цикла. // В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом.
if (word == "") // Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output.
// Это упрощает код, так как не нужно отдельно вызывать wait.
auto res = execute([script, word]);
// Опционально: проверка на ошибку
if (res.status != 0)
{ {
break; writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output);
}
else
{
res.output.write;
} }
// Запускаем дочерний процесс: выполнение скрипта с аргументом word.
// Это выполняется последовательно, один за другим, без параллелизма дочерних процессов.
auto pid = spawnProcess([script, word]);
// Ждем завершения процесса (wait), чтобы следующий не запускался параллельно.
wait(pid);
} }
}); });
// Запускаем поток-потребитель. // Запускаем поток-потребитель.
consumerThread.start(); consumerThread.start();
// Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд. // Основной цикл: добавляем слова в очередь с задержкой в 500 мс.
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем. // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем,
// демонстрируя асинхронное добавление и последовательную обработку.
foreach (word; words) foreach (word; words)
{ {
// Выводим слово для логирования.
writeln(word); writeln(word);
// Добавляем слово в очередь. // Добавляем слово в очередь.
q.enqueue(word); q.enqueue(word);
// Задержка 500 миллисекунд. // Задержка между добавлениями.
Thread.sleep(dur!"msecs"(500)); Thread.sleep(dur!"msecs"(500));
} }
// После добавления всех слов отправляем сигнал завершения (пустая строка). // После добавления всех слов отправляем сигнал завершения через finish.
q.enqueue(""); // Это устанавливает флаг done и уведомляет потребителя.
q.finish();
// Ждем завершения потока-потребителя. // Ждем завершения потока-потребителя.
consumerThread.join(); consumerThread.join();