fifo-process/source/app.d
2025-08-23 01:08:58 +03:00

180 lines
9.1 KiB
D
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Импортируем необходимые модули:
// - std.stdio: для ввода/вывода, включая writeln
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute)
// - core.thread: для работы с потоками (Thread, sleep)
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
// - core.sync.condition: для условия (Condition) для уведомлений между потоками
// - std.file: для работы с файлами (используется косвенно через std.path)
// - std.path: для работы с путями к файлам (dirName, buildPath, buildNormalizedPath)
import std.stdio;
import std.process;
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.file;
import std.path;
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D.
// T - это шаблонный тип элементов очереди (в нашем случае string).
class Queue(T)
{
// Приватные поля:
// - elements: массив для хранения элементов очереди
private T[] elements;
// - mutex: мьютекс для синхронизации доступа к очереди
private Mutex mutex;
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
private Condition cond;
// - done: флаг завершения добавления элементов в очередь
// Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено,
// и позволяет избежать использования специального значения (как пустая строка) для остановки.
private bool done;
// Конструктор класса.
// Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно).
this()
{
mutex = new Mutex();
cond = new Condition(mutex);
done = false;
}
// Метод для добавления элемента в очередь (enqueue).
// Синхронизирован с помощью мьютекса для потокобезопасности.
// Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки.
void enqueue(T item)
{
// Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex)
{
// Добавляем элемент в конец массива (FIFO).
elements ~= item;
// Уведомляем ожидающие потоки, что очередь изменилась (стала не пустой).
cond.notify();
}
}
// Метод для извлечения элемента из очереди (dequeue).
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done.
// Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста).
// Параметр item передается по out, чтобы вернуть извлеченный элемент.
bool dequeue(out T item)
{
// Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex)
{
// Ждем, пока очередь не станет не пустой или не будет установлен done.
// Это предотвращает бесконечное ожидание, если больше элементов не добавляется.
while (elements.length == 0 && !done)
{
// Ожидаем уведомления от cond (от enqueue или finish).
cond.wait();
}
// Если очередь пуста и done установлен, возвращаем false.
// Это сигнал, что обработка завершена.
if (elements.length == 0 && done)
{
return false;
}
// Извлекаем первый элемент (FIFO).
item = elements[0];
// Удаляем его из массива, сдвигая остальные.
elements = elements[1 .. $];
// Возвращаем true, указывая на успешное извлечение.
return true;
}
}
// Метод для сигнала завершения добавления элементов.
// Устанавливает флаг done и уведомляет ожидающие потоки.
// Это позволяет потребителю выйти из цикла без специального значения в очереди.
void finish()
{
synchronized (mutex)
{
done = true;
cond.notify();
}
}
// Метод для проверки, пуста ли очередь.
// Также синхронизирован для потокобезопасности.
bool isEmpty()
{
// Захватываем мьютекс.
synchronized (mutex)
{
// Проверяем длину массива.
return elements.length == 0;
}
}
}
// Главная функция программы.
void main()
{
// Определяем массив слов для обработки.
// Эти слова будут добавлены в очередь для последовательной обработки.
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
// Получаем директорию исполняемого файла.
string exeDir = thisExePath.dirName;
// Строим путь к скрипту script.sh, предполагая, что он находится на уровень выше.
string script = exeDir.buildPath("../script.sh").buildNormalizedPath;
// Создаем экземпляр очереди для строк.
auto q = new Queue!string();
// Создаем поток-потребитель (consumerThread).
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
auto consumerThread = new Thread({
// Локальная переменная для хранения извлеченного слова.
string word;
// Цикл, продолжающийся до тех пор, пока dequeue возвращает true.
// Когда dequeue вернет false (очередь пуста и done=true), цикл завершится.
while (q.dequeue(word))
{
// Запускаем и ждем завершения дочернего процесса с помощью execute.
// execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr).
// В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом.
// Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output.
// Это упрощает код, так как не нужно отдельно вызывать wait.
auto res = execute([script, word]);
// Опционально: проверка на ошибку
if (res.status != 0)
{
writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output);
}
else
{
res.output.write;
}
}
});
// Запускаем поток-потребитель.
consumerThread.start();
// Основной цикл: добавляем слова в очередь с задержкой в 500 мс.
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем,
// демонстрируя асинхронное добавление и последовательную обработку.
foreach (word; words)
{
// Выводим слово для логирования.
writeln(word);
// Добавляем слово в очередь.
q.enqueue(word);
// Задержка между добавлениями.
Thread.sleep(dur!"msecs"(500));
}
// После добавления всех слов отправляем сигнал завершения через finish.
// Это устанавливает флаг done и уведомляет потребителя.
q.finish();
// Ждем завершения потока-потребителя.
consumerThread.join();
// Выводим сообщение о завершении.
writeln("Done.");
}