// Импортируем необходимые модули: // - std.stdio: для ввода/вывода, включая writeln // - std.process: для создания и управления дочерними процессами (spawnProcess, wait) // - core.thread: для работы с потоками (Thread, sleep) // - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам // - core.sync.condition: для условия (Condition) для уведомлений между потоками // - std.file: для работы с файлами (используется косвенно через std.path) // - std.path: для работы с путями к файлам (dirName, buildPath, buildNormalizedPath) import std.stdio; import std.process; import core.thread; import core.sync.mutex; import core.sync.condition; import std.file; import std.path; // Определяем класс Queue для реализации потокобезопасной FIFO-очереди. // T - это шаблонный тип элементов очереди (в нашем случае string). class Queue(T) { // Приватные поля: // - elements: массив для хранения элементов очереди private T[] elements; // - mutex: мьютекс для синхронизации доступа к очереди private Mutex mutex; // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) private Condition cond; // Конструктор класса. // Инициализирует мьютекс и условие. this() { mutex = new Mutex(); cond = new Condition(mutex); } // Метод для добавления элемента в очередь (enqueue). // Синхронизирован с помощью мьютекса для потокобезопасности. void enqueue(T item) { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { // Добавляем элемент в конец массива (FIFO). elements ~= item; // Уведомляем ожидающие потоки, что очередь изменилась (стала не пустой). cond.notify(); } } // Метод для извлечения элемента из очереди (dequeue). // Блокируется, если очередь пуста, до тех пор, пока не появится элемент. T dequeue() { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { // Ждем, пока очередь не станет не пустой. while (elements.length == 0) { // Ожидаем уведомления от cond (от enqueue). cond.wait(); } // Извлекаем первый элемент (FIFO). T item = elements[0]; // Удаляем его из массива, сдвигая остальные. elements = elements[1 .. $]; // Возвращаем извлеченный элемент. return item; } } // Метод для проверки, пуста ли очередь. // Также синхронизирован для потокобезопасности. bool isEmpty() { // Захватываем мьютекс. synchronized (mutex) { // Проверяем длину массива. return elements.length == 0; } } } // Главная функция программы. void main() { // Определяем массив слов для обработки. string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; // Получаем директорию исполняемого файла. string exeDir = thisExePath.dirName; // Строим путь к скрипту script.sh, предполагая, что он находится на уровень выше. string script = exeDir.buildPath("../script.sh").buildNormalizedPath; // Создаем экземпляр очереди для строк. auto q = new Queue!string(); // Создаем поток-потребитель (consumerThread). // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. auto consumerThread = new Thread({ // Бесконечный цикл, пока не получим сигнал завершения (пустая строка). while (true) { // Извлекаем слово из очереди (блокируется, если очередь пуста). string word = q.dequeue(); // Если слово пустое, это сигнал завершения - выходим из цикла. if (word == "") { break; } // Запускаем дочерний процесс: выполнение скрипта с аргументом word. // Это выполняется последовательно, один за другим, без параллелизма дочерних процессов. auto pid = spawnProcess([script, word]); // Ждем завершения процесса (wait), чтобы следующий не запускался параллельно. wait(pid); } }); // Запускаем поток-потребитель. consumerThread.start(); // Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд. // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем. foreach (word; words) { writeln(word); // Добавляем слово в очередь. q.enqueue(word); // Задержка 500 миллисекунд. Thread.sleep(dur!"msecs"(500)); } // После добавления всех слов отправляем сигнал завершения (пустая строка). q.enqueue(""); // Ждем завершения потока-потребителя. consumerThread.join(); // Выводим сообщение о завершении. writeln("Done."); }