// Импортируем необходимые модули: // - std.stdio: для ввода/вывода, включая writeln // - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute) // - core.thread: для работы с потоками (Thread, sleep) // - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам // - core.sync.condition: для условия (Condition) для уведомлений между потоками // - std.file: для работы с файлами (используется косвенно через std.path) // - std.path: для работы с путями к файлам (dirName, buildPath, buildNormalizedPath) import std.stdio; import std.process; import core.thread; import core.sync.mutex; import core.sync.condition; import std.file; import std.path; // Определяем класс Queue для реализации потокобезопасной FIFO-очереди. // Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D. // T - это шаблонный тип элементов очереди (в нашем случае string). class Queue(T) { // Приватные поля: // - elements: массив для хранения элементов очереди private T[] elements; // - mutex: мьютекс для синхронизации доступа к очереди private Mutex mutex; // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) private Condition cond; // - done: флаг завершения добавления элементов в очередь // Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено, // и позволяет избежать использования специального значения (как пустая строка) для остановки. private bool done; // Конструктор класса. // Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно). this() { mutex = new Mutex(); cond = new Condition(mutex); done = false; } // Метод для добавления элемента в очередь (enqueue). // Синхронизирован с помощью мьютекса для потокобезопасности. // Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки. void enqueue(T item) { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { // Добавляем элемент в конец массива (FIFO). elements ~= item; // Уведомляем ожидающие потоки, что очередь изменилась (стала не пустой). cond.notify(); } } // Метод для извлечения элемента из очереди (dequeue). // Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done. // Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста). // Параметр item передается по out, чтобы вернуть извлеченный элемент. bool dequeue(out T item) { // Захватываем мьютекс для эксклюзивного доступа. synchronized (mutex) { // Ждем, пока очередь не станет не пустой или не будет установлен done. // Это предотвращает бесконечное ожидание, если больше элементов не добавляется. while (elements.length == 0 && !done) { // Ожидаем уведомления от cond (от enqueue или finish). cond.wait(); } // Если очередь пуста и done установлен, возвращаем false. // Это сигнал, что обработка завершена. if (elements.length == 0 && done) { return false; } // Извлекаем первый элемент (FIFO). item = elements[0]; // Удаляем его из массива, сдвигая остальные. elements = elements[1 .. $]; // Возвращаем true, указывая на успешное извлечение. return true; } } // Метод для сигнала завершения добавления элементов. // Устанавливает флаг done и уведомляет ожидающие потоки. // Это позволяет потребителю выйти из цикла без специального значения в очереди. void finish() { synchronized (mutex) { done = true; cond.notify(); } } // Метод для проверки, пуста ли очередь. // Также синхронизирован для потокобезопасности. bool isEmpty() { // Захватываем мьютекс. synchronized (mutex) { // Проверяем длину массива. return elements.length == 0; } } } // Главная функция программы. void main() { // Определяем массив слов для обработки. // Эти слова будут добавлены в очередь для последовательной обработки. string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; // Получаем директорию исполняемого файла. string exeDir = thisExePath.dirName; // Строим путь к скрипту script.sh, предполагая, что он находится на уровень выше. string script = exeDir.buildPath("../script.sh").buildNormalizedPath; // Создаем экземпляр очереди для строк. auto q = new Queue!string(); // Создаем поток-потребитель (consumerThread). // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. auto consumerThread = new Thread({ // Локальная переменная для хранения извлеченного слова. string word; // Цикл, продолжающийся до тех пор, пока dequeue возвращает true. // Когда dequeue вернет false (очередь пуста и done=true), цикл завершится. while (q.dequeue(word)) { // Запускаем и ждем завершения дочернего процесса с помощью execute. // execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr). // В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом. // Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output. // Это упрощает код, так как не нужно отдельно вызывать wait. auto res = execute([script, word]); // Опционально: проверка на ошибку if (res.status != 0) { writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output); } else { res.output.write; } } }); // Запускаем поток-потребитель. consumerThread.start(); // Основной цикл: добавляем слова в очередь с задержкой в 500 мс. // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем, // демонстрируя асинхронное добавление и последовательную обработку. foreach (word; words) { // Выводим слово для логирования. writeln(word); // Добавляем слово в очередь. q.enqueue(word); // Задержка между добавлениями. Thread.sleep(dur!"msecs"(500)); } // После добавления всех слов отправляем сигнал завершения через finish. // Это устанавливает флаг done и уведомляет потребителя. q.finish(); // Ждем завершения потока-потребителя. consumerThread.join(); // Выводим сообщение о завершении. writeln("Done."); }