diff --git a/source/app.d b/source/app.d index fb20e0e..9b76eee 100644 --- a/source/app.d +++ b/source/app.d @@ -1,6 +1,6 @@ // Импортируем необходимые модули: // - std.stdio: для ввода/вывода, включая writeln -// - std.process: для создания и управления дочерними процессами (spawnProcess, wait) +// - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute) // - core.thread: для работы с потоками (Thread, sleep) // - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам // - core.sync.condition: для условия (Condition) для уведомлений между потоками @@ -15,6 +15,7 @@ import std.file; import std.path; // Определяем класс Queue для реализации потокобезопасной FIFO-очереди. +// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D. // T - это шаблонный тип элементов очереди (в нашем случае string). class Queue(T) { @@ -25,17 +26,23 @@ class Queue(T) private Mutex mutex; // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) private Condition cond; + // - done: флаг завершения добавления элементов в очередь + // Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено, + // и позволяет избежать использования специального значения (как пустая строка) для остановки. + private bool done; // Конструктор класса. - // Инициализирует мьютекс и условие. + // Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно). this() { mutex = new Mutex(); cond = new Condition(mutex); + done = false; } // Метод для добавления элемента в очередь (enqueue). // Синхронизирован с помощью мьютекса для потокобезопасности. + // Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки. void enqueue(T item) { // Захватываем мьютекс для эксклюзивного доступа. @@ -49,24 +56,45 @@ class Queue(T) } // Метод для извлечения элемента из очереди (dequeue). - // Блокируется, если очередь пуста, до тех пор, пока не появится элемент. - T dequeue() + // Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done. + // Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста). + // Параметр item передается по out, чтобы вернуть извлеченный элемент. + bool dequeue(out T item) { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { - // Ждем, пока очередь не станет не пустой. - while (elements.length == 0) + // Ждем, пока очередь не станет не пустой или не будет установлен done. + // Это предотвращает бесконечное ожидание, если больше элементов не добавляется. + while (elements.length == 0 && !done) { - // Ожидаем уведомления от cond (от enqueue). + // Ожидаем уведомления от cond (от enqueue или finish). cond.wait(); } + // Если очередь пуста и done установлен, возвращаем false. + // Это сигнал, что обработка завершена. + if (elements.length == 0 && done) + { + return false; + } // Извлекаем первый элемент (FIFO). - T item = elements[0]; + item = elements[0]; // Удаляем его из массива, сдвигая остальные. elements = elements[1 .. $]; - // Возвращаем извлеченный элемент. - return item; + // Возвращаем true, указывая на успешное извлечение. + return true; + } + } + + // Метод для сигнала завершения добавления элементов. + // Устанавливает флаг done и уведомляет ожидающие потоки. + // Это позволяет потребителю выйти из цикла без специального значения в очереди. + void finish() + { + synchronized (mutex) + { + done = true; + cond.notify(); } } @@ -87,6 +115,7 @@ class Queue(T) void main() { // Определяем массив слов для обработки. + // Эти слова будут добавлены в очередь для последовательной обработки. string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; // Получаем директорию исполняемого файла. string exeDir = thisExePath.dirName; @@ -99,40 +128,49 @@ void main() // Создаем поток-потребитель (consumerThread). // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. auto consumerThread = new Thread({ - // Бесконечный цикл, пока не получим сигнал завершения (пустая строка). - while (true) + // Локальная переменная для хранения извлеченного слова. + string word; + // Цикл, продолжающийся до тех пор, пока dequeue возвращает true. + // Когда dequeue вернет false (очередь пуста и done=true), цикл завершится. + while (q.dequeue(word)) { - // Извлекаем слово из очереди (блокируется, если очередь пуста). - string word = q.dequeue(); - // Если слово пустое, это сигнал завершения - выходим из цикла. - if (word == "") + // Запускаем и ждем завершения дочернего процесса с помощью execute. + // execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr). + // В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом. + // Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output. + // Это упрощает код, так как не нужно отдельно вызывать wait. + auto res = execute([script, word]); + // Опционально: проверка на ошибку + if (res.status != 0) { - break; + writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output); + } + else + { + res.output.write; } - // Запускаем дочерний процесс: выполнение скрипта с аргументом word. - // Это выполняется последовательно, один за другим, без параллелизма дочерних процессов. - auto pid = spawnProcess([script, word]); - // Ждем завершения процесса (wait), чтобы следующий не запускался параллельно. - wait(pid); } }); // Запускаем поток-потребитель. consumerThread.start(); - // Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд. - // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем. + // Основной цикл: добавляем слова в очередь с задержкой в 500 мс. + // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем, + // демонстрируя асинхронное добавление и последовательную обработку. foreach (word; words) { + // Выводим слово для логирования. writeln(word); // Добавляем слово в очередь. q.enqueue(word); - // Задержка 500 миллисекунд. + // Задержка между добавлениями. Thread.sleep(dur!"msecs"(500)); } - // После добавления всех слов отправляем сигнал завершения (пустая строка). - q.enqueue(""); + // После добавления всех слов отправляем сигнал завершения через finish. + // Это устанавливает флаг done и уведомляет потребителя. + q.finish(); // Ждем завершения потока-потребителя. consumerThread.join();