This commit is contained in:
Alexander Zhirov 2025-09-04 22:04:29 +03:00
parent f1dd2aebb2
commit 90a7f3b548
Signed by: alexander
GPG key ID: C8D8BE544A27C511

View file

@ -1,21 +1,30 @@
module app;
import std.stdio : writeln, writefln, File;
import std.file : exists, mkdirRecurse, read, write, readText;
import std.path : baseName, buildPath, absolutePath;
import std.file : exists, mkdirRecurse, read, write, readText, rename;
import std.path : baseName, dirName, buildPath, absolutePath;
import std.getopt : getopt;
import std.string : strip, split, splitLines;
import std.algorithm.searching : startsWith;
import std.algorithm.searching : startsWith, endsWith;
import std.conv : to;
import std.datetime : Clock;
import std.exception : enforce;
import std.digest.sha : sha256Of;
import std.digest.sha : sha256Of, SHA256;
import std.format : format;
import fastcdc; // твой модуль FastCDC
// ---------- утилиты ----------
// hex: параметр scope, чтобы можно было безопасно передавать срез локального массива
// У Phobos read(...) на некоторых версиях возвращает void[].
// Безопасно приводим к ubyte[] в @trusted-обёртке.
@trusted ubyte[] readBytes(string path)
{
auto v = read(path); // void[]
return cast(ubyte[]) v; // это новый буфер байт → безопасно
}
// hex из байтов (scope для локальных срезов)
@safe pure
string toHex(scope const(ubyte)[] bytes)
{
@ -27,10 +36,10 @@ string toHex(scope const(ubyte)[] bytes)
buf[j++] = HEX[(b >> 4) & 0xF];
buf[j++] = HEX[b & 0xF];
}
return buf.idup; // immutable string
return buf.idup;
}
// fanout: store/chunks/aa/bb/<hash>.bin
// Путь чанка с fanout: store/chunks/aa/bb/<hash>.bin
@safe
string chunkPath(string storeDir, string hashHex)
{
@ -39,7 +48,7 @@ string chunkPath(string storeDir, string hashHex)
return buildPath(storeDir, "chunks", a, b, hashHex ~ ".bin");
}
// manifest: store/manifests/<name>.<epoch>.manifest
// Путь манифеста: store/manifests/<name>.<epoch>.manifest
@safe
string manifestPath(string storeDir, string srcPath, long epoch)
{
@ -47,6 +56,55 @@ string manifestPath(string storeDir, string srcPath, long epoch)
return buildPath(storeDir, "manifests", name ~ "." ~ to!string(epoch) ~ ".manifest");
}
// Обновить/создать "указатель" на последний манифест:
// store/manifests/<name>.latest (содержит basename файла манифеста)
@safe
void writeLatestPointer(string storeDir, string srcPath, string manifestFullPath)
{
auto manifestsDir = buildPath(storeDir, "manifests");
mkdirRecurse(manifestsDir);
auto latestFile = buildPath(manifestsDir, baseName(srcPath) ~ ".latest");
auto justName = baseName(manifestFullPath);
write(latestFile, justName); // plain text
}
// Разрешить путь манифеста: если мне дали *.latest — прочитать ссылку.
@safe
string resolveManifestPath(string storeDir, string manifestGiven)
{
if (manifestGiven.endsWith(".latest") && exists(manifestGiven))
{
auto dir = dirName(manifestGiven);
auto name = readText(manifestGiven).strip;
// если в файле относительный basename — склеиваем с текущей директорией
return buildPath(dir, name);
}
// Возможно, дали просто store/manifests/<name>.latest (существующий файл)
if (manifestGiven.endsWith(".latest") && exists(buildPath(storeDir, "manifests", baseName(
manifestGiven))))
{
auto latest = buildPath(storeDir, "manifests", baseName(manifestGiven));
auto name = readText(latest).strip;
return buildPath(dirName(latest), name);
}
return manifestGiven;
}
// Атомарная запись чанка: через временный файл и rename()
@trusted
void writeAtomic(string path, in ubyte[] data)
{
auto tmp = path ~ ".tmp";
auto f = File(tmp, "wb");
f.rawWrite(data);
f.flush();
f.close();
// ensure конечные директории существуют (на случай гонки)
mkdirRecurse(dirName(path));
rename(tmp, path); // POSIX: атомарно
}
// Создать служебные директории
@safe
void ensureDirs(string storeDir)
{
@ -54,11 +112,13 @@ void ensureDirs(string storeDir)
mkdirRecurse(buildPath(storeDir, "manifests"));
}
@trusted ubyte[] readBytes(string path)
// Вспомогалка: записать строку в файл И одновременно накормить хэшер
@trusted
void mfWriteLine(ref File mf, ref SHA256 h, string line)
{
// std.file.read выделяет новый буфер байтов → безопасно привести к ubyte[]
auto v = read(path); // void[]
return cast(ubyte[]) v; // доверяем Phobos: это сырой байтовый буфер
mf.writeln(line);
h.put(cast(const(ubyte)[]) line);
h.put(cast(const(ubyte)[]) "\n");
}
// ---------- split ----------
@ -70,6 +130,31 @@ struct SplitOpts
size_t minSize = 8 * 1024;
size_t avgSize = 64 * 1024;
size_t maxSize = 256 * 1024;
size_t logEvery = 256; // каждые N чанков логировать (0 = выкл)
string profile; // "text" | "bin" | ""
}
@safe
void applyProfile(ref SplitOpts opt)
{
if (opt.profile == "text")
{
if (opt.minSize == 8 * 1024)
opt.minSize = 4 * 1024;
if (opt.avgSize == 64 * 1024)
opt.avgSize = 32 * 1024;
if (opt.maxSize == 256 * 1024)
opt.maxSize = 128 * 1024;
}
else if (opt.profile == "bin")
{
if (opt.minSize == 8 * 1024)
opt.minSize = 16 * 1024;
if (opt.avgSize == 64 * 1024)
opt.avgSize = 128 * 1024;
if (opt.maxSize == 256 * 1024)
opt.maxSize = 512 * 1024;
}
}
@safe
@ -77,8 +162,9 @@ int cmdSplit(SplitOpts opt)
{
enforce(exists(opt.filePath), "Файл не найден: " ~ opt.filePath);
ensureDirs(opt.storeDir);
applyProfile(opt);
// бинарное чтение: std.file.read возвращает ubyte[]
// бинарное чтение исходника
ubyte[] data = readBytes(opt.filePath);
FastCDCParams p = {opt.minSize, opt.avgSize, opt.maxSize};
@ -87,48 +173,69 @@ int cmdSplit(SplitOpts opt)
size_t chunkCount = 0;
size_t totalBytes = data.length;
// имя манифеста
auto epoch = Clock.currTime().toUnixTime();
auto mfPath = manifestPath(opt.storeDir, opt.filePath, epoch);
mkdirRecurse(buildPath(opt.storeDir, "manifests"));
auto mfDir = buildPath(opt.storeDir, "manifests");
mkdirRecurse(mfDir);
auto mf = File(mfPath, "w");
// шапка манифеста
mf.writeln("# FastCDC manifest");
mf.writefln("path\t%s", absolutePath(opt.filePath));
mf.writefln("size\t%s", to!string(totalBytes));
mf.writefln("algo\tsha256");
mf.writefln("min\t%u", cast(uint) p.minSize);
mf.writefln("avg\t%u", cast(uint) p.avgSize);
mf.writefln("max\t%u", cast(uint) p.maxSize);
mf.writeln("ord\thash\tsize");
// будем сразу считать SHA-256 манифеста (кроме финальной строки manifest_sha256)
SHA256 mh;
// шапка манифеста
mfWriteLine(mf, mh, "# FastCDC manifest");
mfWriteLine(mf, mh, "path\t" ~ absolutePath(opt.filePath));
mfWriteLine(mf, mh, format("size\t%s", to!string(totalBytes)));
mfWriteLine(mf, mh, "algo\tsha256");
mfWriteLine(mf, mh, format("min\t%u", cast(uint) p.minSize));
mfWriteLine(mf, mh, format("avg\t%u", cast(uint) p.avgSize));
mfWriteLine(mf, mh, format("max\t%u", cast(uint) p.maxSize));
mfWriteLine(mf, mh, "ord\thash\tsize");
// потоковая нарезка: sha256 чанка, атомарная запись, строка в манифест
size_t ord = 0;
processStream(data, p, (size_t start, size_t len) @safe {
auto slice = data[start .. start + len];
auto digest = sha256Of(slice); // ubyte[32] (на стеке)
auto hex = toHex(digest[]); // scope-параметр — ок
auto digest = sha256Of(slice); // ubyte[32]
auto hex = toHex(digest[]);
auto cpath = chunkPath(opt.storeDir, hex);
// подготовим подпапки aa/bb
mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2]));
mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2], hex[2 .. 4]));
auto cpath = chunkPath(opt.storeDir, hex);
if (!exists(cpath))
write(cpath, slice);
writeAtomic(cpath, slice); // атомарная запись
// строка манифеста для чанка
auto line = format("%u\t%s\t%u", cast(uint) ord, hex, cast(uint) len);
mfWriteLine(mf, mh, line);
mf.writefln("%u\t%s\t%u", cast(uint) ord, hex, cast(uint) len);
++ord;
++chunkCount;
return 0;
if (opt.logEvery != 0 && (ord % opt.logEvery) == 0)
writefln("… %u chunks", cast(uint) ord);
return 0; // продолжать
});
// финализируем хэш манифеста (без строки manifest_sha256) и добавляем контрольную строку
auto manifestDigest = mh.finish(); // ubyte[32]
auto manifestHex = toHex(manifestDigest[]);
mf.writeln("manifest_sha256\t" ~ manifestHex);
mf.flush();
mf.close();
// запишем указатель "последний манифест"
writeLatestPointer(opt.storeDir, opt.filePath, mfPath);
writefln("split: %s", opt.filePath);
writefln("store: %s", opt.storeDir);
writefln("manifest: %s", mfPath);
writefln("chunks: %u, bytes: %u",
cast(uint) chunkCount, cast(uint) totalBytes);
writefln("chunks: %u, bytes: %u", cast(uint) chunkCount, cast(uint) totalBytes);
return 0;
}
@ -137,19 +244,45 @@ int cmdSplit(SplitOpts opt)
struct RestoreOpts
{
string storeDir;
string manifestFile;
string manifestFile; // может быть *.latest
string outFile;
}
@safe
int cmdRestore(RestoreOpts opt)
{
enforce(exists(opt.manifestFile), "Манифест не найден: " ~ opt.manifestFile);
auto realManifest = resolveManifestPath(opt.storeDir, opt.manifestFile);
enforce(exists(realManifest), "Манифест не найден: " ~ realManifest);
string text = readText(opt.manifestFile);
string text = readText(realManifest);
auto lines = splitLines(text);
// найти строку "ord\thash\tsize"
// 1) Проверка целостности: пересчитать SHA-256 всех строк ДО manifest_sha256
SHA256 mh;
string expectedHex;
foreach (ln; lines)
{
auto s = ln.strip;
if (s.startsWith("manifest_sha256"))
{
auto cols = s.split('\t');
enforce(cols.length == 2, "Повреждённая строка manifest_sha256");
expectedHex = cols[1];
break;
}
// включаем в хэш строку и символ перевода строки
// (в split() выше переводы строк уже отрезаны)
mh.put(cast(const(ubyte)[]) ln);
mh.put(cast(const(ubyte)[]) "\n");
}
if (expectedHex.length)
{
auto gotHex = toHex(mh.finish()[]);
enforce(gotHex == expectedHex,
"Контрольная сумма манифеста не совпала:\n ожидалось: " ~ expectedHex ~ "\n получено: " ~ gotHex);
}
// 2) Найти секцию данных "ord\thash\tsize"
size_t i = 0;
while (i < lines.length && !lines[i].strip.startsWith("ord"))
++i;
@ -162,7 +295,7 @@ int cmdRestore(RestoreOpts opt)
for (; i < lines.length; ++i)
{
auto ln = lines[i].strip;
if (ln.length == 0 || ln[0] == '#')
if (ln.length == 0 || ln[0] == '#' || ln.startsWith("manifest_sha256"))
continue;
auto cols = ln.split('\t');
@ -178,8 +311,7 @@ int cmdRestore(RestoreOpts opt)
}
dst.close();
writefln("restore: %s <- %s (chunks: %u)",
opt.outFile, opt.manifestFile, cast(uint) count);
writefln("restore: %s <- %s (chunks: %u)", opt.outFile, realManifest, cast(uint) count);
return 0;
}
@ -189,12 +321,13 @@ int cmdRestore(RestoreOpts opt)
void printHelp(string prog)
{
writeln("Usage:");
writeln(" ", prog, " split --store <dir> <file> [--min N] [--avg N] [--max N]");
writeln(" ", prog, " restore --store <dir> <manifest> <out_file>");
writeln(" ",
prog, " split --store <dir> <file> [--profile text|bin] [--min N] [--avg N] [--max N] [--log-every N]");
writeln(" ", prog, " restore --store <dir> <manifest|*.latest> <out_file>");
}
int main(string[] args) // без @safe: getopt требует &var
{
int main(string[] args)
{ // без @safe: getopt берёт &var
if (args.length < 2)
{
printHelp(args[0]);
@ -207,13 +340,16 @@ int main(string[] args) // без @safe: getopt требует &var
{
SplitOpts opt;
string store;
size_t minS = 0, avgS = 0, maxS = 0;
string profile;
size_t minS = 0, avgS = 0, maxS = 0, logEvery = 256;
auto res = getopt(args,
"store", &store,
"profile", &profile,
"min", &minS,
"avg", &avgS,
"max", &maxS
"max", &maxS,
"log-every", &logEvery
);
if (res.helpWanted)
{
@ -221,7 +357,6 @@ int main(string[] args) // без @safe: getopt требует &var
return 0;
}
// после getopt в args остаются позиционные
if (args.length < 3 || store.length == 0)
{
printHelp(args[0]);
@ -230,12 +365,14 @@ int main(string[] args) // без @safe: getopt требует &var
opt.storeDir = store;
opt.filePath = args[2];
opt.profile = profile;
if (minS)
opt.minSize = minS;
if (avgS)
opt.avgSize = avgS;
if (maxS)
opt.maxSize = maxS;
opt.logEvery = logEvery;
return cmdSplit(opt);
}
@ -259,7 +396,7 @@ int main(string[] args) // без @safe: getopt требует &var
}
opt.storeDir = store;
opt.manifestFile = args[2];
opt.manifestFile = args[2]; // можно передать *.latest
opt.outFile = args[3];
return cmdRestore(opt);