diff --git a/source/cdcdb/cdc/cas.d b/source/cdcdb/cdc/cas.d index d3e8034..16e054e 100644 --- a/source/cdcdb/cdc/cas.d +++ b/source/cdcdb/cdc/cas.d @@ -6,40 +6,116 @@ import cdcdb.cdc.core; import std.digest.sha : SHA256, digest; import std.format : format; +import zstd; + +import std.exception : enforce; +import std.stdio : writeln; +import std.conv : to; + +import std.file : write; + // CAS-хранилище (Content-Addressable Storage) со снапшотами final class CAS { private: DBLite _db; + bool _zstd; public: - this(string database) + this(string database, bool zstd = false) { _db = new DBLite(database); + _zstd = zstd; } size_t saveSnapshot(string filePath, const(ubyte)[] data) { ubyte[32] hashSource = digest!SHA256(data); // Сделать запрос в БД по filePath и сверить хеш файлов + + // writeln(hashSource.length); // Параметры для CDC вынести в отдельные настройки (продумать) - auto cdc = new CDC(100, 200, 500, 0xFF, 0x0F); + auto cdc = new CDC(300, 700, 1000, 0xFF, 0x0F); // Разбить на фрагменты auto chunks = cdc.split(data); - import std.stdio : writeln; + Snapshot snapshot; + snapshot.filePath = filePath; + snapshot.fileSha256 = hashSource; + snapshot.label = "Файл для теста"; + snapshot.sourceLength = data.length; _db.beginImmediate(); + + auto idSnapshot = _db.addSnapshot(snapshot); + + SnapshotChunk snapshotChunk; + Blob blob; + + blob.zstd = _zstd; + // Записать фрагменты в БД foreach (chunk; chunks) { - writeln(format("%(%02x%)", chunk.sha256)); + blob.sha256 = chunk.sha256; + blob.size = chunk.size; + + auto content = data[chunk.offset .. chunk.offset + chunk.size]; + + if (_zstd) { + ubyte[] zBytes = compress(content, 22); + size_t zSize = zBytes.length; + ubyte[32] zHash = digest!SHA256(zBytes); + + blob.zSize = zSize; + blob.zSha256 = zHash; + blob.content = zBytes; + } else { + blob.content = content.dup; + } + + _db.addBlob(blob); + + snapshotChunk.snapshotId = idSnapshot; + snapshotChunk.chunkIndex = chunk.index; + snapshotChunk.offset = chunk.offset; + snapshotChunk.size = chunk.size; + snapshotChunk.sha256 = chunk.sha256; + + _db.addSnapshotChunk(snapshotChunk); } _db.commit(); // Записать манифест в БД // Вернуть ID манифеста return 0; } + + void restoreSnapshot() + { + string restoreFile = "/tmp/restore.d"; + + foreach (Snapshot snapshot; _db.getSnapshots("/tmp/text")) { + auto dataChunks = _db.getChunks(snapshot.id); + ubyte[] content; + + foreach (SnapshotDataChunk chunk; dataChunks) { + ubyte[] bytes; + if (chunk.zstd) { + enforce(chunk.zSize == chunk.content.length, "Размер сжатого фрагмента не соответствует ожидаемому"); + bytes = cast(ubyte[]) uncompress(chunk.content); + } else { + bytes = chunk.content; + } + enforce(chunk.size == bytes.length, "Оригинальный размер не соответствует ожидаемому"); + + content ~= bytes; + } + + enforce(snapshot.fileSha256 == digest!SHA256(content), "Хеш-сумма файла не совпадает"); + + write(snapshot.filePath ~ snapshot.id.to!string, content); + } + } } diff --git a/source/cdcdb/db/dblite.d b/source/cdcdb/db/dblite.d index 3bba325..f4d62f0 100644 --- a/source/cdcdb/db/dblite.d +++ b/source/cdcdb/db/dblite.d @@ -2,6 +2,10 @@ module cdcdb.db.dblite; import arsd.sqlite; import std.file : exists, isFile; +import std.exception : enforce; +import std.conv : to; + +import cdcdb.db.types; final class DBLite : Sqlite { @@ -45,5 +49,209 @@ public: query("ROLLBACK"); } - // findFile() + // Snapshot getSnapshot(string filePath, immutable ubyte[32] sha256) + // { + // auto queryResult = sql( + // q{ + // SELECT * FROM snapshots + // WHERE file_path = ? AND file_sha256 = ? + // }, filePath, sha256 + // ); + // } + + long addSnapshot(Snapshot snapshot) + { + auto queryResult = sql( + q{ + INSERT INTO snapshots( + file_path, + file_sha256, + label, + source_length, + algo_min, + algo_normal, + algo_max, + mask_s, + mask_l, + status + ) VALUES (?,?,?,?,?,?,?,?,?,?) + RETURNING id + }, + snapshot.filePath, + snapshot.fileSha256[], + snapshot.label, + snapshot.sourceLength, + snapshot.algoMin, + snapshot.algoNormal, + snapshot.algoMax, + snapshot.maskS, + snapshot.maskL, + snapshot.status.to!int + ); + + if (!queryResult.empty()) + return queryResult.front()["id"].to!long; + return 0; + } + + void addBlob(Blob blob) + { + sql( + q{ + INSERT INTO blobs (sha256, z_sha256, size, z_size, content, zstd) + VALUES (?,?,?,?,?,?) + ON CONFLICT (sha256) DO NOTHING + }, + blob.sha256[], + blob.zSize ? blob.zSha256[] : null, + blob.size, + blob.zSize, + blob.content, + blob.zstd.to!int + ); + } + + void addSnapshotChunk(SnapshotChunk snapshotChunk) + { + sql( + q{ + INSERT INTO snapshot_chunks (snapshot_id, chunk_index, offset, size, sha256) + VALUES(?,?,?,?,?) + }, + snapshotChunk.snapshotId, + snapshotChunk.chunkIndex, + snapshotChunk.offset, + snapshotChunk.size, + snapshotChunk.sha256[] + ); + } + + // struct ChunkInput + // { + // long index; + // long offset; + // long size; + // ubyte[32] sha256; + // const(ubyte)[] content; + // } + + // long saveSnapshotWithChunks( + // string filePath, string label, long sourceLength, + // long algoMin, long algoNormal, long algoMax, + // long maskS, long maskL, + // const ChunkInput[] chunks + // ) + // { + // beginImmediate(); + + // bool ok; + + // scope (exit) + // { + // if (!ok) + // rollback(); + // } + // scope (success) + // { + // commit(); + // } + + // const snapId = insertSnapshotMeta( + // filePath, label, sourceLength, + // algoMin, algoNormal, algoMax, + // maskS, maskL, SnapshotStatus.pending + // ); + + // foreach (c; chunks) + // { + // insertBlobIfMissing(c.sha256, c.size, c.content); + // insertSnapshotChunk(snapId, c.index, c.offset, c.size, c.sha256); + // } + + // ok = true; + + // return snapId; + // } + + + + + + + + + + + + + + // // --- чтение --- + + Snapshot[] getSnapshots(string filePath) + { + auto queryResult = sql( + q{ + SELECT id, file_path, file_sha256, label, created_utc, source_length, + algo_min, algo_normal, algo_max, mask_s, mask_l, status + FROM snapshots WHERE file_path = ? + }, filePath + ); + + Snapshot[] snapshots; + // bool found = false; + foreach (row; queryResult) + { + Snapshot snapshot; + + snapshot.id = row["id"].to!long; + snapshot.filePath = row["file_path"].to!string; + snapshot.fileSha256 = cast(ubyte[]) row["file_sha256"].dup; + snapshot.label = row["label"].to!string; + snapshot.createdUtc = row["created_utc"].to!string; + snapshot.sourceLength = row["source_length"].to!long; + snapshot.algoMin = row["algo_min"].to!long; + snapshot.algoNormal = row["algo_normal"].to!long; + snapshot.algoMax = row["algo_max"].to!long; + snapshot.maskS = row["mask_s"].to!long; + snapshot.maskL = row["mask_l"].to!long; + snapshot.status = cast(SnapshotStatus)row["status"].to!int; + // found = true; + snapshots ~= snapshot; + } + // enforce(found, "getSnapshot: not found"); + return snapshots; + } + + SnapshotDataChunk[] getChunks(long snapshotId) { + auto queryResult = sql( + q{ + SELECT sc.chunk_index, sc.offset, sc.size, + b.content, b.zstd, b.z_size, b.sha256, b.z_sha256 + FROM snapshot_chunks sc + JOIN blobs b ON b.sha256 = sc.sha256 + WHERE sc.snapshot_id = ? + ORDER BY sc.chunk_index + }, snapshotId + ); + + SnapshotDataChunk[] sdchs; + + foreach (row; queryResult) + { + SnapshotDataChunk sdch; + + sdch.chunkIndex = row["chunk_index"].to!long; + sdch.offset = row["offset"].to!long; + sdch.size = row["size"].to!long; + sdch.content = cast(ubyte[]) row["content"].dup; + sdch.zstd = cast(bool) row["zstd"].to!int; + sdch.zSize = row["z_size"].to!long; + sdch.sha256 = cast(ubyte[]) row["sha256"].dup; + sdch.zSha256 = cast(ubyte[]) row["z_sha256"].dup; + + sdchs ~= sdch; + } + + return sdchs; + } } diff --git a/source/cdcdb/db/package.d b/source/cdcdb/db/package.d index a15624b..5e2232c 100644 --- a/source/cdcdb/db/package.d +++ b/source/cdcdb/db/package.d @@ -1,3 +1,4 @@ module cdcdb.db; public import cdcdb.db.dblite; +public import cdcdb.db.types; diff --git a/source/cdcdb/db/scheme.d b/source/cdcdb/db/scheme.d index adabd60..adfcb88 100644 --- a/source/cdcdb/db/scheme.d +++ b/source/cdcdb/db/scheme.d @@ -10,6 +10,8 @@ auto _scheme = [ -- Путь к исходному файлу, для удобства навигации/поиска. file_path TEXT, + file_sha256 BLOB NOT NULL CHECK (length(file_sha256) = 32), + -- Произвольная метка/название снимка (для человека). label TEXT, @@ -31,9 +33,10 @@ auto _scheme = [ mask_l INTEGER NOT NULL, -- Состояние снимка: - -- "pending" - метаданные созданы, состав не полностью загружен; - -- "ready" - все чанки привязаны, снимок готов к использованию. - status TEXT NOT NULL DEFAULT "pending" CHECK (status IN ("pending","ready")) + -- 0 - "pending" - метаданные созданы, состав не полностью загружен; + -- 1 - "ready" - все чанки привязаны, снимок готов к использованию. + status INTEGER NOT NULL DEFAULT 0 + CHECK (typeof(status) = "integer" AND status IN (0,1)) ) }, q{ @@ -41,29 +44,45 @@ auto _scheme = [ -- Уникальные куски содержимого (дедупликация по sha256) -- ------------------------------------------------------------ CREATE TABLE IF NOT EXISTS blobs ( - -- Хэш содержимого чанка. Обеспечивает уникальность контента. - -- Храним как BLOB(32) (сырые 32 байта SHA-256), а не hex-строку. + -- Хэш содержимого чанка. Храним как BLOB(32) (сырые 32 байта SHA-256). sha256 BLOB PRIMARY KEY CHECK (length(sha256) = 32), - -- Размер чанка в байтах. Должен совпадать с длиной content. + -- Хэш сжатого содержимого (если zstd=1). Может быть NULL при zstd=0. + z_sha256 BLOB, + + -- Размер чанка в байтах (до сжатия). size INTEGER NOT NULL, - -- Сырые байты чанка. + -- Размер сжатого чанка (в байтах). Можно держать NOT NULL и заполнять =size при zstd=0, + -- либо сделать NULL при zstd=0 (см. CHECK ниже допускает NULL). + z_size INTEGER, + + -- Сырые байты: при zstd=1 здесь лежит сжатый блок, иначе - исходные байты. content BLOB NOT NULL, - -- Момент, когда этот контент впервые появился в базе (UTC). - created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP), - - -- Последний раз, когда на контент сослались (для аналитики/GC). + -- Таймштампы + created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP), last_seen_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP), - -- Счётчик ссылок: сколько строк в snapshot_chunks ссылаются на этот sha256. - -- Используется для безопасного удаления неиспользуемых blob-ов. + -- Счётчик ссылок из snapshot_chunks refcount INTEGER NOT NULL DEFAULT 0, + -- Флаг сжатия: 0 - без сжатия; 1 - zstd + zstd INTEGER NOT NULL DEFAULT 0 + CHECK (typeof(zstd) = "integer" AND zstd IN (0,1)), + -- Дополнительные гарантии целостности: CHECK (refcount >= 0), - CHECK (size = length(content)) + + -- Если zstd=1, длина content должна равняться z_size; + -- если zstd=0 - длина content должна равняться size. + CHECK ( + (zstd = 1 AND z_size IS NOT NULL AND length(content) = z_size) + OR (zstd = 0 AND length(content) = size ) + ), + + -- Согласованность z_sha256 (если задан) + CHECK (z_sha256 IS NULL OR length(z_sha256) = 32) ) }, q{ @@ -106,6 +125,10 @@ auto _scheme = [ ON DELETE RESTRICT ) }, + q{ + CREATE INDEX IF NOT EXISTS idx_snapshots_path_sha + ON snapshots(file_path, file_sha256) + }, q{ -- Быстрый выбор всех чанков конкретного снимка (частый запрос). CREATE INDEX IF NOT EXISTS idx_snapshot_chunks_snapshot @@ -172,15 +195,15 @@ auto _scheme = [ END }, q{ - -- Автоматическая смена статуса снимка на "ready", + -- Автоматическая смена статуса снимка на 1 - "ready", -- когда сумма размеров его чанков стала равна source_length. -- Примечание: простая эвристика; если потом удалишь/поменяешь чанки, - -- триггер ниже вернёт статус обратно на "pending". + -- триггер ниже вернёт статус обратно на 0 - "pending". CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_ready AFTER INSERT ON snapshot_chunks BEGIN UPDATE snapshots - SET status = "ready" + SET status = 1 WHERE id = NEW.snapshot_id AND (SELECT COALESCE(SUM(size),0) FROM snapshot_chunks @@ -189,13 +212,13 @@ auto _scheme = [ END }, q{ - -- При удалении любого чанка снимок снова помечается как "pending". - -- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт "ready". + -- При удалении любого чанка снимок снова помечается как 0 - "pending". + -- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт 1 - "ready". CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_pending AFTER DELETE ON snapshot_chunks BEGIN UPDATE snapshots - SET status = "pending" + SET status = 0 WHERE id = OLD.snapshot_id; END } diff --git a/source/cdcdb/db/scheme.md b/source/cdcdb/db/scheme.md index 5ba0d96..70d2776 100644 --- a/source/cdcdb/db/scheme.md +++ b/source/cdcdb/db/scheme.md @@ -149,3 +149,49 @@ sequenceDiagram APP-->>APP: Фиксирует ошибку (несоответствие сумм) end ``` + +## Схема записи в БД + +```mermaid +sequenceDiagram + autonumber + participant APP as Приложение + participant DB as SQLite + participant CH as Разбиение (FastCDC) + participant HS as SHA-256 + + Note over APP,DB: Подготовка к записи + APP->>DB: PRAGMA foreign_keys=ON + APP->>DB: BEGIN IMMEDIATE + + Note over APP,DB: Метаданные снимка + APP->>DB: INSERT INTO snapshots(..., status='pending') + DB-->>APP: snap_id := last_insert_rowid() + + Note over APP,CH: Поток файла → чанки (min/normal/max, mask_s/mask_l) + loop Для каждого чанка по порядку + CH-->>APP: {chunk_index, offset, size, bytes} + + Note over APP,HS: Хеширование + APP->>HS: SHA-256(bytes) + HS-->>APP: sha256 (32 байта) + + Note over APP,DB: Дедупликация содержимого + APP->>DB: INSERT INTO blobs(sha256,size,content) ON CONFLICT DO NOTHING + DB-->>APP: OK (новая строка или уже была) + + Note over APP,DB: Привязка к снимку + APP->>DB: INSERT INTO snapshot_chunks(snapshot_id,chunk_index,offset,size,sha256) + DB-->>APP: OK (триггер ++refcount, last_seen_utc=now) + end + + Note over APP,DB: Валидация и финал + APP->>DB: SELECT SUM(size) FROM snapshot_chunks WHERE snapshot_id = snap_id + DB-->>APP: total_size + alt total_size == snapshots.source_length + Note over DB: триггер mark_ready ставит status='ready' + APP->>DB: COMMIT + else несовпадение / ошибка + APP->>DB: ROLLBACK + end +``` diff --git a/source/cdcdb/db/types.d b/source/cdcdb/db/types.d new file mode 100644 index 0000000..3ef0dcb --- /dev/null +++ b/source/cdcdb/db/types.d @@ -0,0 +1,56 @@ +module cdcdb.db.types; + +enum SnapshotStatus : int +{ + pending = 0, + ready = 1 +} + +struct Snapshot +{ + long id; + string filePath; + ubyte[32] fileSha256; + string label; + string createdUtc; + long sourceLength; + long algoMin; + long algoNormal; + long algoMax; + long maskS; + long maskL; + SnapshotStatus status; +} + +struct Blob +{ + ubyte[32] sha256; // BLOB(32) + ubyte[32] zSha256; // BLOB(32) + long size; + long zSize; + ubyte[] content; // BLOB + string createdUtc; + string lastSeenUtc; + long refcount; + bool zstd; +} + +struct SnapshotChunk +{ + long snapshotId; + long chunkIndex; + long offset; + long size; + ubyte[32] sha256; // BLOB(32) +} + +struct SnapshotDataChunk { + long chunkIndex; + long offset; + long size; + ubyte[] content; + bool zstd; + long zSize; + ubyte[32] sha256; + ubyte[32] zSha256; +} diff --git a/test/app.d b/test/app.d index 5383fda..1597dba 100644 --- a/test/app.d +++ b/test/app.d @@ -6,6 +6,6 @@ import std.file : read; void main() { - auto cas = new CAS("/tmp/base.db"); + auto cas = new CAS("/tmp/base.db", true); cas.saveSnapshot("/tmp/text", cast(ubyte[]) read("/tmp/text")); }