Обновлен CAS, добавлены проверки

This commit is contained in:
Alexander Zhirov 2025-09-11 19:24:24 +03:00
parent 9c5d812a49
commit b29b328f91
Signed by: alexander
GPG key ID: C8D8BE544A27C511
4 changed files with 161 additions and 117 deletions

View file

@ -14,12 +14,34 @@ import std.conv : to;
import std.file : write; import std.file : write;
// CAS-хранилище (Content-Addressable Storage) со снапшотами // Content-Addressable Storage (Контентно-адресуемая система хранения)
// CAS-хранилище со снапшотами
final class CAS final class CAS
{ {
private: private:
DBLite _db; DBLite _db;
bool _zstd; bool _zstd;
ubyte[] buildContent(const ref Snapshot snapshot)
{
auto dataChunks = _db.getChunks(snapshot.id);
ubyte[] content;
foreach (chunk; dataChunks) {
ubyte[] bytes;
if (chunk.zstd) {
enforce(chunk.zSize == chunk.content.length, "Размер сжатого фрагмента не соответствует ожидаемому");
bytes = cast(ubyte[]) uncompress(chunk.content);
} else {
bytes = chunk.content;
}
enforce(chunk.size == bytes.length, "Оригинальный размер не соответствует ожидаемому");
content ~= bytes;
}
enforce(snapshot.fileSha256 == digest!SHA256(content), "Хеш-сумма файла не совпадает");
return content;
}
public: public:
this(string database, bool zstd = false) this(string database, bool zstd = false)
{ {
@ -27,14 +49,12 @@ public:
_zstd = zstd; _zstd = zstd;
} }
size_t saveSnapshot(string filePath, const(ubyte)[] data) size_t newSnapshot(string filePath, string label, const(ubyte)[] data)
{ {
ubyte[32] hashSource = digest!SHA256(data); ubyte[32] hashSource = digest!SHA256(data);
// Сделать запрос в БД по filePath и сверить хеш файлов // Сделать запрос в БД по filePath и сверить хеш файлов
// writeln(hashSource.length); if (_db.isLast(filePath, hashSource)) return 0;
// Параметры для CDC вынести в отдельные настройки (продумать) // Параметры для CDC вынести в отдельные настройки (продумать)
auto cdc = new CDC(256, 512, 1024, 0xFF, 0x0F); auto cdc = new CDC(256, 512, 1024, 0xFF, 0x0F);
@ -42,13 +62,26 @@ public:
auto chunks = cdc.split(data); auto chunks = cdc.split(data);
Snapshot snapshot; Snapshot snapshot;
snapshot.filePath = filePath; snapshot.filePath = filePath;
snapshot.fileSha256 = hashSource; snapshot.fileSha256 = hashSource;
snapshot.label = "Файл для теста"; snapshot.label = label;
snapshot.sourceLength = data.length; snapshot.sourceLength = data.length;
_db.beginImmediate(); _db.beginImmediate();
bool ok;
scope (exit)
{
if (!ok)
_db.rollback();
}
scope (success)
{
_db.commit();
}
auto idSnapshot = _db.addSnapshot(snapshot); auto idSnapshot = _db.addSnapshot(snapshot);
SnapshotChunk snapshotChunk; SnapshotChunk snapshotChunk;
@ -56,7 +89,7 @@ public:
blob.zstd = _zstd; blob.zstd = _zstd;
// Записать фрагменты в БД // Запись фрагментов в БД
foreach (chunk; chunks) foreach (chunk; chunks)
{ {
blob.sha256 = chunk.sha256; blob.sha256 = chunk.sha256;
@ -76,6 +109,7 @@ public:
blob.content = content.dup; blob.content = content.dup;
} }
// Запись фрагментов
_db.addBlob(blob); _db.addBlob(blob);
snapshotChunk.snapshotId = idSnapshot; snapshotChunk.snapshotId = idSnapshot;
@ -83,38 +117,28 @@ public:
snapshotChunk.offset = chunk.offset; snapshotChunk.offset = chunk.offset;
snapshotChunk.sha256 = chunk.sha256; snapshotChunk.sha256 = chunk.sha256;
// Привязка фрагментов к снимку
_db.addSnapshotChunk(snapshotChunk); _db.addSnapshotChunk(snapshotChunk);
} }
_db.commit();
// Записать манифест в БД ok = true;
// Вернуть ID манифеста
return 0; return idSnapshot;
} }
void restoreSnapshot() Snapshot[] getSnapshotList(string filePath)
{ {
string restoreFile = "/tmp/restore.d"; return _db.getSnapshots(filePath);
foreach (Snapshot snapshot; _db.getSnapshots("/tmp/text")) {
auto dataChunks = _db.getChunks(snapshot.id);
ubyte[] content;
foreach (SnapshotDataChunk chunk; dataChunks) {
ubyte[] bytes;
if (chunk.zstd) {
enforce(chunk.zSize == chunk.content.length, "Размер сжатого фрагмента не соответствует ожидаемому");
bytes = cast(ubyte[]) uncompress(chunk.content);
} else {
bytes = chunk.content;
}
enforce(chunk.size == bytes.length, "Оригинальный размер не соответствует ожидаемому");
content ~= bytes;
} }
enforce(snapshot.fileSha256 == digest!SHA256(content), "Хеш-сумма файла не совпадает"); ubyte[] getSnapshotData(const ref Snapshot snapshot)
{
ubyte[] content = buildContent(snapshot);
return content;
}
write(snapshot.filePath ~ snapshot.id.to!string, content); void removeSnapshot(const ref Snapshot snapshot)
} {
_db.deleteSnapshot(snapshot.id);
} }
} }

View file

@ -4,7 +4,7 @@ import cdcdb.cdc.types;
import std.digest.sha : SHA256, digest; import std.digest.sha : SHA256, digest;
// Change Data Capture // Change Data Capture (Захват изменения данных)
final class CDC final class CDC
{ {
private: private:

View file

@ -1,11 +1,13 @@
module cdcdb.db.dblite; module cdcdb.db.dblite;
import cdcdb.db.types;
import arsd.sqlite; import arsd.sqlite;
import std.file : exists, isFile;
import std.file : exists;
import std.exception : enforce; import std.exception : enforce;
import std.conv : to; import std.conv : to;
import std.string : join;
import cdcdb.db.types;
final class DBLite : Sqlite final class DBLite : Sqlite
{ {
@ -18,16 +20,50 @@ private:
{ {
return cast(SqliteResult) query(queryText, args); return cast(SqliteResult) query(queryText, args);
} }
// Проверка БД на наличие существующих в ней необходимых таблиц
void check()
{
SqliteResult queryResult = sql(
q{
WITH required(name) AS (VALUES ("snapshots"), ("blobs"), ("snapshot_chunks"))
SELECT name AS missing_table
FROM required
WHERE NOT EXISTS (
SELECT 1
FROM sqlite_master
WHERE type = "table" AND name = required.name
);
}
);
string[] missingTables;
foreach (row; queryResult)
{
missingTables ~= row["missing_table"].to!string;
}
enforce(missingTables.length == 0 || missingTables.length == 3,
"База данных повреждена. Отсутствуют таблицы: " ~ missingTables.join(", ")
);
if (missingTables.length == 3)
{
foreach (schemeQuery; _scheme)
{
sql(schemeQuery);
}
}
}
public: public:
this(string database) this(string database)
{ {
_dbPath = database; _dbPath = database;
super(database); super(database);
foreach (schemeQuery; _scheme) check();
{
sql(schemeQuery);
}
query("PRAGMA journal_mode=WAL"); query("PRAGMA journal_mode=WAL");
query("PRAGMA synchronous=NORMAL"); query("PRAGMA synchronous=NORMAL");
@ -49,16 +85,6 @@ public:
query("ROLLBACK"); query("ROLLBACK");
} }
// Snapshot getSnapshot(string filePath, immutable ubyte[32] sha256)
// {
// auto queryResult = sql(
// q{
// SELECT * FROM snapshots
// WHERE file_path = ? AND file_sha256 = ?
// }, filePath, sha256
// );
// }
long addSnapshot(Snapshot snapshot) long addSnapshot(Snapshot snapshot)
{ {
auto queryResult = sql( auto queryResult = sql(
@ -125,66 +151,23 @@ public:
); );
} }
// struct ChunkInput bool isLast(string filePath, ubyte[] fileSha256) {
// { auto queryResult = sql(
// long index; q{
// long offset; SELECT COALESCE(
// long size; (SELECT (file_path = ? AND file_sha256 = ?)
// ubyte[32] sha256; FROM snapshots
// const(ubyte)[] content; ORDER BY created_utc DESC
// } LIMIT 1),
0
) AS is_last;
}, filePath, fileSha256
);
// long saveSnapshotWithChunks( if (!queryResult.empty())
// string filePath, string label, long sourceLength, return queryResult.front()["is_last"].to!long > 0;
// long algoMin, long algoNormal, long algoMax, return false;
// long maskS, long maskL, }
// const ChunkInput[] chunks
// )
// {
// beginImmediate();
// bool ok;
// scope (exit)
// {
// if (!ok)
// rollback();
// }
// scope (success)
// {
// commit();
// }
// const snapId = insertSnapshotMeta(
// filePath, label, sourceLength,
// algoMin, algoNormal, algoMax,
// maskS, maskL, SnapshotStatus.pending
// );
// foreach (c; chunks)
// {
// insertBlobIfMissing(c.sha256, c.size, c.content);
// insertSnapshotChunk(snapId, c.index, c.offset, c.size, c.sha256);
// }
// ok = true;
// return snapId;
// }
// // --- чтение ---
Snapshot[] getSnapshots(string filePath) Snapshot[] getSnapshots(string filePath)
{ {
@ -197,7 +180,7 @@ public:
); );
Snapshot[] snapshots; Snapshot[] snapshots;
// bool found = false;
foreach (row; queryResult) foreach (row; queryResult)
{ {
Snapshot snapshot; Snapshot snapshot;
@ -214,14 +197,52 @@ public:
snapshot.maskS = row["mask_s"].to!long; snapshot.maskS = row["mask_s"].to!long;
snapshot.maskL = row["mask_l"].to!long; snapshot.maskL = row["mask_l"].to!long;
snapshot.status = cast(SnapshotStatus) row["status"].to!int; snapshot.status = cast(SnapshotStatus) row["status"].to!int;
// found = true;
snapshots ~= snapshot; snapshots ~= snapshot;
} }
// enforce(found, "getSnapshot: not found");
return snapshots; return snapshots;
} }
SnapshotDataChunk[] getChunks(long snapshotId) { Snapshot getSnapshot(long id)
{
auto queryResult = sql(
q{
SELECT id, file_path, file_sha256, label, created_utc, source_length,
algo_min, algo_normal, algo_max, mask_s, mask_l, status
FROM snapshots WHERE id = ?
}, id
);
Snapshot snapshot;
if (!queryResult.empty())
{
auto data = queryResult.front();
snapshot.id = data["id"].to!long;
snapshot.filePath = data["file_path"].to!string;
snapshot.fileSha256 = cast(ubyte[]) data["file_sha256"].dup;
snapshot.label = data["label"].to!string;
snapshot.createdUtc = data["created_utc"].to!string;
snapshot.sourceLength = data["source_length"].to!long;
snapshot.algoMin = data["algo_min"].to!long;
snapshot.algoNormal = data["algo_normal"].to!long;
snapshot.algoMax = data["algo_max"].to!long;
snapshot.maskS = data["mask_s"].to!long;
snapshot.maskL = data["mask_l"].to!long;
snapshot.status = cast(SnapshotStatus) data["status"].to!int;
}
return snapshot;
}
void deleteSnapshot(long id) {
sql("DELETE FROM snapshots WHERE id = ?", id);
}
SnapshotDataChunk[] getChunks(long snapshotId)
{
auto queryResult = sql( auto queryResult = sql(
q{ q{
SELECT sc.chunk_index, sc.offset, SELECT sc.chunk_index, sc.offset,

View file

@ -7,6 +7,5 @@ import std.file : read;
void main() void main()
{ {
auto cas = new CAS("/tmp/base.db", true); auto cas = new CAS("/tmp/base.db", true);
cas.saveSnapshot("/tmp/text", cast(ubyte[]) read("/tmp/text")); cas.newSnapshot("/tmp/text", "Файл для тестирования", cast(ubyte[]) read("/tmp/text"));
// cas.restoreSnapshot();
} }