Compare commits
2 commits
18bcf3742d
...
024f37cb3b
Author | SHA1 | Date | |
---|---|---|---|
024f37cb3b | |||
8631c65e39 |
7 changed files with 436 additions and 26 deletions
|
@ -6,40 +6,116 @@ import cdcdb.cdc.core;
|
|||
import std.digest.sha : SHA256, digest;
|
||||
import std.format : format;
|
||||
|
||||
import zstd;
|
||||
|
||||
import std.exception : enforce;
|
||||
import std.stdio : writeln;
|
||||
import std.conv : to;
|
||||
|
||||
import std.file : write;
|
||||
|
||||
// CAS-хранилище (Content-Addressable Storage) со снапшотами
|
||||
final class CAS
|
||||
{
|
||||
private:
|
||||
DBLite _db;
|
||||
bool _zstd;
|
||||
public:
|
||||
this(string database)
|
||||
this(string database, bool zstd = false)
|
||||
{
|
||||
_db = new DBLite(database);
|
||||
_zstd = zstd;
|
||||
}
|
||||
|
||||
size_t saveSnapshot(string filePath, const(ubyte)[] data)
|
||||
{
|
||||
ubyte[32] hashSource = digest!SHA256(data);
|
||||
// Сделать запрос в БД по filePath и сверить хеш файлов
|
||||
|
||||
// writeln(hashSource.length);
|
||||
|
||||
|
||||
|
||||
// Параметры для CDC вынести в отдельные настройки (продумать)
|
||||
auto cdc = new CDC(100, 200, 500, 0xFF, 0x0F);
|
||||
auto cdc = new CDC(300, 700, 1000, 0xFF, 0x0F);
|
||||
// Разбить на фрагменты
|
||||
auto chunks = cdc.split(data);
|
||||
|
||||
import std.stdio : writeln;
|
||||
Snapshot snapshot;
|
||||
snapshot.filePath = filePath;
|
||||
snapshot.fileSha256 = hashSource;
|
||||
snapshot.label = "Файл для теста";
|
||||
snapshot.sourceLength = data.length;
|
||||
|
||||
_db.beginImmediate();
|
||||
|
||||
auto idSnapshot = _db.addSnapshot(snapshot);
|
||||
|
||||
SnapshotChunk snapshotChunk;
|
||||
Blob blob;
|
||||
|
||||
blob.zstd = _zstd;
|
||||
|
||||
// Записать фрагменты в БД
|
||||
foreach (chunk; chunks)
|
||||
{
|
||||
writeln(format("%(%02x%)", chunk.sha256));
|
||||
blob.sha256 = chunk.sha256;
|
||||
blob.size = chunk.size;
|
||||
|
||||
auto content = data[chunk.offset .. chunk.offset + chunk.size];
|
||||
|
||||
if (_zstd) {
|
||||
ubyte[] zBytes = compress(content, 22);
|
||||
size_t zSize = zBytes.length;
|
||||
ubyte[32] zHash = digest!SHA256(zBytes);
|
||||
|
||||
blob.zSize = zSize;
|
||||
blob.zSha256 = zHash;
|
||||
blob.content = zBytes;
|
||||
} else {
|
||||
blob.content = content.dup;
|
||||
}
|
||||
|
||||
_db.addBlob(blob);
|
||||
|
||||
snapshotChunk.snapshotId = idSnapshot;
|
||||
snapshotChunk.chunkIndex = chunk.index;
|
||||
snapshotChunk.offset = chunk.offset;
|
||||
snapshotChunk.size = chunk.size;
|
||||
snapshotChunk.sha256 = chunk.sha256;
|
||||
|
||||
_db.addSnapshotChunk(snapshotChunk);
|
||||
}
|
||||
_db.commit();
|
||||
// Записать манифест в БД
|
||||
// Вернуть ID манифеста
|
||||
return 0;
|
||||
}
|
||||
|
||||
void restoreSnapshot()
|
||||
{
|
||||
string restoreFile = "/tmp/restore.d";
|
||||
|
||||
foreach (Snapshot snapshot; _db.getSnapshots("/tmp/text")) {
|
||||
auto dataChunks = _db.getChunks(snapshot.id);
|
||||
ubyte[] content;
|
||||
|
||||
foreach (SnapshotDataChunk chunk; dataChunks) {
|
||||
ubyte[] bytes;
|
||||
if (chunk.zstd) {
|
||||
enforce(chunk.zSize == chunk.content.length, "Размер сжатого фрагмента не соответствует ожидаемому");
|
||||
bytes = cast(ubyte[]) uncompress(chunk.content);
|
||||
} else {
|
||||
bytes = chunk.content;
|
||||
}
|
||||
enforce(chunk.size == bytes.length, "Оригинальный размер не соответствует ожидаемому");
|
||||
|
||||
content ~= bytes;
|
||||
}
|
||||
|
||||
enforce(snapshot.fileSha256 == digest!SHA256(content), "Хеш-сумма файла не совпадает");
|
||||
|
||||
write(snapshot.filePath ~ snapshot.id.to!string, content);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,10 @@ module cdcdb.db.dblite;
|
|||
|
||||
import arsd.sqlite;
|
||||
import std.file : exists, isFile;
|
||||
import std.exception : enforce;
|
||||
import std.conv : to;
|
||||
|
||||
import cdcdb.db.types;
|
||||
|
||||
final class DBLite : Sqlite
|
||||
{
|
||||
|
@ -45,5 +49,209 @@ public:
|
|||
query("ROLLBACK");
|
||||
}
|
||||
|
||||
// findFile()
|
||||
// Snapshot getSnapshot(string filePath, immutable ubyte[32] sha256)
|
||||
// {
|
||||
// auto queryResult = sql(
|
||||
// q{
|
||||
// SELECT * FROM snapshots
|
||||
// WHERE file_path = ? AND file_sha256 = ?
|
||||
// }, filePath, sha256
|
||||
// );
|
||||
// }
|
||||
|
||||
long addSnapshot(Snapshot snapshot)
|
||||
{
|
||||
auto queryResult = sql(
|
||||
q{
|
||||
INSERT INTO snapshots(
|
||||
file_path,
|
||||
file_sha256,
|
||||
label,
|
||||
source_length,
|
||||
algo_min,
|
||||
algo_normal,
|
||||
algo_max,
|
||||
mask_s,
|
||||
mask_l,
|
||||
status
|
||||
) VALUES (?,?,?,?,?,?,?,?,?,?)
|
||||
RETURNING id
|
||||
},
|
||||
snapshot.filePath,
|
||||
snapshot.fileSha256[],
|
||||
snapshot.label,
|
||||
snapshot.sourceLength,
|
||||
snapshot.algoMin,
|
||||
snapshot.algoNormal,
|
||||
snapshot.algoMax,
|
||||
snapshot.maskS,
|
||||
snapshot.maskL,
|
||||
snapshot.status.to!int
|
||||
);
|
||||
|
||||
if (!queryResult.empty())
|
||||
return queryResult.front()["id"].to!long;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void addBlob(Blob blob)
|
||||
{
|
||||
sql(
|
||||
q{
|
||||
INSERT INTO blobs (sha256, z_sha256, size, z_size, content, zstd)
|
||||
VALUES (?,?,?,?,?,?)
|
||||
ON CONFLICT (sha256) DO NOTHING
|
||||
},
|
||||
blob.sha256[],
|
||||
blob.zSize ? blob.zSha256[] : null,
|
||||
blob.size,
|
||||
blob.zSize,
|
||||
blob.content,
|
||||
blob.zstd.to!int
|
||||
);
|
||||
}
|
||||
|
||||
void addSnapshotChunk(SnapshotChunk snapshotChunk)
|
||||
{
|
||||
sql(
|
||||
q{
|
||||
INSERT INTO snapshot_chunks (snapshot_id, chunk_index, offset, size, sha256)
|
||||
VALUES(?,?,?,?,?)
|
||||
},
|
||||
snapshotChunk.snapshotId,
|
||||
snapshotChunk.chunkIndex,
|
||||
snapshotChunk.offset,
|
||||
snapshotChunk.size,
|
||||
snapshotChunk.sha256[]
|
||||
);
|
||||
}
|
||||
|
||||
// struct ChunkInput
|
||||
// {
|
||||
// long index;
|
||||
// long offset;
|
||||
// long size;
|
||||
// ubyte[32] sha256;
|
||||
// const(ubyte)[] content;
|
||||
// }
|
||||
|
||||
// long saveSnapshotWithChunks(
|
||||
// string filePath, string label, long sourceLength,
|
||||
// long algoMin, long algoNormal, long algoMax,
|
||||
// long maskS, long maskL,
|
||||
// const ChunkInput[] chunks
|
||||
// )
|
||||
// {
|
||||
// beginImmediate();
|
||||
|
||||
// bool ok;
|
||||
|
||||
// scope (exit)
|
||||
// {
|
||||
// if (!ok)
|
||||
// rollback();
|
||||
// }
|
||||
// scope (success)
|
||||
// {
|
||||
// commit();
|
||||
// }
|
||||
|
||||
// const snapId = insertSnapshotMeta(
|
||||
// filePath, label, sourceLength,
|
||||
// algoMin, algoNormal, algoMax,
|
||||
// maskS, maskL, SnapshotStatus.pending
|
||||
// );
|
||||
|
||||
// foreach (c; chunks)
|
||||
// {
|
||||
// insertBlobIfMissing(c.sha256, c.size, c.content);
|
||||
// insertSnapshotChunk(snapId, c.index, c.offset, c.size, c.sha256);
|
||||
// }
|
||||
|
||||
// ok = true;
|
||||
|
||||
// return snapId;
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// // --- чтение ---
|
||||
|
||||
Snapshot[] getSnapshots(string filePath)
|
||||
{
|
||||
auto queryResult = sql(
|
||||
q{
|
||||
SELECT id, file_path, file_sha256, label, created_utc, source_length,
|
||||
algo_min, algo_normal, algo_max, mask_s, mask_l, status
|
||||
FROM snapshots WHERE file_path = ?
|
||||
}, filePath
|
||||
);
|
||||
|
||||
Snapshot[] snapshots;
|
||||
// bool found = false;
|
||||
foreach (row; queryResult)
|
||||
{
|
||||
Snapshot snapshot;
|
||||
|
||||
snapshot.id = row["id"].to!long;
|
||||
snapshot.filePath = row["file_path"].to!string;
|
||||
snapshot.fileSha256 = cast(ubyte[]) row["file_sha256"].dup;
|
||||
snapshot.label = row["label"].to!string;
|
||||
snapshot.createdUtc = row["created_utc"].to!string;
|
||||
snapshot.sourceLength = row["source_length"].to!long;
|
||||
snapshot.algoMin = row["algo_min"].to!long;
|
||||
snapshot.algoNormal = row["algo_normal"].to!long;
|
||||
snapshot.algoMax = row["algo_max"].to!long;
|
||||
snapshot.maskS = row["mask_s"].to!long;
|
||||
snapshot.maskL = row["mask_l"].to!long;
|
||||
snapshot.status = cast(SnapshotStatus)row["status"].to!int;
|
||||
// found = true;
|
||||
snapshots ~= snapshot;
|
||||
}
|
||||
// enforce(found, "getSnapshot: not found");
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
SnapshotDataChunk[] getChunks(long snapshotId) {
|
||||
auto queryResult = sql(
|
||||
q{
|
||||
SELECT sc.chunk_index, sc.offset, sc.size,
|
||||
b.content, b.zstd, b.z_size, b.sha256, b.z_sha256
|
||||
FROM snapshot_chunks sc
|
||||
JOIN blobs b ON b.sha256 = sc.sha256
|
||||
WHERE sc.snapshot_id = ?
|
||||
ORDER BY sc.chunk_index
|
||||
}, snapshotId
|
||||
);
|
||||
|
||||
SnapshotDataChunk[] sdchs;
|
||||
|
||||
foreach (row; queryResult)
|
||||
{
|
||||
SnapshotDataChunk sdch;
|
||||
|
||||
sdch.chunkIndex = row["chunk_index"].to!long;
|
||||
sdch.offset = row["offset"].to!long;
|
||||
sdch.size = row["size"].to!long;
|
||||
sdch.content = cast(ubyte[]) row["content"].dup;
|
||||
sdch.zstd = cast(bool) row["zstd"].to!int;
|
||||
sdch.zSize = row["z_size"].to!long;
|
||||
sdch.sha256 = cast(ubyte[]) row["sha256"].dup;
|
||||
sdch.zSha256 = cast(ubyte[]) row["z_sha256"].dup;
|
||||
|
||||
sdchs ~= sdch;
|
||||
}
|
||||
|
||||
return sdchs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
module cdcdb.db;
|
||||
|
||||
public import cdcdb.db.dblite;
|
||||
public import cdcdb.db.types;
|
||||
|
|
|
@ -10,6 +10,8 @@ auto _scheme = [
|
|||
-- Путь к исходному файлу, для удобства навигации/поиска.
|
||||
file_path TEXT,
|
||||
|
||||
file_sha256 BLOB NOT NULL CHECK (length(file_sha256) = 32),
|
||||
|
||||
-- Произвольная метка/название снимка (для человека).
|
||||
label TEXT,
|
||||
|
||||
|
@ -31,9 +33,10 @@ auto _scheme = [
|
|||
mask_l INTEGER NOT NULL,
|
||||
|
||||
-- Состояние снимка:
|
||||
-- "pending" - метаданные созданы, состав не полностью загружен;
|
||||
-- "ready" - все чанки привязаны, снимок готов к использованию.
|
||||
status TEXT NOT NULL DEFAULT "pending" CHECK (status IN ("pending","ready"))
|
||||
-- 0 - "pending" - метаданные созданы, состав не полностью загружен;
|
||||
-- 1 - "ready" - все чанки привязаны, снимок готов к использованию.
|
||||
status INTEGER NOT NULL DEFAULT 0
|
||||
CHECK (typeof(status) = "integer" AND status IN (0,1))
|
||||
)
|
||||
},
|
||||
q{
|
||||
|
@ -41,29 +44,45 @@ auto _scheme = [
|
|||
-- Уникальные куски содержимого (дедупликация по sha256)
|
||||
-- ------------------------------------------------------------
|
||||
CREATE TABLE IF NOT EXISTS blobs (
|
||||
-- Хэш содержимого чанка. Обеспечивает уникальность контента.
|
||||
-- Храним как BLOB(32) (сырые 32 байта SHA-256), а не hex-строку.
|
||||
-- Хэш содержимого чанка. Храним как BLOB(32) (сырые 32 байта SHA-256).
|
||||
sha256 BLOB PRIMARY KEY CHECK (length(sha256) = 32),
|
||||
|
||||
-- Размер чанка в байтах. Должен совпадать с длиной content.
|
||||
-- Хэш сжатого содержимого (если zstd=1). Может быть NULL при zstd=0.
|
||||
z_sha256 BLOB,
|
||||
|
||||
-- Размер чанка в байтах (до сжатия).
|
||||
size INTEGER NOT NULL,
|
||||
|
||||
-- Сырые байты чанка.
|
||||
-- Размер сжатого чанка (в байтах). Можно держать NOT NULL и заполнять =size при zstd=0,
|
||||
-- либо сделать NULL при zstd=0 (см. CHECK ниже допускает NULL).
|
||||
z_size INTEGER,
|
||||
|
||||
-- Сырые байты: при zstd=1 здесь лежит сжатый блок, иначе - исходные байты.
|
||||
content BLOB NOT NULL,
|
||||
|
||||
-- Момент, когда этот контент впервые появился в базе (UTC).
|
||||
created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
|
||||
|
||||
-- Последний раз, когда на контент сослались (для аналитики/GC).
|
||||
-- Таймштампы
|
||||
created_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
|
||||
last_seen_utc TEXT NOT NULL DEFAULT (CURRENT_TIMESTAMP),
|
||||
|
||||
-- Счётчик ссылок: сколько строк в snapshot_chunks ссылаются на этот sha256.
|
||||
-- Используется для безопасного удаления неиспользуемых blob-ов.
|
||||
-- Счётчик ссылок из snapshot_chunks
|
||||
refcount INTEGER NOT NULL DEFAULT 0,
|
||||
|
||||
-- Флаг сжатия: 0 - без сжатия; 1 - zstd
|
||||
zstd INTEGER NOT NULL DEFAULT 0
|
||||
CHECK (typeof(zstd) = "integer" AND zstd IN (0,1)),
|
||||
|
||||
-- Дополнительные гарантии целостности:
|
||||
CHECK (refcount >= 0),
|
||||
CHECK (size = length(content))
|
||||
|
||||
-- Если zstd=1, длина content должна равняться z_size;
|
||||
-- если zstd=0 - длина content должна равняться size.
|
||||
CHECK (
|
||||
(zstd = 1 AND z_size IS NOT NULL AND length(content) = z_size)
|
||||
OR (zstd = 0 AND length(content) = size )
|
||||
),
|
||||
|
||||
-- Согласованность z_sha256 (если задан)
|
||||
CHECK (z_sha256 IS NULL OR length(z_sha256) = 32)
|
||||
)
|
||||
},
|
||||
q{
|
||||
|
@ -106,6 +125,10 @@ auto _scheme = [
|
|||
ON DELETE RESTRICT
|
||||
)
|
||||
},
|
||||
q{
|
||||
CREATE INDEX IF NOT EXISTS idx_snapshots_path_sha
|
||||
ON snapshots(file_path, file_sha256)
|
||||
},
|
||||
q{
|
||||
-- Быстрый выбор всех чанков конкретного снимка (частый запрос).
|
||||
CREATE INDEX IF NOT EXISTS idx_snapshot_chunks_snapshot
|
||||
|
@ -172,15 +195,15 @@ auto _scheme = [
|
|||
END
|
||||
},
|
||||
q{
|
||||
-- Автоматическая смена статуса снимка на "ready",
|
||||
-- Автоматическая смена статуса снимка на 1 - "ready",
|
||||
-- когда сумма размеров его чанков стала равна source_length.
|
||||
-- Примечание: простая эвристика; если потом удалишь/поменяешь чанки,
|
||||
-- триггер ниже вернёт статус обратно на "pending".
|
||||
-- триггер ниже вернёт статус обратно на 0 - "pending".
|
||||
CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_ready
|
||||
AFTER INSERT ON snapshot_chunks
|
||||
BEGIN
|
||||
UPDATE snapshots
|
||||
SET status = "ready"
|
||||
SET status = 1
|
||||
WHERE id = NEW.snapshot_id
|
||||
AND (SELECT COALESCE(SUM(size),0)
|
||||
FROM snapshot_chunks
|
||||
|
@ -189,13 +212,13 @@ auto _scheme = [
|
|||
END
|
||||
},
|
||||
q{
|
||||
-- При удалении любого чанка снимок снова помечается как "pending".
|
||||
-- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт "ready".
|
||||
-- При удалении любого чанка снимок снова помечается как 0 - "pending".
|
||||
-- Это простой безопасный фоллбэк; следующая вставка приравняет суммы и вернёт 1 - "ready".
|
||||
CREATE TRIGGER IF NOT EXISTS trg_snapshots_mark_pending
|
||||
AFTER DELETE ON snapshot_chunks
|
||||
BEGIN
|
||||
UPDATE snapshots
|
||||
SET status = "pending"
|
||||
SET status = 0
|
||||
WHERE id = OLD.snapshot_id;
|
||||
END
|
||||
}
|
||||
|
|
|
@ -149,3 +149,49 @@ sequenceDiagram
|
|||
APP-->>APP: Фиксирует ошибку (несоответствие сумм)
|
||||
end
|
||||
```
|
||||
|
||||
## Схема записи в БД
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant APP as Приложение
|
||||
participant DB as SQLite
|
||||
participant CH as Разбиение (FastCDC)
|
||||
participant HS as SHA-256
|
||||
|
||||
Note over APP,DB: Подготовка к записи
|
||||
APP->>DB: PRAGMA foreign_keys=ON
|
||||
APP->>DB: BEGIN IMMEDIATE
|
||||
|
||||
Note over APP,DB: Метаданные снимка
|
||||
APP->>DB: INSERT INTO snapshots(..., status='pending')
|
||||
DB-->>APP: snap_id := last_insert_rowid()
|
||||
|
||||
Note over APP,CH: Поток файла → чанки (min/normal/max, mask_s/mask_l)
|
||||
loop Для каждого чанка по порядку
|
||||
CH-->>APP: {chunk_index, offset, size, bytes}
|
||||
|
||||
Note over APP,HS: Хеширование
|
||||
APP->>HS: SHA-256(bytes)
|
||||
HS-->>APP: sha256 (32 байта)
|
||||
|
||||
Note over APP,DB: Дедупликация содержимого
|
||||
APP->>DB: INSERT INTO blobs(sha256,size,content) ON CONFLICT DO NOTHING
|
||||
DB-->>APP: OK (новая строка или уже была)
|
||||
|
||||
Note over APP,DB: Привязка к снимку
|
||||
APP->>DB: INSERT INTO snapshot_chunks(snapshot_id,chunk_index,offset,size,sha256)
|
||||
DB-->>APP: OK (триггер ++refcount, last_seen_utc=now)
|
||||
end
|
||||
|
||||
Note over APP,DB: Валидация и финал
|
||||
APP->>DB: SELECT SUM(size) FROM snapshot_chunks WHERE snapshot_id = snap_id
|
||||
DB-->>APP: total_size
|
||||
alt total_size == snapshots.source_length
|
||||
Note over DB: триггер mark_ready ставит status='ready'
|
||||
APP->>DB: COMMIT
|
||||
else несовпадение / ошибка
|
||||
APP->>DB: ROLLBACK
|
||||
end
|
||||
```
|
||||
|
|
56
source/cdcdb/db/types.d
Normal file
56
source/cdcdb/db/types.d
Normal file
|
@ -0,0 +1,56 @@
|
|||
module cdcdb.db.types;
|
||||
|
||||
enum SnapshotStatus : int
|
||||
{
|
||||
pending = 0,
|
||||
ready = 1
|
||||
}
|
||||
|
||||
struct Snapshot
|
||||
{
|
||||
long id;
|
||||
string filePath;
|
||||
ubyte[32] fileSha256;
|
||||
string label;
|
||||
string createdUtc;
|
||||
long sourceLength;
|
||||
long algoMin;
|
||||
long algoNormal;
|
||||
long algoMax;
|
||||
long maskS;
|
||||
long maskL;
|
||||
SnapshotStatus status;
|
||||
}
|
||||
|
||||
struct Blob
|
||||
{
|
||||
ubyte[32] sha256; // BLOB(32)
|
||||
ubyte[32] zSha256; // BLOB(32)
|
||||
long size;
|
||||
long zSize;
|
||||
ubyte[] content; // BLOB
|
||||
string createdUtc;
|
||||
string lastSeenUtc;
|
||||
long refcount;
|
||||
bool zstd;
|
||||
}
|
||||
|
||||
struct SnapshotChunk
|
||||
{
|
||||
long snapshotId;
|
||||
long chunkIndex;
|
||||
long offset;
|
||||
long size;
|
||||
ubyte[32] sha256; // BLOB(32)
|
||||
}
|
||||
|
||||
struct SnapshotDataChunk {
|
||||
long chunkIndex;
|
||||
long offset;
|
||||
long size;
|
||||
ubyte[] content;
|
||||
bool zstd;
|
||||
long zSize;
|
||||
ubyte[32] sha256;
|
||||
ubyte[32] zSha256;
|
||||
}
|
|
@ -6,6 +6,6 @@ import std.file : read;
|
|||
|
||||
void main()
|
||||
{
|
||||
auto cas = new CAS("/tmp/base.db");
|
||||
auto cas = new CAS("/tmp/base.db", true);
|
||||
cas.saveSnapshot("/tmp/text", cast(ubyte[]) read("/tmp/text"));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue