From 4dace7c53cbfc5be41ff9cbee7b2498cc910ea79 Mon Sep 17 00:00:00 2001 From: Alexander Zhirov Date: Sat, 23 Aug 2025 00:52:34 +0300 Subject: [PATCH] init --- .gitignore | 17 +++++ .vscode/launch.json | 17 +++++ .vscode/settings.json | 5 ++ README.md | 3 + dub.json | 11 ++++ dub.selections.json | 5 ++ script.sh | 4 ++ source/app.d | 142 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 204 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 README.md create mode 100644 dub.json create mode 100644 dub.selections.json create mode 100755 script.sh create mode 100644 source/app.d diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0305022 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +.dub +docs.json +__dummy.html +docs/ +/fifo-process +fifo-process.so +fifo-process.dylib +fifo-process.dll +fifo-process.a +fifo-process.lib +fifo-process-test-* +*.exe +*.pdb +*.o +*.obj +*.lst +bin diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..03d6784 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // Используйте IntelliSense, чтобы узнать о возможных атрибутах. + // Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов. + // Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "code-d", + "request": "launch", + "dubBuild": true, + "name": "Build & Debug DUB project", + "cwd": "${command:dubWorkingDirectory}", + "program": "bin/${command:dubTarget}", + "args": ["list"] + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..d1c022f --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "editor.insertSpaces": false, + "editor.tabSize": 4, + "editor.detectIndentation": false +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..1301db7 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# fifo-process + +Эта программа демонстрирует паттерн producer-consumer с использованием потокобезопасной FIFO-очереди. Основной поток (producer) добавляет слова в очередь с задержкой 500 мс, а отдельный поток (consumer) последовательно извлекает их, запускает внешний bash-скрипт (`script.sh`) с каждым словом в качестве аргумента и ожидает завершения процесса. Программа завершается после обработки всех слов, обеспечивая синхронизацию через мьютекс и условие. diff --git a/dub.json b/dub.json new file mode 100644 index 0000000..a83ce9b --- /dev/null +++ b/dub.json @@ -0,0 +1,11 @@ +{ + "authors": [ + "Alexander Zhirov" + ], + "copyright": "Copyright © 2025, Alexander Zhirov", + "description": "fifo-process", + "license": "BSL-1.0", + "name": "fifo-process", + "targetPath": "bin", + "targetType": "executable" +} \ No newline at end of file diff --git a/dub.selections.json b/dub.selections.json new file mode 100644 index 0000000..322586b --- /dev/null +++ b/dub.selections.json @@ -0,0 +1,5 @@ +{ + "fileVersion": 1, + "versions": { + } +} diff --git a/script.sh b/script.sh new file mode 100755 index 0000000..9661ba8 --- /dev/null +++ b/script.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +echo -e "\t$1" +sleep 2 diff --git a/source/app.d b/source/app.d new file mode 100644 index 0000000..fb20e0e --- /dev/null +++ b/source/app.d @@ -0,0 +1,142 @@ +// Импортируем необходимые модули: +// - std.stdio: для ввода/вывода, включая writeln +// - std.process: для создания и управления дочерними процессами (spawnProcess, wait) +// - core.thread: для работы с потоками (Thread, sleep) +// - core.sync.mutex: для мьютекса (Mutex) для синхронизации доступа к общим ресурсам +// - core.sync.condition: для условия (Condition) для уведомлений между потоками +// - std.file: для работы с файлами (используется косвенно через std.path) +// - std.path: для работы с путями к файлам (dirName, buildPath, buildNormalizedPath) +import std.stdio; +import std.process; +import core.thread; +import core.sync.mutex; +import core.sync.condition; +import std.file; +import std.path; + +// Определяем класс Queue для реализации потокобезопасной FIFO-очереди. +// T - это шаблонный тип элементов очереди (в нашем случае string). +class Queue(T) +{ + // Приватные поля: + // - elements: массив для хранения элементов очереди + private T[] elements; + // - mutex: мьютекс для синхронизации доступа к очереди + private Mutex mutex; + // - cond: условие для уведомления потоков о изменениях в очереди (например, о добавлении элемента) + private Condition cond; + + // Конструктор класса. + // Инициализирует мьютекс и условие. + this() + { + mutex = new Mutex(); + cond = new Condition(mutex); + } + + // Метод для добавления элемента в очередь (enqueue). + // Синхронизирован с помощью мьютекса для потокобезопасности. + void enqueue(T item) + { + // Захватываем мьютекс для эксклюзивного доступа. + synchronized (mutex) + { + // Добавляем элемент в конец массива (FIFO). + elements ~= item; + // Уведомляем ожидающие потоки, что очередь изменилась (стала не пустой). + cond.notify(); + } + } + + // Метод для извлечения элемента из очереди (dequeue). + // Блокируется, если очередь пуста, до тех пор, пока не появится элемент. + T dequeue() + { + // Захватываем мьютекс для эксклюзивного доступа. + synchronized (mutex) + { + // Ждем, пока очередь не станет не пустой. + while (elements.length == 0) + { + // Ожидаем уведомления от cond (от enqueue). + cond.wait(); + } + // Извлекаем первый элемент (FIFO). + T item = elements[0]; + // Удаляем его из массива, сдвигая остальные. + elements = elements[1 .. $]; + // Возвращаем извлеченный элемент. + return item; + } + } + + // Метод для проверки, пуста ли очередь. + // Также синхронизирован для потокобезопасности. + bool isEmpty() + { + // Захватываем мьютекс. + synchronized (mutex) + { + // Проверяем длину массива. + return elements.length == 0; + } + } +} + +// Главная функция программы. +void main() +{ + // Определяем массив слов для обработки. + string[] words = ["hello", "world", "foo", "bar", "baz", "qux"]; + // Получаем директорию исполняемого файла. + string exeDir = thisExePath.dirName; + // Строим путь к скрипту script.sh, предполагая, что он находится на уровень выше. + string script = exeDir.buildPath("../script.sh").buildNormalizedPath; + + // Создаем экземпляр очереди для строк. + auto q = new Queue!string(); + + // Создаем поток-потребитель (consumerThread). + // Этот поток будет извлекать слова из очереди и запускать bash-процессы последовательно. + auto consumerThread = new Thread({ + // Бесконечный цикл, пока не получим сигнал завершения (пустая строка). + while (true) + { + // Извлекаем слово из очереди (блокируется, если очередь пуста). + string word = q.dequeue(); + // Если слово пустое, это сигнал завершения - выходим из цикла. + if (word == "") + { + break; + } + // Запускаем дочерний процесс: выполнение скрипта с аргументом word. + // Это выполняется последовательно, один за другим, без параллелизма дочерних процессов. + auto pid = spawnProcess([script, word]); + // Ждем завершения процесса (wait), чтобы следующий не запускался параллельно. + wait(pid); + } + }); + + // Запускаем поток-потребитель. + consumerThread.start(); + + // Основной цикл: добавляем слова в очередь с задержкой в 500 миллисекунд. + // Это позволяет добавлять слова быстрее, чем они обрабатываются потребителем. + foreach (word; words) + { + writeln(word); + // Добавляем слово в очередь. + q.enqueue(word); + // Задержка 500 миллисекунд. + Thread.sleep(dur!"msecs"(500)); + } + + // После добавления всех слов отправляем сигнал завершения (пустая строка). + q.enqueue(""); + + // Ждем завершения потока-потребителя. + consumerThread.join(); + + // Выводим сообщение о завершении. + writeln("Done."); +}