commit f1dd2aebb2d6fd43bf1b947a749f3adade563590 Author: Alexander Zhirov Date: Thu Sep 4 21:51:23 2025 +0300 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..402c534 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +.dub +docs.json +__dummy.html +docs/ +/cdc +cdc.so +cdc.dylib +cdc.dll +cdc.a +cdc.lib +cdc-test-* +*.exe +*.pdb +*.o +*.obj +*.lst +bin diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..35d0dcd --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,39 @@ +{ + // Используйте IntelliSense, чтобы узнать о возможных атрибутах. + // Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов. + // Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "code-d", + "request": "launch", + "dubBuild": true, + "name": "Build & Debug DUB project", + "cwd": "${command:dubWorkingDirectory}", + "program": "bin/${command:dubTarget}", + "args": [ + "/tmp/test" + ] + }, + { + "name": "Debug D Program with sudo-gdb", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/bin/dwatch", + "args": ["-d", "/tmp/scripts"], // Аргументы командной строки для программы, если нужны + "stopAtEntry": false, // Остановить на входе в main + "cwd": "${workspaceFolder}", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "miDebuggerPath": "/usr/bin/sudo-gdb", // Путь к вашему скрипту + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + } + ] + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..d1c022f --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "editor.insertSpaces": false, + "editor.tabSize": 4, + "editor.detectIndentation": false +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a80bfc6 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Implementing Change Data Capture diff --git a/dub.json b/dub.json new file mode 100644 index 0000000..fabf1f1 --- /dev/null +++ b/dub.json @@ -0,0 +1,11 @@ +{ + "authors": [ + "Alexander Zhirov" + ], + "copyright": "Copyright © 2025, Alexander Zhirov", + "description": "Implementing Change Data Capture.", + "license": "BSL-1.0", + "name": "cdc", + "targetPath": "bin", + "targetType": "executable" +} \ No newline at end of file diff --git a/dub.selections.json b/dub.selections.json new file mode 100644 index 0000000..322586b --- /dev/null +++ b/dub.selections.json @@ -0,0 +1,5 @@ +{ + "fileVersion": 1, + "versions": { + } +} diff --git a/source/app.d b/source/app.d new file mode 100644 index 0000000..c51eff5 --- /dev/null +++ b/source/app.d @@ -0,0 +1,272 @@ +module app; + +import std.stdio : writeln, writefln, File; +import std.file : exists, mkdirRecurse, read, write, readText; +import std.path : baseName, buildPath, absolutePath; +import std.getopt : getopt; +import std.string : strip, split, splitLines; +import std.algorithm.searching : startsWith; +import std.conv : to; +import std.datetime : Clock; +import std.exception : enforce; +import std.digest.sha : sha256Of; + +import fastcdc; // твой модуль FastCDC + +// ---------- утилиты ---------- + +// hex: параметр scope, чтобы можно было безопасно передавать срез локального массива +@safe pure +string toHex(scope const(ubyte)[] bytes) +{ + immutable char[16] HEX = "0123456789abcdef"; + auto buf = new char[bytes.length * 2]; + size_t j = 0; + foreach (b; bytes) + { + buf[j++] = HEX[(b >> 4) & 0xF]; + buf[j++] = HEX[b & 0xF]; + } + return buf.idup; // immutable string +} + +// fanout: store/chunks/aa/bb/.bin +@safe +string chunkPath(string storeDir, string hashHex) +{ + auto a = hashHex[0 .. 2]; + auto b = hashHex[2 .. 4]; + return buildPath(storeDir, "chunks", a, b, hashHex ~ ".bin"); +} + +// manifest: store/manifests/..manifest +@safe +string manifestPath(string storeDir, string srcPath, long epoch) +{ + auto name = baseName(srcPath); + return buildPath(storeDir, "manifests", name ~ "." ~ to!string(epoch) ~ ".manifest"); +} + +@safe +void ensureDirs(string storeDir) +{ + mkdirRecurse(buildPath(storeDir, "chunks")); + mkdirRecurse(buildPath(storeDir, "manifests")); +} + +@trusted ubyte[] readBytes(string path) +{ + // std.file.read выделяет новый буфер байтов → безопасно привести к ubyte[] + auto v = read(path); // void[] + return cast(ubyte[]) v; // доверяем Phobos: это сырой байтовый буфер +} + +// ---------- split ---------- + +struct SplitOpts +{ + string storeDir; + string filePath; + size_t minSize = 8 * 1024; + size_t avgSize = 64 * 1024; + size_t maxSize = 256 * 1024; +} + +@safe +int cmdSplit(SplitOpts opt) +{ + enforce(exists(opt.filePath), "Файл не найден: " ~ opt.filePath); + ensureDirs(opt.storeDir); + + // бинарное чтение: std.file.read возвращает ubyte[] + ubyte[] data = readBytes(opt.filePath); + + FastCDCParams p = {opt.minSize, opt.avgSize, opt.maxSize}; + p.normalize(); + + size_t chunkCount = 0; + size_t totalBytes = data.length; + + auto epoch = Clock.currTime().toUnixTime(); + auto mfPath = manifestPath(opt.storeDir, opt.filePath, epoch); + mkdirRecurse(buildPath(opt.storeDir, "manifests")); + auto mf = File(mfPath, "w"); + + // шапка манифеста + mf.writeln("# FastCDC manifest"); + mf.writefln("path\t%s", absolutePath(opt.filePath)); + mf.writefln("size\t%s", to!string(totalBytes)); + mf.writefln("algo\tsha256"); + mf.writefln("min\t%u", cast(uint) p.minSize); + mf.writefln("avg\t%u", cast(uint) p.avgSize); + mf.writefln("max\t%u", cast(uint) p.maxSize); + mf.writeln("ord\thash\tsize"); + + size_t ord = 0; + processStream(data, p, (size_t start, size_t len) @safe { + auto slice = data[start .. start + len]; + auto digest = sha256Of(slice); // ubyte[32] (на стеке) + auto hex = toHex(digest[]); // scope-параметр — ок + + mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2])); + mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2], hex[2 .. 4])); + + auto cpath = chunkPath(opt.storeDir, hex); + if (!exists(cpath)) + write(cpath, slice); + + mf.writefln("%u\t%s\t%u", cast(uint) ord, hex, cast(uint) len); + ++ord; + ++chunkCount; + return 0; + }); + + mf.flush(); + mf.close(); + + writefln("split: %s", opt.filePath); + writefln("store: %s", opt.storeDir); + writefln("manifest: %s", mfPath); + writefln("chunks: %u, bytes: %u", + cast(uint) chunkCount, cast(uint) totalBytes); + return 0; +} + +// ---------- restore ---------- + +struct RestoreOpts +{ + string storeDir; + string manifestFile; + string outFile; +} + +@safe +int cmdRestore(RestoreOpts opt) +{ + enforce(exists(opt.manifestFile), "Манифест не найден: " ~ opt.manifestFile); + + string text = readText(opt.manifestFile); + auto lines = splitLines(text); + + // найти строку "ord\thash\tsize" + size_t i = 0; + while (i < lines.length && !lines[i].strip.startsWith("ord")) + ++i; + enforce(i < lines.length, "Не найден заголовок секции данных в манифесте"); + ++i; + + auto dst = File(opt.outFile, "wb"); + + size_t count = 0; + for (; i < lines.length; ++i) + { + auto ln = lines[i].strip; + if (ln.length == 0 || ln[0] == '#') + continue; + + auto cols = ln.split('\t'); + enforce(cols.length == 3, "Строка манифеста повреждена: " ~ ln); + + auto hashHex = cols[1]; + auto cpath = chunkPath(opt.storeDir, hashHex); + enforce(exists(cpath), "Чанк не найден: " ~ cpath); + + ubyte[] chunkData = readBytes(cpath); + dst.rawWrite(chunkData); + ++count; + } + + dst.close(); + writefln("restore: %s <- %s (chunks: %u)", + opt.outFile, opt.manifestFile, cast(uint) count); + return 0; +} + +// ---------- CLI ---------- + +@safe +void printHelp(string prog) +{ + writeln("Usage:"); + writeln(" ", prog, " split --store [--min N] [--avg N] [--max N]"); + writeln(" ", prog, " restore --store "); +} + +int main(string[] args) // без @safe: getopt требует &var +{ + if (args.length < 2) + { + printHelp(args[0]); + return 1; + } + + switch (args[1]) + { + case "split": + { + SplitOpts opt; + string store; + size_t minS = 0, avgS = 0, maxS = 0; + + auto res = getopt(args, + "store", &store, + "min", &minS, + "avg", &avgS, + "max", &maxS + ); + if (res.helpWanted) + { + printHelp(args[0]); + return 0; + } + + // после getopt в args остаются позиционные + if (args.length < 3 || store.length == 0) + { + printHelp(args[0]); + return 1; + } + + opt.storeDir = store; + opt.filePath = args[2]; + if (minS) + opt.minSize = minS; + if (avgS) + opt.avgSize = avgS; + if (maxS) + opt.maxSize = maxS; + + return cmdSplit(opt); + } + + case "restore": + { + RestoreOpts opt; + string store; + + auto res = getopt(args, "store", &store); + if (res.helpWanted) + { + printHelp(args[0]); + return 0; + } + + if (args.length < 4 || store.length == 0) + { + printHelp(args[0]); + return 1; + } + + opt.storeDir = store; + opt.manifestFile = args[2]; + opt.outFile = args[3]; + + return cmdRestore(opt); + } + + default: + printHelp(args[0]); + return 1; + } +} diff --git a/source/fastcdc.d b/source/fastcdc.d new file mode 100644 index 0000000..180c05b --- /dev/null +++ b/source/fastcdc.d @@ -0,0 +1,292 @@ +module fastcdc; + +/** + * FastCDC (скользящее контент-зависимое разбиение). + * + * Идея: идём по данным байт за байтом, считаем rolling-hash (Gear), + * и ставим границы чанков там, где hash удовлетворяет простому условию: + * hash & mask == 0 + * + * При этом действуют три порога: + * - minSize: до этой длины чанк не разрываем (слишком мелко) + * - avgSize: «целевой» средний размер; после него границы ищем активнее + * - maxSize: жёсткий максимум; если так и не нашли границу — режем принудительно + * + * Благодаря контент-зависимым границам, маленькие вставки/удаления + * меняют только локальные чанки; большинство остальных переиспользуется. + */ + +/// Параметры FastCDC. Рекомендуется avgSize как степень двойки (например, 64 KiB). +struct FastCDCParams +{ + /// Минимальная длина чанка: пока длина меньше — границы не ставим. + size_t minSize = 8 * 1024; + + /// Целевой средний размер чанка (желательно 2^k: 32, 64, 128 KiB и т.п.). + size_t avgSize = 64 * 1024; + + /// Жёсткий максимум длины чанка: достигли — режем независимо от хэша. + size_t maxSize = 256 * 1024; + + /** + * Нормализация границ (делает параметры самосогласованными): + * - minSize >= 4 KiB + * - avgSize > minSize (если нет — удваиваем) + * - maxSize > avgSize (если нет — умножаем на 4) + * + * @safe nothrow @nogc — чтобы можно было вызывать из безопасных функций + * без аллокаций/исключений. + */ + @safe nothrow @nogc + void normalize() + { + enum ki = 1024; + if (minSize < 4 * ki) + minSize = 4 * ki; + if (avgSize <= minSize) + avgSize = minSize * 2; + if (maxSize <= avgSize) + maxSize = avgSize * 4; + } +} + +/// Диапазон чанка: [start .. start+len) в исходном буфере. +struct ChunkRange +{ + size_t start; // смещение начала чанка + size_t len; // длина чанка в байтах +} + +/** + * Таблица для Gear-rolling-hash: 256 псевдослучайных 32-битных констант. + * Хэш обновляется так: h = (h << 1) + GEAR[byte]. + * Константы фиксированы → детерминированные границы при одинаковых параметрах. + */ +private immutable uint[256] GEAR = [ + 0x4f1bbcdc, 0xe47c2b1a, 0x1a16b407, 0xa88f9d43, 0x33b0b5c5, 0x0f0b1192, + 0xb1c2e5b8, 0xc6f5a2a9, + 0x7d1ea04c, 0x26358f23, 0x8f9a7902, 0x5ab7d6ee, 0x2f6d3a8c, 0x9e13c540, + 0x4d7c8b99, 0xf3a1d2b1, + 0x0b7d4c62, 0x81f2a5d3, 0x19a8b604, 0x6cc7e190, 0x559e43a1, 0xd2f01937, + 0x7a53cc4f, 0x0c1d5e66, + 0x3b7f6a22, 0x99d104b7, 0x4aa7f9e1, 0x2ce3b8c0, 0x6a8f73d4, 0xe1b2c9a8, + 0x57c04b13, 0xa4e91572, + 0x13f7c2aa, 0x8b1c0f5e, 0x5e6a92c1, 0x0af41937, 0x7fe0bd54, 0x26b3e71a, + 0x942d6c83, 0x3c51a0ef, + 0xd57f2b33, 0x61a4cc09, 0x0d9b8a71, 0xb7e50f46, 0x48a3d1f0, 0x2f1e6cb2, + 0x73cd98a5, 0xe92a13c9, + 0xa1c7f02e, 0x5b0e6a97, 0x0c8f2d31, 0xd1a47b66, 0x6fe3b920, 0x20b9d4a1, + 0x9a5c0f3d, 0x4e81a2c7, + 0xf02b5934, 0x1bc7d8a2, 0x8e0a64f1, 0x37d4b20c, 0x6c09f5d3, 0xa2391e84, + 0x5f7ab0e2, 0x0b1c6d57, + 0x7c3f9a15, 0x12ad54e3, 0x8b6f0c2d, 0x45e1d7a9, 0x2af39b60, 0x9c07e4d1, + 0x3d5a81b2, 0xe6c21458, + 0xd9a03f1c, 0x64b7e0a3, 0x0ea19c76, 0xb2d5480f, 0x49f3a7d5, 0x21c58e92, + 0x75ae39c1, 0xed1046ab, + 0xa8c3f12d, 0x5c0e7b94, 0x0d8f2e31, 0xd4a57c66, 0x6ee4ba20, 0x23bad5a1, + 0x985d0f3d, 0x4f82a3c7, + 0xf12c5a34, 0x1ac8d9a2, 0x8f0b65f1, 0x36d5b30c, 0x6b0af6d3, 0xa33a1f84, + 0x5e7bb1e2, 0x0a1d6e57, + 0x7d409b15, 0x11ae55e3, 0x8a700d2d, 0x44e2d8a9, 0x2bf49c60, 0x9d08e5d1, + 0x3c5b82b2, 0xe7c31558, + 0xd8a1401c, 0x65b8e1a3, 0x0fa29d76, 0xb3d6490f, 0x48f4a8d5, 0x22c68f92, + 0x74af3ac1, 0xec1147ab, + 0xa9c4f22d, 0x5d0f7c94, 0x0e902f31, 0xd5a67d66, 0x6de5bb20, 0x24bbd6a1, + 0x975e103d, 0x4e83a4c7, + 0xf22d5b34, 0x1bc9daa2, 0x8e0c66f1, 0x35d6b40c, 0x6a0bf7d3, 0xa23b2084, + 0x5f7cb2e2, 0x0b1e6f57, + 0x7c419c15, 0x10af56e3, 0x8b710e2d, 0x45e3d9a9, 0x2af59d60, 0x9c09e6d1, + 0x3d5c83b2, 0xe6c41658, + 0xd9a2411c, 0x64b9e2a3, 0x0ea39e76, 0xb2d74a0f, 0x49f5a9d5, 0x23c79092, + 0x75b03bc1, 0xed1248ab, + 0xa8c5f32d, 0x5c107d94, 0x0d913031, 0xd4a77e66, 0x6ee6bc20, 0x25bcd7a1, + 0x985f113d, 0x4f84a5c7, + 0xf12e5c34, 0x1acadba2, 0x8f0d67f1, 0x36d7b50c, 0x6b0cf8d3, 0xa33c2184, + 0x5e7db3e2, 0x0a1f7057, + 0x7d429d15, 0x11b057e3, 0x8a720f2d, 0x44e4daa9, 0x2bf69e60, 0x9d0ae7d1, + 0x3c5d84b2, 0xe7c51758, + 0xd8a3421c, 0x65bae3a3, 0x0fa49f76, 0xb3d84b0f, 0x48f6aad5, 0x22c89192, + 0x74b13cc1, 0xec1349ab, + 0xa9c6f42d, 0x5d117e94, 0x0e923131, 0xd5a87f66, 0x6de7bd20, 0x24bdd8a1, + 0x9750123d, 0x4e85a6c7, + 0xf22f5d34, 0x1bcbdca2, 0x8e0e68f1, 0x35d8b60c, 0x6a0df9d3, 0xa23d2284, + 0x5f7eb4e2, 0x0b207157, + 0x7c439e15, 0x10b158e3, 0x8b73102d, 0x45e5dba9, 0x2af79f60, 0x9c0be8d1, + 0x3d5e85b2, 0xe6c61858, + 0xd9a4431c, 0x64bbe4a3, 0x0ea5a076, 0xb2d94c0f, 0x49f7abd5, 0x23c99292, + 0x75b23dc1, 0xed144aab, + 0xa8c7f52d, 0x5c127f94, 0x0d933231, 0xd4a98066, 0x6ee8be20, 0x25bed9a1, + 0x9861133d, 0x4f86a7c7, + 0xf1305e34, 0x1acca0a2, 0x8f0f69f1, 0x36d9b70c, 0x6b0efaD3, 0xa33e2384, + 0x5e7fb5e2, 0x0a217257, + 0x7d449f15, 0x11b259e3, 0x8a74112d, 0x44e6dca9, 0x2bf8a060, 0x9d0ce9d1, + 0x3c5f86b2, 0xe7c71958, + 0xd8a5441c, 0x65bce5a3, 0x0fa6a176, 0xb3da4d0f, 0x48f8acd5, 0x22ca9392, + 0x74b33ec1, 0xec154bab +]; + +/// Обновление Gear-хэша одним байтом. @safe @nothrow @nogc — без аллокаций/исключений. +@safe nothrow @nogc +private uint gearUpdate(uint h, ubyte b) +{ + // Побитовый сдвиг + добавление таблицы → быстрый, но «живой» rolling-hash. + return (h << 1) + GEAR[b]; +} + +/** + * Ближайшая вниз степень двойки для x (>=1). + * Нужна, чтобы построить маски вида (2^k − 1) и (2^(k+1) − 1). + * При таких масках условие (hash & mask) == 0 имеет ожидаемый период ~2^k. + */ +@safe nothrow @nogc +private size_t floorPow2(size_t x) +{ + size_t p = 1; + while ((p << 1) != 0 && (p << 1) <= x) + p <<= 1; + return p; +} + +/** + * Batch-API: режет весь буфер и возвращает список чанков. + * Удобно, если объём данных умеренный и важна простота. + * + * Сложность: O(n), где n — длина data. + */ +@safe +ChunkRange[] chunkify(const(ubyte)[] data, FastCDCParams params = FastCDCParams.init) +{ + FastCDCParams p = params; + p.normalize(); // приводим пороги в адекватные границы + + ChunkRange[] chunks; // итоговый список диапазонов + // Необязательный pre-reserve: уменьшает реаллокации; можно удалить, если не нужно. + chunks.reserve(data.length / (p.avgSize ? p.avgSize : 1) + 1); + + // Строим две маски: + // - maskN (normal): 2^k − 1 → «частота» границ ~ avgSize + // - maskE (early): 2^(k+1) − 1 → «строже», реже до avgSize (не режем слишком рано) + const size_t avgPow2 = floorPow2(p.avgSize); + immutable uint maskN = cast(uint)(avgPow2 - 1); + immutable uint maskE = cast(uint)(((avgPow2 << 1) != 0) ? ((avgPow2 << 1) - 1) : maskN); + + size_t start = 0; // начало текущего потенциального чанка + uint rolling = 0; // текущее значение rolling-hash + + foreach (i, b; data) // идём по байтам + { + rolling = gearUpdate(rolling, b); // обновили rolling-hash + const size_t clen = (i + 1) - start; // текущая длина кандидат-чанка + + // 1) До minSize границы запрещены — продолжаем копить байты. + if (clen < p.minSize) + continue; + + // 2) Если превысили maxSize — принудительно режем тут. + if (clen >= p.maxSize) + { + chunks ~= ChunkRange(start, clen); + start = i + 1; // новый чанк начинается со следующего байта + rolling = 0; // сбрасываем хэш (независимая последовательность) + continue; + } + + // 3) Зона «ранняя» (min..avg): применяем более строгую маску, чтобы + // не нарезать слишком короткие чанки. + if (clen < p.avgSize) + { + if ((rolling & maskE) == 0) // редкое совпадение → ставим границу + { + chunks ~= ChunkRange(start, clen); + start = i + 1; + rolling = 0; + } + continue; // иначе продолжаем наращивать чанк + } + + // 4) Зона «нормальная» (avg..max): используем обычную маску — средний размер ≈ avgSize. + if ((rolling & maskN) == 0) + { + chunks ~= ChunkRange(start, clen); + start = i + 1; + rolling = 0; + } + } + + // 5) Хвост: если что-то осталось — это последний чанк. + if (start < data.length) + chunks ~= ChunkRange(start, data.length - start); + + return chunks; +} + +/** + * Стримовый API: вызывает sink(start,len) для каждого найденного чанка. + * + * Плюсы: + * - не аллоцирует список чанков (можно сделать @nogc-путь); + * - удобно сразу считать хэш чанка и писать в БД. + * + * Контракт sink: + * - возвращает 0 → «продолжать»; + * - возвращает != 0 → «остановить» (функция вернёт количество уже эмитнутых чанков). + */ +@safe +size_t processStream(const(ubyte)[] data, FastCDCParams params, + scope int delegate(size_t, size_t) @safe sink) +{ + FastCDCParams p = params; + p.normalize(); + + const size_t avgPow2 = floorPow2(p.avgSize); + immutable uint maskN = cast(uint)(avgPow2 - 1); + immutable uint maskE = cast(uint)(((avgPow2 << 1) != 0) ? ((avgPow2 << 1) - 1) : maskN); + + size_t start = 0; + uint rolling = 0; + size_t count = 0; // числим эмитнутые чанки + + foreach (i, b; data) + { + rolling = gearUpdate(rolling, b); + const size_t clen = (i + 1) - start; + + // До minSize границы не допускаются + if (clen < p.minSize) + continue; + + // Решение «резать/не резать» для текущей позиции + bool cut = false; + if (clen >= p.maxSize) // принудительный разрыв на maxSize + cut = true; + else if (clen < p.avgSize) // раннее окно — строгая маска + cut = ((rolling & maskE) == 0); + else // нормальное окно — обычная маска + cut = ((rolling & maskN) == 0); + + if (cut) + { + // Отдаём чанк потребителю. Он может попросить остановиться (!=0). + if (sink(start, clen) != 0) + { + ++count; // этот чанк уже отдан — учитываем + return count; // и выходим + } + ++count; + start = i + 1; // следующий чанк начинается со следующего байта + rolling = 0; // сбрасываем rolling-hash + } + } + + // Хвостовой чанк (если есть): отдаём целиком. + if (start < data.length) + { + if (sink(start, data.length - start) != 0) + { + ++count; // учли последний чанк + return count; // останов по сигналу sink + } + ++count; + } + return count; +}