Compare commits

..

2 commits

7 changed files with 436 additions and 26 deletions

View file

@ -6,40 +6,116 @@ import cdcdb.cdc.core;
import std.digest.sha : SHA256, digest;
import std.format : format;
import zstd;
import std.exception : enforce;
import std.stdio : writeln;
import std.conv : to;
import std.file : write;
// CAS-хранилище (Content-Addressable Storage) со снапшотами
final class CAS
{
private:
DBLite _db;
bool _zstd;
public:
this(string database)
this(string database, bool zstd = false)
{
_db = new DBLite(database);
_zstd = zstd;
}
size_t saveSnapshot(string filePath, const(ubyte)[] data)
{
ubyte[32] hashSource = digest!SHA256(data);
// Сделать запрос в БД по filePath и сверить хеш файлов
// writeln(hashSource.length);
// Параметры для CDC вынести в отдельные настройки (продумать)
auto cdc = new CDC(100, 200, 500, 0xFF, 0x0F);
auto cdc = new CDC(300, 700, 1000, 0xFF, 0x0F);
// Разбить на фрагменты
auto chunks = cdc.split(data);
import std.stdio : writeln;
Snapshot snapshot;
snapshot.filePath = filePath;
snapshot.fileSha256 = hashSource;
snapshot.label = "Файл для теста";
snapshot.sourceLength = data.length;
_db.beginImmediate();
auto idSnapshot = _db.addSnapshot(snapshot);
SnapshotChunk snapshotChunk;
Blob blob;
blob.zstd = _zstd;
// Записать фрагменты в БД
foreach (chunk; chunks)
{
writeln(format("%(%02x%)", chunk.sha256));
blob.sha256 = chunk.sha256;
blob.size = chunk.size;
auto content = data[chunk.offset .. chunk.offset + chunk.size];
if (_zstd) {
ubyte[] zBytes = compress(content, 22);
size_t zSize = zBytes.length;
ubyte[32] zHash = digest!SHA256(zBytes);
blob.zSize = zSize;
blob.zSha256 = zHash;
blob.content = zBytes;
} else {
blob.content = content.dup;
}
_db.addBlob(blob);
snapshotChunk.snapshotId = idSnapshot;
snapshotChunk.chunkIndex = chunk.index;
snapshotChunk.offset = chunk.offset;
snapshotChunk.size = chunk.size;
snapshotChunk.sha256 = chunk.sha256;
_db.addSnapshotChunk(snapshotChunk);
}
_db.commit();
// Записать манифест в БД
// Вернуть ID манифеста
return 0;
}
void restoreSnapshot()
{
string restoreFile = "/tmp/restore.d";
foreach (Snapshot snapshot; _db.getSnapshots("/tmp/text")) {
auto dataChunks = _db.getChunks(snapshot.id);
ubyte[] content;
foreach (SnapshotDataChunk chunk; dataChunks) {
ubyte[] bytes;
if (chunk.zstd) {
enforce(chunk.zSize == chunk.content.length, "Размер сжатого фрагмента не соответствует ожидаемому");
bytes = cast(ubyte[]) uncompress(chunk.content);
} else {
bytes = chunk.content;
}
enforce(chunk.size == bytes.length, "Оригинальный размер не соответствует ожидаемому");
content ~= bytes;
}
enforce(snapshot.fileSha256 == digest!SHA256(content), "Хеш-сумма файла не совпадает");
write(snapshot.filePath ~ snapshot.id.to!string, content);
}
}
}

View file

@ -2,6 +2,10 @@ module cdcdb.db.dblite;
import arsd.sqlite;
import std.file : exists, isFile;
import std.exception : enforce;
import std.conv : to;
import cdcdb.db.types;
final class DBLite : Sqlite
{
@ -45,5 +49,209 @@ public:
query("ROLLBACK");
}
// findFile()
// Snapshot getSnapshot(string filePath, immutable ubyte[32] sha256)
// {
// auto queryResult = sql(
// q{
// SELECT * FROM snapshots
// WHERE file_path = ? AND file_sha256 = ?
// }, filePath, sha256
// );
// }
long addSnapshot(Snapshot snapshot)
{
auto queryResult = sql(
q{
INSERT INTO snapshots(
file_path,
file_sha256,
label,
source_length,
algo_min,
algo_normal,
algo_max,
mask_s,
mask_l,
status
) VALUES (?,?,?,?,?,?,?,?,?,?)
RETURNING id
},
snapshot.filePath,
snapshot.fileSha256[],
snapshot.label,
snapshot.sourceLength,
snapshot.algoMin,
snapshot.algoNormal,
snapshot.algoMax,
snapshot.maskS,
snapshot.maskL,
snapshot.status.to!int
);
if (!queryResult.empty())
return queryResult.front()["id"].to!long;
return 0;
}
void addBlob(Blob blob)
{
sql(
q{
INSERT INTO blobs (sha256, z_sha256, size, z_size, content, zstd)
VALUES (?,?,?,?,?,?)
ON CONFLICT (sha256) DO NOTHING
},
blob.sha256[],
blob.zSize ? blob.zSha256[] : null,
blob.size,
blob.zSize,
blob.content,
blob.zstd.to!int
);
}
void addSnapshotChunk(SnapshotChunk snapshotChunk)
{
sql(
q{
INSERT INTO snapshot_chunks (snapshot_id, chunk_index, offset, size, sha256)
VALUES(?,?,?,?,?)
},
snapshotChunk.snapshotId,
snapshotChunk.chunkIndex,
snapshotChunk.offset,
snapshotChunk.size,
snapshotChunk.sha256[]
);
}
// struct ChunkInput
// {
// long index;
// long offset;
// long size;
// ubyte[32] sha256;
// const(ubyte)[] content;
// }
// long saveSnapshotWithChunks(
// string filePath, string label, long sourceLength,
// long algoMin, long algoNormal, long algoMax,
// long maskS, long maskL,
// const ChunkInput[] chunks
// )
// {
// beginImmediate();
// bool ok;
// scope (exit)
// {
// if (!ok)
// rollback();
// }
// scope (success)
// {
// commit();
// }
// const snapId = insertSnapshotMeta(
// filePath, label, sourceLength,
// algoMin, algoNormal, algoMax,
// maskS, maskL, SnapshotStatus.pending
// );
// foreach (c; chunks)
// {
// insertBlobIfMissing(c.sha256, c.size, c.content);
// insertSnapshotChunk(snapId, c.index, c.offset, c.size, c.sha256);
// }
// ok = true;
// return snapId;
// }
// // --- чтение ---
Snapshot[] getSnapshots(string filePath)
{
auto queryResult = sql(
q{
SELECT id, file_path, file_sha256, label, created_utc, source_length,
algo_min, algo_normal, algo_max, mask_s, mask_l, status
FROM snapshots WHERE file_path = ?
}, filePath
);
Snapshot[] snapshots;
// bool found = false;
foreach (row; queryResult)
{
Snapshot snapshot;
snapshot.id = row["id"].to!long;
snapshot.filePath = row["file_path"].to!string;
snapshot.fileSha256 = cast(ubyte[]) row["file_sha256"].dup;
snapshot.label = row["label"].to!string;
snapshot.createdUtc = row["created_utc"].to!string;
snapshot.sourceLength = row["source_length"].to!long;
snapshot.algoMin = row["algo_min"].to!long;
snapshot.algoNormal = row["algo_normal"].to!long;
snapshot.algoMax = row["algo_max"].to!long;
snapshot.maskS = row["mask_s"].to!long;
snapshot.maskL = row["mask_l"].to!long;
snapshot.status = cast(SnapshotStatus)row["status"].to!int;
// found = true;
snapshots ~= snapshot;
}
// enforce(found, "getSnapshot: not found");
return snapshots;
}
SnapshotDataChunk[] getChunks(long snapshotId) {
auto queryResult = sql(
q{
SELECT sc.chunk_index, sc.offset, sc.size,
b.content, b.zstd, b.z_size, b.sha256, b.z_sha256
FROM snapshot_chunks sc
JOIN blobs b ON b.sha256 = sc.sha256
WHERE sc.snapshot_id = ?
ORDER BY sc.chunk_index
}, snapshotId
);
SnapshotDataChunk[] sdchs;
foreach (row; queryResult)
{
SnapshotDataChunk sdch;
sdch.chunkIndex = row["chunk_index"].to!long;
sdch.offset = row["offset"].to!long;
sdch.size = row["size"].to!long;
sdch.content = cast(ubyte[]) row["content"].dup;
sdch.zstd = cast(bool) row["zstd"].to!int;
sdch.zSize = row["z_size"].to!long;
sdch.sha256 = cast(ubyte[]) row["sha256"].dup;
sdch.zSha256 = cast(ubyte[]) row["z_sha256"].dup;
sdchs ~= sdch;
}
return sdchs;
}
}

View file

@ -1,3 +1,4 @@
module cdcdb.db;
public import cdcdb.db.dblite;
public import cdcdb.db.types;

View file

@ -10,6 +10,8 @@ auto _scheme = [
-- Путь к исходному файлу, для удобства навигации/поиска.
file_path TEXT,
file_sha256 BLOB NOT NULL CHECK (length(file_sha256) = 32),
-- Произвольная метка/название снимка (для человека).
label TEXT,
@ -31,9 +33,10 @@ auto _scheme = [
mask_l INTEGER NOT NULL,
-- Состояние снимка:
-- "pending" - метаданные созданы, состав не полностью загружен;
-- "ready" - все чанки привязаны, снимок готов к использованию.
status TEXT NOT NULL DEFAULT "pending" CHECK (status IN ("pending","ready"))
-- 0 - "pending" - метаданные созданы, состав не полностью загружен;
-- 1 - "ready" - все чанки привязаны, снимок готов к использованию.
status INTEGER NOT NULL DEFAULT 0
CHECK (typeof(status) = "integer" AND status IN (0,1))
)
},
q{
@ -41,29 +44,45 @@ auto _scheme = [
-- Уникальные куски содержимого (дедупликация по sha256)
-- ------------------------------------------------------------
CREATE TABLE IF NOT EXISTS blobs (
-- Хэш содержимого чанка. Обеспечивает уникальность контента.
-- Храним как BLOB(32) (сырые 32 байта SHA-256), а не hex-строку.
-- Хэш содержимого чанка. Храним как BLOB(32) (сырые 32 байта SHA-256).
sha256 BLOB PRIMARY KEY CHECK (length(sha256) = 32),
-- Размер чанка в байтах. Должен совпадать с длиной content.
-- Хэш сжатого содержимого (если zstd=1). Может быть NULL при zstd=0.
z_sha256 BLOB,
-- Размер чанка в байтах (до сжатия).
size INTEGER NOT NULL,
-- Сырые байты чанка.
-- Размер сжатого чанка (в байтах). Можно держать NOT NULL и заполнять =size при zstd=0,
-- либо сделать NULL при zstd=0 (см. CHECK ниже допускает NULL).
z_size INTEGER,
-- Сырые байты: при zstd=1 здесь лежит сжатый блок, иначе - исходные байты.
content BLOB NOT NULL,
-- Момент, когда этот контент впервые появился в базе (UTC).
created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
-- Последний раз, когда на контент сослались (для аналитики/GC).
-- Таймштампы
created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
last_seen_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
-- Счётчик ссылок: сколько строк в snapshot_chunks ссылаются на этот sha256.
-- Используется для безопасного удаления неиспользуемых blob-ов.
-- Счётчик ссылок из snapshot_chunks
refcount INTEGER NOT NULL DEFAULT 0,
-- Флаг сжатия: 0 - без сжатия; 1 - zstd
zstd INTEGER NOT NULL DEFAULT 0
CHECK (typeof(zstd) = "integer" AND zstd IN (0,1)),
-- Дополнительные гарантии целостности:
CHECK (refcount >= 0),
CHECK (size = length(content))
-- Если zstd=1, длина content должна равняться z_size;
-- если zstd=0 - длина content должна равняться size.
CHECK (
(zstd = 1 AND z_size IS NOT NULL AND length(content) = z_size)
OR (zstd = 0 AND length(content) = size )
),
-- Согласованность z_sha256 (если задан)
CHECK (z_sha256 IS NULL OR length(z_sha256) = 32)
)
},
q{
@ -106,6 +125,10 @@ auto _scheme = [
ON DELETE RESTRICT
)
},
q{
CREATE INDEX IF NOT EXISTS idx_snapshots_path_sha
ON snapshots(file_path, file_sha256)
},
q{
-- Быстрый выбор всех чанков конкретного снимка (частый запрос).
CREATE INDEX IF NOT EXISTS idx_snapshot_chunks_snapshot
@ -172,15 +195,15 @@ auto _scheme = [
END
},
q{
-- Автоматическая смена статуса снимка на "ready",
-- Автоматическая смена статуса снимка на 1 - "ready",
-- когда сумма размеров его чанков стала равна source_length.
-- Примечание: простая эвристика; если потом удалишь/поменяешь чанки,
-- триггер ниже вернёт статус обратно на "pending".
-- триггер ниже вернёт статус обратно на 0 - "pending".
CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_ready
AFTER INSERT ON snapshot_chunks
BEGIN
UPDATE snapshots
SET status = "ready"
SET status = 1
WHERE id = NEW.snapshot_id
AND (SELECT COALESCE(SUM(size),0)
FROM snapshot_chunks
@ -189,13 +212,13 @@ auto _scheme = [
END
},
q{
-- При удалении любого чанка снимок снова помечается как "pending".
-- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт "ready".
-- При удалении любого чанка снимок снова помечается как 0 - "pending".
-- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт 1 - "ready".
CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_pending
AFTER DELETE ON snapshot_chunks
BEGIN
UPDATE snapshots
SET status = "pending"
SET status = 0
WHERE id = OLD.snapshot_id;
END
}

View file

@ -149,3 +149,49 @@ sequenceDiagram
APP-->>APP: Фиксирует ошибку (несоответствие сумм)
end
```
## Схема записи в БД
```mermaid
sequenceDiagram
autonumber
participant APP as Приложение
participant DB as SQLite
participant CH as Разбиение (FastCDC)
participant HS as SHA-256
Note over APP,DB: Подготовка к записи
APP->>DB: PRAGMA foreign_keys=ON
APP->>DB: BEGIN IMMEDIATE
Note over APP,DB: Метаданные снимка
APP->>DB: INSERT INTO snapshots(..., status='pending')
DB-->>APP: snap_id := last_insert_rowid()
Note over APP,CH: Поток файла → чанки (min/normal/max, mask_s/mask_l)
loop Для каждого чанка по порядку
CH-->>APP: {chunk_index, offset, size, bytes}
Note over APP,HS: Хеширование
APP->>HS: SHA-256(bytes)
HS-->>APP: sha256 (32 байта)
Note over APP,DB: Дедупликация содержимого
APP->>DB: INSERT INTO blobs(sha256,size,content) ON CONFLICT DO NOTHING
DB-->>APP: OK (новая строка или уже была)
Note over APP,DB: Привязка к снимку
APP->>DB: INSERT INTO snapshot_chunks(snapshot_id,chunk_index,offset,size,sha256)
DB-->>APP: OK (триггер ++refcount, last_seen_utc=now)
end
Note over APP,DB: Валидация и финал
APP->>DB: SELECT SUM(size) FROM snapshot_chunks WHERE snapshot_id = snap_id
DB-->>APP: total_size
alt total_size == snapshots.source_length
Note over DB: триггер mark_ready ставит status='ready'
APP->>DB: COMMIT
else несовпадение / ошибка
APP->>DB: ROLLBACK
end
```

56
source/cdcdb/db/types.d Normal file
View file

@ -0,0 +1,56 @@
module cdcdb.db.types;
enum SnapshotStatus : int
{
pending = 0,
ready = 1
}
struct Snapshot
{
long id;
string filePath;
ubyte[32] fileSha256;
string label;
string createdUtc;
long sourceLength;
long algoMin;
long algoNormal;
long algoMax;
long maskS;
long maskL;
SnapshotStatus status;
}
struct Blob
{
ubyte[32] sha256; // BLOB(32)
ubyte[32] zSha256; // BLOB(32)
long size;
long zSize;
ubyte[] content; // BLOB
string createdUtc;
string lastSeenUtc;
long refcount;
bool zstd;
}
struct SnapshotChunk
{
long snapshotId;
long chunkIndex;
long offset;
long size;
ubyte[32] sha256; // BLOB(32)
}
struct SnapshotDataChunk {
long chunkIndex;
long offset;
long size;
ubyte[] content;
bool zstd;
long zSize;
ubyte[32] sha256;
ubyte[32] zSha256;
}

View file

@ -6,6 +6,6 @@ import std.file : read;
void main()
{
auto cas = new CAS("/tmp/base.db");
auto cas = new CAS("/tmp/base.db", true);
cas.saveSnapshot("/tmp/text", cast(ubyte[]) read("/tmp/text"));
}