Compare commits
No commits in common. "execute" and "spawn" have entirely different histories.
1 changed files with 27 additions and 65 deletions
92
source/app.d
92
source/app.d
|
@ -1,6 +1,6 @@
|
||||||
// Импортируем необходимые модули:
|
// Импортируем необходимые модули:
|
||||||
// - std.stdio: для ввода/вывода, включая writeln
|
// - std.stdio: для ввода/вывода, включая writeln
|
||||||
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait, execute)
|
// - std.process: для создания и управления дочерними процессами (spawnProcess, wait)
|
||||||
// - core.thread: для работы с потоками (Thread, sleep)
|
// - core.thread: для работы с потоками (Thread, sleep)
|
||||||
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
|
// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам
|
||||||
// - core.sync.condition: для условия (Condition) для уведомлений между потоками
|
// - core.sync.condition: для условия (Condition) для уведомлений между потоками
|
||||||
|
@ -15,7 +15,6 @@ import std.file;
|
||||||
import std.path;
|
import std.path;
|
||||||
|
|
||||||
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
|
// Определяем класс Queue для реализации потокобезопасной FIFO-очереди.
|
||||||
// Используем класс вместо структуры, чтобы избежать проблем с shared и конструкторами в D.
|
|
||||||
// T - это шаблонный тип элементов очереди (в нашем случае string).
|
// T - это шаблонный тип элементов очереди (в нашем случае string).
|
||||||
class Queue(T)
|
class Queue(T)
|
||||||
{
|
{
|
||||||
|
@ -26,23 +25,17 @@ class Queue(T)
|
||||||
private Mutex mutex;
|
private Mutex mutex;
|
||||||
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
|
// - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента)
|
||||||
private Condition cond;
|
private Condition cond;
|
||||||
// - done: флаг завершения добавления элементов в очередь
|
|
||||||
// Этот флаг используется для сигнала потребителю, что больше элементов не будет добавлено,
|
|
||||||
// и позволяет избежать использования специального значения (как пустая строка) для остановки.
|
|
||||||
private bool done;
|
|
||||||
|
|
||||||
// Конструктор класса.
|
// Конструктор класса.
|
||||||
// Инициализирует мьютекс, условие и флаг done (по умолчанию false, т.е. добавление еще возможно).
|
// Инициализирует мьютекс и условие.
|
||||||
this()
|
this()
|
||||||
{
|
{
|
||||||
mutex = new Mutex();
|
mutex = new Mutex();
|
||||||
cond = new Condition(mutex);
|
cond = new Condition(mutex);
|
||||||
done = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Метод для добавления элемента в очередь (enqueue).
|
// Метод для добавления элемента в очередь (enqueue).
|
||||||
// Синхронизирован с помощью мьютекса для потокобезопасности.
|
// Синхронизирован с помощью мьютекса для потокобезопасности.
|
||||||
// Этот метод добавляет элемент в конец очереди и уведомляет ожидающие потоки.
|
|
||||||
void enqueue(T item)
|
void enqueue(T item)
|
||||||
{
|
{
|
||||||
// Захватываем мьютекс для эксклюзивного доступа.
|
// Захватываем мьютекс для эксклюзивного доступа.
|
||||||
|
@ -56,45 +49,24 @@ class Queue(T)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Метод для извлечения элемента из очереди (dequeue).
|
// Метод для извлечения элемента из очереди (dequeue).
|
||||||
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент или не будет установлен флаг done.
|
// Блокируется, если очередь пуста, до тех пор, пока не появится элемент.
|
||||||
// Возвращает true, если элемент успешно извлечен, false - если очередь завершена (done и пуста).
|
T dequeue()
|
||||||
// Параметр item передается по out, чтобы вернуть извлеченный элемент.
|
|
||||||
bool dequeue(out T item)
|
|
||||||
{
|
{
|
||||||
// Захватываем мьютекс для эксклюзивного доступа.
|
// Захватываем мьютекс для эксклюзивного доступа.
|
||||||
synchronized (mutex)
|
synchronized (mutex)
|
||||||
{
|
{
|
||||||
// Ждем, пока очередь не станет не пустой или не будет установлен done.
|
// Ждем, пока очередь не станет не пустой.
|
||||||
// Это предотвращает бесконечное ожидание, если больше элементов не добавляется.
|
while (elements.length == 0)
|
||||||
while (elements.length == 0 && !done)
|
|
||||||
{
|
{
|
||||||
// Ожидаем уведомления от cond (от enqueue или finish).
|
// Ожидаем уведомления от cond (от enqueue).
|
||||||
cond.wait();
|
cond.wait();
|
||||||
}
|
}
|
||||||
// Если очередь пуста и done установлен, возвращаем false.
|
|
||||||
// Это сигнал, что обработка завершена.
|
|
||||||
if (elements.length == 0 && done)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// Извлекаем первый элемент (FIFO).
|
// Извлекаем первый элемент (FIFO).
|
||||||
item = elements[0];
|
T item = elements[0];
|
||||||
// Удаляем его из массива, сдвигая остальные.
|
// Удаляем его из массива, сдвигая остальные.
|
||||||
elements = elements[1 .. $];
|
elements = elements[1 .. $];
|
||||||
// Возвращаем true, указывая на успешное извлечение.
|
// Возвращаем извлеченный элемент.
|
||||||
return true;
|
return item;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Метод для сигнала завершения добавления элементов.
|
|
||||||
// Устанавливает флаг done и уведомляет ожидающие потоки.
|
|
||||||
// Это позволяет потребителю выйти из цикла без специального значения в очереди.
|
|
||||||
void finish()
|
|
||||||
{
|
|
||||||
synchronized (mutex)
|
|
||||||
{
|
|
||||||
done = true;
|
|
||||||
cond.notify();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +87,6 @@ class Queue(T)
|
||||||
void main()
|
void main()
|
||||||
{
|
{
|
||||||
// Определяем массив слов для обработки.
|
// Определяем массив слов для обработки.
|
||||||
// Эти слова будут добавлены в очередь для последовательной обработки.
|
|
||||||
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
|
string[] words = ["hello", "world", "foo", "bar", "baz", "qux"];
|
||||||
// Получаем директорию исполняемого файла.
|
// Получаем директорию исполняемого файла.
|
||||||
string exeDir = thisExePath.dirName;
|
string exeDir = thisExePath.dirName;
|
||||||
|
@ -128,49 +99,40 @@ void main()
|
||||||
// Создаем поток-потребитель (consumerThread).
|
// Создаем поток-потребитель (consumerThread).
|
||||||
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
|
// Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно.
|
||||||
auto consumerThread = new Thread({
|
auto consumerThread = new Thread({
|
||||||
// Локальная переменная для хранения извлеченного слова.
|
// Бесконечный цикл, пока не получим сигнал завершения (пустая строка).
|
||||||
string word;
|
while (true)
|
||||||
// Цикл, продолжающийся до тех пор, пока dequeue возвращает true.
|
|
||||||
// Когда dequeue вернет false (очередь пуста и done=true), цикл завершится.
|
|
||||||
while (q.dequeue(word))
|
|
||||||
{
|
{
|
||||||
// Запускаем и ждем завершения дочернего процесса с помощью execute.
|
// Извлекаем слово из очереди (блокируется, если очередь пуста).
|
||||||
// execute запускает процесс, ждет его завершения и захватывает вывод (stdout + stderr).
|
string word = q.dequeue();
|
||||||
// В отличие от spawnProcess + wait, execute возвращает кортеж с кодом выхода и выводом.
|
// Если слово пустое, это сигнал завершения - выходим из цикла.
|
||||||
// Здесь мы игнорируем вывод, но при необходимости можно проверить res.status и res.output.
|
if (word == "")
|
||||||
// Это упрощает код, так как не нужно отдельно вызывать wait.
|
|
||||||
auto res = execute([script, word]);
|
|
||||||
// Опционально: проверка на ошибку
|
|
||||||
if (res.status != 0)
|
|
||||||
{
|
{
|
||||||
writeln("Ошибка выполнения скрипта для слова '", word, "': ", res.output);
|
break;
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
res.output.write;
|
|
||||||
}
|
}
|
||||||
|
// Запускаем дочерний процесс: выполнение скрипта с аргументом word.
|
||||||
|
// Это выполняется последовательно, один за другим, без параллелизма дочерних процессов.
|
||||||
|
auto pid = spawnProcess([script, word]);
|
||||||
|
// Ждем завершения процесса (wait), чтобы следующий не запускался параллельно.
|
||||||
|
wait(pid);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Запускаем поток-потребитель.
|
// Запускаем поток-потребитель.
|
||||||
consumerThread.start();
|
consumerThread.start();
|
||||||
|
|
||||||
// Основной цикл: добавляем слова в очередь с задержкой в 500 мс.
|
// Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд.
|
||||||
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем,
|
// Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем.
|
||||||
// демонстрируя асинхронное добавление и последовательную обработку.
|
|
||||||
foreach (word; words)
|
foreach (word; words)
|
||||||
{
|
{
|
||||||
// Выводим слово для логирования.
|
|
||||||
writeln(word);
|
writeln(word);
|
||||||
// Добавляем слово в очередь.
|
// Добавляем слово в очередь.
|
||||||
q.enqueue(word);
|
q.enqueue(word);
|
||||||
// Задержка между добавлениями.
|
// Задержка 500 миллисекунд.
|
||||||
Thread.sleep(dur!"msecs"(500));
|
Thread.sleep(dur!"msecs"(500));
|
||||||
}
|
}
|
||||||
|
|
||||||
// После добавления всех слов отправляем сигнал завершения через finish.
|
// После добавления всех слов отправляем сигнал завершения (пустая строка).
|
||||||
// Это устанавливает флаг done и уведомляет потребителя.
|
q.enqueue("");
|
||||||
q.finish();
|
|
||||||
|
|
||||||
// Ждем завершения потока-потребителя.
|
// Ждем завершения потока-потребителя.
|
||||||
consumerThread.join();
|
consumerThread.join();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue