fifo-process/source/app.d
2025-08-23 00:52:34 +03:00

142 lines
6.3 KiB
D
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Импортируем необходимые модули:
// - std.stdio: для ввода/вывода, включая writeln
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait)
// - core.thread: для работы с потоками (Thread, sleep)
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
// - core.sync.condition: для условия (Condition) для уведомлений между потоками
// - std.file: для работы с файлами (используется косвенно через std.path)
// - std.path: для работы с путями к файлам (dirName, buildPath, buildNormalizedPath)
import std.stdio;
import std.process;
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.file;
import std.path;
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
// T - это шаблонный тип элементов очереди (в нашем случае string).
class Queue(T)
{
// Приватные поля:
// - elements: массив для хранения элементов очереди
private T[] elements;
// - mutex: мьютекс для синхронизации доступа к очереди
private Mutex mutex;
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
private Condition cond;
// Конструктор класса.
// Инициализирует мьютекс и условие.
this()
{
mutex = new Mutex();
cond = new Condition(mutex);
}
// Метод для добавления элемента в очередь (enqueue).
// Синхронизирован с помощью мьютекса для потокобезопасности.
void enqueue(T item)
{
// Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex)
{
// Добавляем элемент в конец массива (FIFO).
elements ~= item;
// Уведомляем ожидающие потоки, что очередь изменилась (стала не пустой).
cond.notify();
}
}
// Метод для извлечения элемента из очереди (dequeue).
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент.
T dequeue()
{
// Захватываем мьютекс для эксклюзивного доступа.
synchronized (mutex)
{
// Ждем, пока очередь не станет не пустой.
while (elements.length == 0)
{
// Ожидаем уведомления от cond (от enqueue).
cond.wait();
}
// Извлекаем первый элемент (FIFO).
T item = elements[0];
// Удаляем его из массива, сдвигая остальные.
elements = elements[1 .. $];
// Возвращаем извлеченный элемент.
return item;
}
}
// Метод для проверки, пуста ли очередь.
// Также синхронизирован для потокобезопасности.
bool isEmpty()
{
// Захватываем мьютекс.
synchronized (mutex)
{
// Проверяем длину массива.
return elements.length == 0;
}
}
}
// Главная функция программы.
void main()
{
// Определяем массив слов для обработки.
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
// Получаем директорию исполняемого файла.
string exeDir = thisExePath.dirName;
// Строим путь к скрипту script.sh, предполагая, что он находится на уровень выше.
string script = exeDir.buildPath("../script.sh").buildNormalizedPath;
// Создаем экземпляр очереди для строк.
auto q = new Queue!string();
// Создаем поток-потребитель (consumerThread).
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
auto consumerThread = new Thread({
// Бесконечный цикл, пока не получим сигнал завершения (пустая строка).
while (true)
{
// Извлекаем слово из очереди (блокируется, если очередь пуста).
string word = q.dequeue();
// Если слово пустое, это сигнал завершения - выходим из цикла.
if (word == "")
{
break;
}
// Запускаем дочерний процесс: выполнение скрипта с аргументом word.
// Это выполняется последовательно, один за другим, без параллелизма дочерних процессов.
auto pid = spawnProcess([script, word]);
// Ждем завершения процесса (wait), чтобы следующий не запускался параллельно.
wait(pid);
}
});
// Запускаем поток-потребитель.
consumerThread.start();
// Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд.
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем.
foreach (word; words)
{
writeln(word);
// Добавляем слово в очередь.
q.enqueue(word);
// Задержка 500 миллисекунд.
Thread.sleep(dur!"msecs"(500));
}
// После добавления всех слов отправляем сигнал завершения (пустая строка).
q.enqueue("");
// Ждем завершения потока-потребителя.
consumerThread.join();
// Выводим сообщение о завершении.
writeln("Done.");
}