module cdcdb.dblite; import d2sqlite3; import std.string : join, replace, toLower; import std.algorithm : canFind; import std.conv : to; import std.format : format; import std.exception : enforce; import std.uuid : UUID, randomUUID; import cdcdb.lib; struct DBFile { Identifier id; string path; long countSnapshots; @trusted pure nothrow @nogc @property bool empty() const { return id.empty(); } } struct DBProcess { Identifier id; string name; @trusted pure nothrow @nogc @property bool empty() const { return id.empty(); } } enum SnapshotStatus : ubyte { pending = 0, ready = 1 } struct DBSnapshot { Identifier id; /// Идентификатор снимка. DBFile file; /// Файл (таблица `files`). ubyte[32] sha256; /// Хеш всего файла (SHA-256, 32 байта). string description; /// Описание/комментарий (может быть пустым). UTS createdUtc; /// Время создания (UTC). long sourceLength; /// Длина исходного файла (байт). long algoMin; /// FastCDC: минимальный размер чанка. long algoNormal; /// FastCDC: нормальный (целевой) размер чанка. long algoMax; /// FastCDC: максимальный размер чанка. long maskS; /// Строгая маска FastCDC. long maskL; /// Слабая маска FastCDC. SnapshotStatus status; /// Статус снимка. long uid; /// UID процесса (effective). long ruid; /// Real UID процесса. string uidName; /// Имя пользователя для `uid`. string ruidName; /// Имя пользователя для `ruid`. DBProcess process; /// Процесс (таблица `processes`). @trusted pure nothrow @nogc @property bool empty() const { return id.empty(); } } struct DBSnapshotChunk { Identifier snapshotId; /// ID снимка. long chunkIndex; /// Порядковый номер чанка в снимке. long offset; /// Смещение чанка в файле. ubyte[32] sha256; /// Хеш чанка (SHA-256, 32 байта). } struct DBBlob { ubyte[32] sha256; /// Хеш исходного содержимого. ubyte[32] zSha256; /// Хеш сжатого содержимого (если zstd=true). long size; /// Размер исходного содержимого. long zSize; /// Размер сжатого содержимого. ubyte[] content; /// Контент (если хранится в БД). UTS createdUtc; /// Время создания (UTC). UTS lastSeenUtc; /// Последний доступ (UTC). long refcount; /// Ссылки на блоб (сколькими снимками используется). bool zstd; /// Признак, что `content` хранится в сжатом виде. } struct DBSnapshotChunkData { long chunkIndex; /// Порядковый номер чанка. long offset; /// Смещение в файле. long size; /// Размер исходного чанка. ubyte[] content; /// Содержимое (может быть пустым, если хранится вне БД). bool zstd; /// Сжат ли контент Zstd. long zSize; /// Размер сжатого контента. ubyte[32] sha256; /// Хеш исходного содержимого. ubyte[32] zSha256; /// Хеш сжатого содержимого. } final class DBLite { private: string _dbPath; /// Путь к файлу БД. size_t _maxRetries; /// Максимум повторов при `busy/locked`. Database _db; /// Соединение с БД (d2sqlite3). // SQL-схема (массив строковых запросов). mixin(import("scheme.d")); /// Выполняет SQL с повторными попытками при `locked/busy`. ResultRange sql(T...)(string queryText, T args) { // Готовим стейтмент сами, чтобы bindAll() работал и для BLOB. auto attempt = () { auto st = _db.prepare(queryText); static if (T.length > 0) st.bindAll(args); return st.execute(); }; if (_maxRetries == 0) { return attempt(); } string msg; size_t tryNo = _maxRetries; while (tryNo) { try { return attempt(); } catch (SqliteException e) { msg = e.msg; const code = e.code; if (code == SQLITE_BUSY || code == SQLITE_LOCKED) { if (--tryNo == 0) { throw new Exception( "Не удалось выполнить запрос к базе данных после %d неудачных попыток: %s" .format(_maxRetries, msg) ); } continue; } break; // другие ошибки — дальше по стеку } } // До сюда не дойдём, но для формальной полноты: throw new Exception(msg.length ? msg : "SQLite error"); } /// Проверяет наличие обязательных таблиц. /// Если все отсутствуют — создаёт схему; если отсутствует часть — бросает ошибку. void check() { auto queryResult = sql( q{ WITH required(name) AS (VALUES ("snapshots"), ("blobs"), ("snapshot_chunks"), ("users"), ("processes"), ("files")) SELECT name AS missing_table FROM required WHERE NOT EXISTS ( SELECT 1 FROM sqlite_master WHERE type = "table" AND name = required.name ); } ); string[] missingTables; foreach (row; queryResult) { missingTables ~= row["missing_table"].as!string; } enforce(missingTables.length == 0 || missingTables.length == 6, "База данных повреждена. Отсутствуют таблицы: " ~ missingTables.join(", ") ); if (missingTables.length == 6) { foreach (schemeQuery; _scheme) { sql(schemeQuery); } } } public: this(string database, size_t busyTimeout, size_t maxRetries) { _dbPath = database; _db = Database(database); check(); _maxRetries = maxRetries; _db.execute("PRAGMA journal_mode=WAL"); _db.execute("PRAGMA synchronous=NORMAL"); _db.execute("PRAGMA foreign_keys=ON"); _db.execute("PRAGMA busy_timeout=%d".format(busyTimeout)); } void close() { _db.close(); } /// BEGIN IMMEDIATE. void beginImmediate() { _db.execute("BEGIN IMMEDIATE"); } /// COMMIT. void commit() { _db.commit(); } /// ROLLBACK. void rollback() { _db.rollback(); } /************************************************* **************** Работа с файлом ***************** *************************************************/ DBFile[] getFiles() { auto queryResult = sql( q{ SELECT f.id, f.name, count(s.file) count_snapshots FROM files f JOIN snapshots s ON s.file = f.id GROUP BY f.id } ); DBFile[] files; foreach (row; queryResult) { DBFile file; file.id = row["id"].as!Blob(Blob.init); file.path = row["name"].as!string; file.countSnapshots = row["count_snapshots"].as!long; files ~= file; } return files; } DBFile getFile(string path) { auto queryResult = sql( q{ SELECT f.id, count(s.file) count_snapshots FROM files f JOIN snapshots s ON s.file = f.id WHERE f.name = ?1 GROUP BY f.id }, path ); DBFile file; if (!queryResult.empty) { auto data = queryResult.front; file.id = data["id"].as!Blob(Blob.init); file.path = path; file.countSnapshots = data["count_snapshots"].as!long; } return file; } DBFile getFile(Identifier id) { auto queryResult = sql( q{ SELECT f.id, f.name, count(s.file) count_snapshots FROM files f JOIN snapshots s ON s.file = f.id WHERE f.id = ?1 GROUP BY f.id }, id[] ); DBFile file; if (!queryResult.empty) { auto data = queryResult.front; file.id = id; file.path = data["name"].as!string; file.countSnapshots = data["count_snapshots"].as!long; } return file; } DBFile addFile(string path) { ResultRange queryResult; UUID uuid; // Исключение одинакового UUID первичного ключа do { uuid = randomUUID(); queryResult = sql( q{ SELECT id FROM files WHERE id = ?1 }, uuid.data[] ); } while (!queryResult.empty); queryResult = sql( q{ INSERT INTO files(id, name) VALUES(?1, ?2) RETURNING id, name }, uuid.data[], path ); enforce(!queryResult.empty, "Не удалось добавить новый файл в базу данных"); return DBFile(Identifier(uuid.data), path); } DBFile[] findFile(string pattern) { auto queryResult = sql( q{ SELECT f.id, f.name, count(s.file) count_snapshots FROM files f JOIN snapshots s ON s.file = f.id WHERE f.name LIKE ?1 GROUP BY f.id }, '%' ~ pattern ~ '%' ); DBFile[] files; foreach (row; queryResult) { DBFile file; file.id = row["id"].as!Blob(Blob.init); file.path = row["name"].as!string; file.countSnapshots = row["count_snapshots"].as!long; files ~= file; } return files; } // Функция производит поиск по хешу // Если младший ниббл является нулевым (0000), то ищем позицию вхождения подстроки в строку // Иначе ищем по подстроке DBFile[] findFile(Identifier id) { auto queryResult = sql( q{ SELECT f.id, f.name, count(s.file) count_snapshots FROM files f JOIN snapshots s ON s.file = f.id WHERE length(?1) BETWEEN 1 AND 16 AND CASE WHEN substr(hex(?1), 2 * length(?1), 1) = '0' THEN instr(hex(f.id), substr(hex(?1), 1, 2 * length(?1) - 1)) > 0 ELSE substr(f.id, 1, length(?1)) = ?1 END GROUP BY f.id }, id[] ); DBFile[] files; foreach (row; queryResult) { DBFile file; file.id = row["id"].as!Blob(Blob.init); file.path = row["name"].as!string; file.countSnapshots = row["count_snapshots"].as!long; files ~= file; } return files; } bool deleteFile(Identifier id) { auto queryResult = sql("DELETE FROM files WHERE id = ?1 RETURNING id", id[]); return !queryResult.empty; } bool deleteFile(string path) { auto queryResult = sql("DELETE FROM files WHERE name = ?1 RETURNING id", path); return !queryResult.empty; } ///////////////////////////////////////////////////////////////////// bool isLast(Identifier id, ubyte[] sha256) { auto queryResult = sql( q{ SELECT COALESCE( ( SELECT (sha256 = ?2) FROM snapshots WHERE file = ?1 ORDER BY created_utc DESC LIMIT 1 ), 0 ) AS is_last; }, id[], sha256[] ); if (!queryResult.empty) return queryResult.front["is_last"].as!long > 0; return false; } bool addUser(long uid, string name) { auto queryResult = sql( q{ INSERT INTO users (uid, name) SELECT ?1,?2 WHERE NOT EXISTS ( SELECT 1 FROM users WHERE uid = ?1 ) }, uid, name ); return !queryResult.empty; } DBProcess getProcess(string name) { auto queryResult = sql( q{ SELECT id FROM processes WHERE name = ?1 }, name ); DBProcess process; if (!queryResult.empty) { auto data = queryResult.front; process.id = data["id"].as!Blob(Blob.init); process.name = name; } return process; } DBProcess getProcess(Identifier id) { auto queryResult = sql( q{ SELECT id, name FROM processes WHERE id = ?1 }, id[] ); DBProcess process; if (!queryResult.empty) { auto data = queryResult.front; process.id = id; process.name = data["name"].as!string; } return process; } DBProcess addProcess(string name) { ResultRange queryResult; UUID uuid; // Исключение одинакового UUID первичного ключа do { uuid = randomUUID(); queryResult = sql( q{ SELECT id FROM processes WHERE id = ?1 }, uuid.data[] ); } while (!queryResult.empty); queryResult = sql( q{ INSERT INTO processes (id, name) VALUES (?1, ?2) RETURNING id, name }, uuid.data[], name ); enforce(!queryResult.empty, "Не удалось добавить новый файл в базу данных"); return DBProcess(Identifier(uuid.data), name); } ///////////////////////////////////////////////////////////////////// bool addSnapshot(ref DBSnapshot snapshot) { ResultRange queryResult; UUID uuid; // Исключение одинакового UUID первичного ключа do { uuid = randomUUID(); queryResult = sql( q{ SELECT id FROM snapshots WHERE id = ?1 }, uuid.data[] ); } while (!queryResult.empty); import std.datetime : Clock; snapshot.createdUtc = UTS(Clock.currTime()); snapshot.id = Identifier(uuid.data); queryResult = sql( q{ INSERT INTO snapshots( id, file, sha256, description, created_utc, source_length, uid, ruid, process, algo_min, algo_normal, algo_max, mask_s, mask_l, status ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) RETURNING id }, snapshot.id[], // ?1 snapshot.file.id[], // ?2 snapshot.sha256[], // ?3 snapshot.description.length ? snapshot.description : null, // ?4 snapshot.createdUtc.unix, // ?5 snapshot.sourceLength, // ?6 snapshot.uid, // ?7 snapshot.ruid, // ?8 snapshot.process.id[], // ?9 snapshot.algoMin, // ?10 snapshot.algoNormal, // ?11 snapshot.algoMax, // ?12 snapshot.maskS, // ?13 snapshot.maskL, // ?14 snapshot.status.to!int // ?15 ); return !queryResult.empty; } ///////////////////////////////////////////////////////////////////// bool addBlob(ref DBBlob blob) { import std.datetime : Clock; blob.createdUtc = UTS(Clock.currTime()); blob.lastSeenUtc = UTS(Clock.currTime()); auto queryResult = sql( q{ INSERT INTO blobs ( sha256, z_sha256, size, z_size, content, created_utc, last_seen_utc, zstd ) SELECT ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8 WHERE NOT EXISTS ( SELECT 1 FROM blobs WHERE sha256 = ?1 ) RETURNING sha256 }, blob.sha256[], // ?1 blob.zstd ? blob.zSha256[] : null, // ?2 blob.size, // ?3 blob.zSize, // ?4 blob.content, // ?5 blob.createdUtc.unix, // ?6 blob.lastSeenUtc.unix, // ?7 blob.zstd.to!int // ?8 ); return !queryResult.empty; } bool addSnapshotChunk(DBSnapshotChunk snapshotChunk) { auto queryResult = sql( q{ INSERT INTO snapshot_chunks (snapshot_id, chunk_index, offset, sha256) VALUES (?1, ?2, ?3, ?4) RETURNING snapshot_id }, snapshotChunk.snapshotId[], snapshotChunk.chunkIndex, snapshotChunk.offset, snapshotChunk.sha256[] ); return !queryResult.empty; } DBSnapshotChunkData[] getChunks(Identifier id) { auto queryResult = sql( q{ SELECT sc.chunk_index, sc.offset, b.size, b.content, b.zstd, b.z_size, b.sha256, b.z_sha256 FROM snapshot_chunks sc JOIN blobs b ON b.sha256 = sc.sha256 WHERE sc.snapshot_id = ? ORDER BY sc.chunk_index }, id[] ); DBSnapshotChunkData[] sdchs; foreach (row; queryResult) { DBSnapshotChunkData sdch; sdch.chunkIndex = row["chunk_index"].as!long; sdch.offset = row["offset"].as!long; sdch.size = row["size"].as!long; // content может быть NULL auto contentBlob = cast(ubyte[]) row["content"].as!Blob(Blob.init); sdch.content = contentBlob.length ? contentBlob.dup : null; sdch.zstd = row["zstd"].as!int != 0; sdch.zSize = row["z_size"].as!long; auto sha = cast(ubyte[]) row["sha256"].as!Blob(Blob.init); if (sha.length) sdch.sha256[] = sha; auto zsha = cast(ubyte[]) row["z_sha256"].as!Blob(Blob.init); if (zsha.length) sdch.zSha256[] = zsha; sdchs ~= sdch; } return sdchs; } DBSnapshot[] getSnapshots(string file) { auto queryResult = sql( q{ SELECT s.id, f.id file_id, f.name file_name, count(sc.file) count_snapshots, s.sha256, s.description, s.created_utc, s.source_length, s.uid, s.ruid, u.name uid_name, r.name ruid_name, p.id process_id, p.name process_name, s.algo_min, s.algo_normal, s.algo_max, s.mask_s, s.mask_l, s.status FROM snapshots s JOIN processes p ON p.id = s.process JOIN users u ON u.uid = s.uid JOIN users r ON r.uid = s.ruid JOIN files f ON f.id = s.file AND (length(?) = 0 OR f.name = ?1) JOIN snapshots sc ON f.id = sc.file GROUP BY s.id ORDER BY s.created_utc }, file ); DBSnapshot[] snapshots; foreach (row; queryResult) { DBSnapshot snapshot; snapshot.id = row["id"].as!Blob(Blob.init); snapshot.file = DBFile( Identifier(row["file_id"].as!Blob(Blob.init)), row["file_name"].as!string, row["count_snapshots"].as!long ); snapshot.sha256 = row["sha256"].as!Blob(Blob.init); snapshot.description = row["description"].as!string(""); // может быть NULL snapshot.createdUtc = row["created_utc"].as!long; snapshot.sourceLength = row["source_length"].as!long; snapshot.algoMin = row["algo_min"].as!long; snapshot.algoNormal = row["algo_normal"].as!long; snapshot.algoMax = row["algo_max"].as!long; snapshot.maskS = row["mask_s"].as!long; snapshot.maskL = row["mask_l"].as!long; snapshot.status = cast(SnapshotStatus) row["status"].as!int; snapshot.uid = row["uid"].as!long; snapshot.ruid = row["ruid"].as!long; snapshot.uidName = row["uid_name"].as!string; snapshot.ruidName = row["ruid_name"].as!string; snapshot.process = DBProcess( Identifier(row["process_id"].as!Blob(Blob.init)), row["process_name"].as!string ); snapshots ~= snapshot; } return snapshots; } DBSnapshot[] getSnapshots(Identifier id) { auto queryResult = sql( q{ SELECT s.id, f.id file_id, f.name file_name, count(sc.file) count_snapshots, s.sha256, s.description, s.created_utc, s.source_length, s.uid, s.ruid, u.name uid_name, r.name ruid_name, p.id process_id, p.name process_name, s.algo_min, s.algo_normal, s.algo_max, s.mask_s, s.mask_l, s.status FROM snapshots s JOIN processes p ON p.id = s.process JOIN users u ON u.uid = s.uid JOIN users r ON r.uid = s.ruid JOIN files f ON f.id = s.file JOIN snapshots sc ON f.id = sc.file WHERE f.id = ?1 GROUP BY s.id ORDER BY s.created_utc }, id[] ); DBSnapshot[] snapshots; foreach (row; queryResult) { DBSnapshot snapshot; snapshot.id = row["id"].as!Blob(Blob.init); snapshot.file = DBFile( Identifier(row["file_id"].as!Blob(Blob.init)), row["file_name"].as!string, row["count_snapshots"].as!long ); snapshot.sha256 = row["sha256"].as!Blob(Blob.init); snapshot.description = row["description"].as!string(""); snapshot.createdUtc = row["created_utc"].as!long; snapshot.sourceLength = row["source_length"].as!long; snapshot.algoMin = row["algo_min"].as!long; snapshot.algoNormal = row["algo_normal"].as!long; snapshot.algoMax = row["algo_max"].as!long; snapshot.maskS = row["mask_s"].as!long; snapshot.maskL = row["mask_l"].as!long; snapshot.status = cast(SnapshotStatus) row["status"].as!int; snapshot.uid = row["uid"].as!long; snapshot.ruid = row["ruid"].as!long; snapshot.uidName = row["uid_name"].as!string; snapshot.ruidName = row["ruid_name"].as!string; snapshot.process = DBProcess( Identifier(row["process_id"].as!Blob(Blob.init)), row["process_name"].as!string ); snapshots ~= snapshot; } return snapshots; } DBSnapshot getSnapshot(Identifier id) { auto queryResult = sql( q{ SELECT s.id, f.id file_id, f.name file_name, count(sc.file) count_snapshots, s.sha256, s.description, s.created_utc, s.source_length, s.uid, s.ruid, u.name uid_name, r.name ruid_name, p.id process_id, p.name process_name, s.algo_min, s.algo_normal, s.algo_max, s.mask_s, s.mask_l, s.status FROM snapshots s JOIN processes p ON p.id = s.process JOIN users u ON u.uid = s.uid JOIN users r ON r.uid = s.ruid JOIN files f ON f.id = s.file JOIN snapshots sc ON f.id = sc.file WHERE s.id = ?1 GROUP BY s.id ORDER BY s.created_utc }, id[] ); DBSnapshot snapshot; if (!queryResult.empty) { auto data = queryResult.front; snapshot.id = data["id"].as!Blob(Blob.init); snapshot.file = DBFile( Identifier(data["file_id"].as!Blob(Blob.init)), data["file_name"].as!string, data["count_snapshots"].as!long ); snapshot.sha256 = data["sha256"].as!Blob(Blob.init); snapshot.description = data["description"].as!string(""); snapshot.createdUtc = data["created_utc"].as!long; snapshot.sourceLength = data["source_length"].as!long; snapshot.algoMin = data["algo_min"].as!long; snapshot.algoNormal = data["algo_normal"].as!long; snapshot.algoMax = data["algo_max"].as!long; snapshot.maskS = data["mask_s"].as!long; snapshot.maskL = data["mask_l"].as!long; snapshot.status = cast(SnapshotStatus) data["status"].as!int; snapshot.uid = data["uid"].as!long; snapshot.ruid = data["ruid"].as!long; snapshot.uidName = data["uid_name"].as!string; snapshot.ruidName = data["ruid_name"].as!string; snapshot.process = DBProcess( Identifier(data["process_id"].as!Blob(Blob.init)), data["process_name"].as!string ); } return snapshot; } // Функция производит поиск по хешу // Если младший ниббл является нулевым (0000), то ищем позицию вхождения подстроки в строку // Иначе ищем по подстроке DBSnapshot[] findSnapshot(Identifier id) { auto queryResult = sql( q{ SELECT s.id, f.id file_id, f.name file_name, count(sc.file) count_snapshots, s.sha256, s.description, s.created_utc, s.source_length, s.uid, s.ruid, u.name uid_name, r.name ruid_name, p.id process_id, p.name process_name, s.algo_min, s.algo_normal, s.algo_max, s.mask_s, s.mask_l, s.status FROM snapshots s JOIN processes p ON p.id = s.process JOIN users u ON u.uid = s.uid JOIN users r ON r.uid = s.ruid JOIN files f ON f.id = s.file JOIN snapshots sc ON f.id = sc.file WHERE length(?1) BETWEEN 1 AND 16 AND CASE WHEN substr(hex(?1), 2 * length(?1), 1) = '0' THEN instr(hex(s.id), substr(hex(?1), 1, 2 * length(?1) - 1)) > 0 ELSE substr(s.id, 1, length(?1)) = ?1 END GROUP BY s.id ORDER BY s.created_utc }, id[] ); DBSnapshot[] snapshots; foreach (row; queryResult) { DBSnapshot snapshot; snapshot.id = row["id"].as!Blob(Blob.init); snapshot.file = DBFile( Identifier(row["file_id"].as!Blob(Blob.init)), row["file_name"].as!string, row["count_snapshots"].as!long ); snapshot.sha256 = row["sha256"].as!Blob(Blob.init); snapshot.description = row["description"].as!string(""); snapshot.createdUtc = row["created_utc"].as!long; snapshot.sourceLength = row["source_length"].as!long; snapshot.algoMin = row["algo_min"].as!long; snapshot.algoNormal = row["algo_normal"].as!long; snapshot.algoMax = row["algo_max"].as!long; snapshot.maskS = row["mask_s"].as!long; snapshot.maskL = row["mask_l"].as!long; snapshot.status = cast(SnapshotStatus) row["status"].as!int; snapshot.uid = row["uid"].as!long; snapshot.ruid = row["ruid"].as!long; snapshot.uidName = row["uid_name"].as!string; snapshot.ruidName = row["ruid_name"].as!string; snapshot.process = DBProcess( Identifier(row["process_id"].as!Blob(Blob.init)), row["process_name"].as!string ); snapshots ~= snapshot; } return snapshots; } bool deleteSnapshot(Identifier id) { auto queryResult = sql("DELETE FROM snapshots WHERE id = ? RETURNING id", id[]); return !queryResult.empty; } }