diff --git a/source/app.d b/source/app.d index 9b76eee..fb20e0e 100644 --- a/source/app.d +++ b/source/app.d @@ -1,6 +1,6 @@ // Импортируем необходимые модули: // - std.stdio: для ввода/вывода, включая writeln -// - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute) +// - std.process: для создания и управления дочерними процессами (spawnProcess, wait) // - core.thread: для работы с потоками (Thread, sleep) // - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам // - core.sync.condition: для условия (Condition) для уведомлений между потоками @@ -15,7 +15,6 @@ import std.file; import std.path; // Определяем класс Queue для реализации потокобезопасной FIFO-очереди. -// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D. // T - это шаблонный тип элементов очереди (в нашем случае string). class Queue(T) { @@ -26,23 +25,17 @@ class Queue(T) private Mutex mutex; // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) private Condition cond; - // - done: флаг завершения добавления элементов в очередь - // Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено, - // и позволяет избежать использования специального значения (как пустая строка) для остановки. - private bool done; // Конструктор класса. - // Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно). + // Инициализирует мьютекс и условие. this() { mutex = new Mutex(); cond = new Condition(mutex); - done = false; } // Метод для добавления элемента в очередь (enqueue). // Синхронизирован с помощью мьютекса для потокобезопасности. - // Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки. void enqueue(T item) { // Захватываем мьютекс для эксклюзивного доступа. @@ -56,45 +49,24 @@ class Queue(T) } // Метод для извлечения элемента из очереди (dequeue). - // Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done. - // Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста). - // Параметр item передается по out, чтобы вернуть извлеченный элемент. - bool dequeue(out T item) + // Блокируется, если очередь пуста, до тех пор, пока не появится элемент. + T dequeue() { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { - // Ждем, пока очередь не станет не пустой или не будет установлен done. - // Это предотвращает бесконечное ожидание, если больше элементов не добавляется. - while (elements.length == 0 && !done) + // Ждем, пока очередь не станет не пустой. + while (elements.length == 0) { - // Ожидаем уведомления от cond (от enqueue или finish). + // Ожидаем уведомления от cond (от enqueue). cond.wait(); } - // Если очередь пуста и done установлен, возвращаем false. - // Это сигнал, что обработка завершена. - if (elements.length == 0 && done) - { - return false; - } // Извлекаем первый элемент (FIFO). - item = elements[0]; + T item = elements[0]; // Удаляем его из массива, сдвигая остальные. elements = elements[1 .. $]; - // Возвращаем true, указывая на успешное извлечение. - return true; - } - } - - // Метод для сигнала завершения добавления элементов. - // Устанавливает флаг done и уведомляет ожидающие потоки. - // Это позволяет потребителю выйти из цикла без специального значения в очереди. - void finish() - { - synchronized (mutex) - { - done = true; - cond.notify(); + // Возвращаем извлеченный элемент. + return item; } } @@ -115,7 +87,6 @@ class Queue(T) void main() { // Определяем массив слов для обработки. - // Эти слова будут добавлены в очередь для последовательной обработки. string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; // Получаем директорию исполняемого файла. string exeDir = thisExePath.dirName; @@ -128,49 +99,40 @@ void main() // Создаем поток-потребитель (consumerThread). // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. auto consumerThread = new Thread({ - // Локальная переменная для хранения извлеченного слова. - string word; - // Цикл, продолжающийся до тех пор, пока dequeue возвращает true. - // Когда dequeue вернет false (очередь пуста и done=true), цикл завершится. - while (q.dequeue(word)) + // Бесконечный цикл, пока не получим сигнал завершения (пустая строка). + while (true) { - // Запускаем и ждем завершения дочернего процесса с помощью execute. - // execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr). - // В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом. - // Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output. - // Это упрощает код, так как не нужно отдельно вызывать wait. - auto res = execute([script, word]); - // Опционально: проверка на ошибку - if (res.status != 0) + // Извлекаем слово из очереди (блокируется, если очередь пуста). + string word = q.dequeue(); + // Если слово пустое, это сигнал завершения - выходим из цикла. + if (word == "") { - writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output); - } - else - { - res.output.write; + break; } + // Запускаем дочерний процесс: выполнение скрипта с аргументом word. + // Это выполняется последовательно, один за другим, без параллелизма дочерних процессов. + auto pid = spawnProcess([script, word]); + // Ждем завершения процесса (wait), чтобы следующий не запускался параллельно. + wait(pid); } }); // Запускаем поток-потребитель. consumerThread.start(); - // Основной цикл: добавляем слова в очередь с задержкой в 500 мс. - // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем, - // демонстрируя асинхронное добавление и последовательную обработку. + // Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд. + // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем. foreach (word; words) { - // Выводим слово для логирования. writeln(word); // Добавляем слово в очередь. q.enqueue(word); - // Задержка между добавлениями. + // Задержка 500 миллисекунд. Thread.sleep(dur!"msecs"(500)); } - // После добавления всех слов отправляем сигнал завершения через finish. - // Это устанавливает флаг done и уведомляет потребителя. - q.finish(); + // После добавления всех слов отправляем сигнал завершения (пустая строка). + q.enqueue(""); // Ждем завершения потока-потребителя. consumerThread.join();