Разбивка текста на чанки

This commit is contained in:
Alexander Zhirov 2025-09-08 01:08:37 +03:00
parent 57f9ada9c9
commit 0a8359cfe7
Signed by: alexander
GPG key ID: C8D8BE544A27C511
6 changed files with 358 additions and 783 deletions

View file

@ -1,409 +1,229 @@
module app;
import std.stdio;
import std.string;
import std.stdio : writeln, writefln, File;
import std.file : exists, mkdirRecurse, read, write, readText, rename;
import std.path : baseName, dirName, buildPath, absolutePath;
import std.getopt : getopt;
import std.string : strip, split, splitLines;
import std.algorithm.searching : startsWith, endsWith;
import std.conv : to;
import std.datetime : Clock;
import std.exception : enforce;
import std.digest.sha : sha256Of, SHA256;
import std.format : format;
// Статическая таблица Gear (256 случайных 64-битных чисел)
mixin(import("gear.d"));
import fastcdc; // твой модуль FastCDC
// ---------- утилиты ----------
// У Phobos read(...) на некоторых версиях возвращает void[].
// Безопасно приводим к ubyte[] в @trusted-обёртке.
@trusted ubyte[] readBytes(string path)
// Функция FastCDC с отладочной информацией
ulong fastcdc(const ubyte[] src, size_t n, size_t minSize, size_t maxSize,
size_t normalSize, ulong maskS, ulong maskL, size_t chunkNumber, bool debugEnabled)
{
auto v = read(path); // void[]
return cast(ubyte[]) v; // это новый буфер байт → безопасно
}
// hex из байтов (scope для локальных срезов)
@safe pure
string toHex(scope const(ubyte)[] bytes)
{
immutable char[16] HEX = "0123456789abcdef";
auto buf = new char[bytes.length * 2];
size_t j = 0;
foreach (b; bytes)
if (n <= minSize)
{
buf[j++] = HEX[(b >> 4) & 0xF];
buf[j++] = HEX[b & 0xF];
}
return buf.idup;
}
// Путь чанка с fanout: store/chunks/aa/bb/<hash>.bin
@safe
string chunkPath(string storeDir, string hashHex)
{
auto a = hashHex[0 .. 2];
auto b = hashHex[2 .. 4];
return buildPath(storeDir, "chunks", a, b, hashHex ~ ".bin");
}
// Путь манифеста: store/manifests/<name>.<epoch>.manifest
@safe
string manifestPath(string storeDir, string srcPath, long epoch)
{
auto name = baseName(srcPath);
return buildPath(storeDir, "manifests", name ~ "." ~ to!string(epoch) ~ ".manifest");
}
// Обновить/создать "указатель" на последний манифест:
// store/manifests/<name>.latest (содержит basename файла манифеста)
@safe
void writeLatestPointer(string storeDir, string srcPath, string manifestFullPath)
{
auto manifestsDir = buildPath(storeDir, "manifests");
mkdirRecurse(manifestsDir);
auto latestFile = buildPath(manifestsDir, baseName(srcPath) ~ ".latest");
auto justName = baseName(manifestFullPath);
write(latestFile, justName); // plain text
}
// Разрешить путь манифеста: если мне дали *.latest — прочитать ссылку.
@safe
string resolveManifestPath(string storeDir, string manifestGiven)
{
if (manifestGiven.endsWith(".latest") && exists(manifestGiven))
{
auto dir = dirName(manifestGiven);
auto name = readText(manifestGiven).strip;
// если в файле относительный basename — склеиваем с текущей директорией
return buildPath(dir, name);
}
// Возможно, дали просто store/manifests/<name>.latest (существующий файл)
if (manifestGiven.endsWith(".latest") && exists(buildPath(storeDir, "manifests", baseName(
manifestGiven))))
{
auto latest = buildPath(storeDir, "manifests", baseName(manifestGiven));
auto name = readText(latest).strip;
return buildPath(dirName(latest), name);
}
return manifestGiven;
}
// Атомарная запись чанка: через временный файл и rename()
@trusted
void writeAtomic(string path, in ubyte[] data)
{
auto tmp = path ~ ".tmp";
auto f = File(tmp, "wb");
f.rawWrite(data);
f.flush();
f.close();
// ensure конечные директории существуют (на случай гонки)
mkdirRecurse(dirName(path));
rename(tmp, path); // POSIX: атомарно
}
// Создать служебные директории
@safe
void ensureDirs(string storeDir)
{
mkdirRecurse(buildPath(storeDir, "chunks"));
mkdirRecurse(buildPath(storeDir, "manifests"));
}
// Вспомогалка: записать строку в файл И одновременно накормить хэшер
@trusted
void mfWriteLine(ref File mf, ref SHA256 h, string line)
{
mf.writeln(line);
h.put(cast(const(ubyte)[]) line);
h.put(cast(const(ubyte)[]) "\n");
}
// ---------- split ----------
struct SplitOpts
{
string storeDir;
string filePath;
size_t minSize = 8 * 1024;
size_t avgSize = 64 * 1024;
size_t maxSize = 256 * 1024;
size_t logEvery = 256; // каждые N чанков логировать (0 = выкл)
string profile; // "text" | "bin" | ""
}
@safe
void applyProfile(ref SplitOpts opt)
{
if (opt.profile == "text")
{
if (opt.minSize == 8 * 1024)
opt.minSize = 4 * 1024;
if (opt.avgSize == 64 * 1024)
opt.avgSize = 32 * 1024;
if (opt.maxSize == 256 * 1024)
opt.maxSize = 128 * 1024;
}
else if (opt.profile == "bin")
{
if (opt.minSize == 8 * 1024)
opt.minSize = 16 * 1024;
if (opt.avgSize == 64 * 1024)
opt.avgSize = 128 * 1024;
if (opt.maxSize == 256 * 1024)
opt.maxSize = 512 * 1024;
}
}
@safe
int cmdSplit(SplitOpts opt)
{
enforce(exists(opt.filePath), "Файл не найден: " ~ opt.filePath);
ensureDirs(opt.storeDir);
applyProfile(opt);
// бинарное чтение исходника
ubyte[] data = readBytes(opt.filePath);
FastCDCParams p = {opt.minSize, opt.avgSize, opt.maxSize};
p.normalize();
size_t chunkCount = 0;
size_t totalBytes = data.length;
// имя манифеста
auto epoch = Clock.currTime().toUnixTime();
auto mfPath = manifestPath(opt.storeDir, opt.filePath, epoch);
auto mfDir = buildPath(opt.storeDir, "manifests");
mkdirRecurse(mfDir);
auto mf = File(mfPath, "w");
// будем сразу считать SHA-256 манифеста (кроме финальной строки manifest_sha256)
SHA256 mh;
// шапка манифеста
mfWriteLine(mf, mh, "# FastCDC manifest");
mfWriteLine(mf, mh, "path\t" ~ absolutePath(opt.filePath));
mfWriteLine(mf, mh, format("size\t%s", to!string(totalBytes)));
mfWriteLine(mf, mh, "algo\tsha256");
mfWriteLine(mf, mh, format("min\t%u", cast(uint) p.minSize));
mfWriteLine(mf, mh, format("avg\t%u", cast(uint) p.avgSize));
mfWriteLine(mf, mh, format("max\t%u", cast(uint) p.maxSize));
mfWriteLine(mf, mh, "ord\thash\tsize");
// потоковая нарезка: sha256 чанка, атомарная запись, строка в манифест
size_t ord = 0;
processStream(data, p, (size_t start, size_t len) @safe {
auto slice = data[start .. start + len];
auto digest = sha256Of(slice); // ubyte[32]
auto hex = toHex(digest[]);
auto cpath = chunkPath(opt.storeDir, hex);
// подготовим подпапки aa/bb
mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2]));
mkdirRecurse(buildPath(opt.storeDir, "chunks", hex[0 .. 2], hex[2 .. 4]));
if (!exists(cpath))
writeAtomic(cpath, slice); // атомарная запись
// строка манифеста для чанка
auto line = format("%u\t%s\t%u", cast(uint) ord, hex, cast(uint) len);
mfWriteLine(mf, mh, line);
++ord;
++chunkCount;
if (opt.logEvery != 0 && (ord % opt.logEvery) == 0)
writefln("… %u chunks", cast(uint) ord);
return 0; // продолжать
});
// финализируем хэш манифеста (без строки manifest_sha256) и добавляем контрольную строку
auto manifestDigest = mh.finish(); // ubyte[32]
auto manifestHex = toHex(manifestDigest[]);
mf.writeln("manifest_sha256\t" ~ manifestHex);
mf.flush();
mf.close();
// запишем указатель "последний манифест"
writeLatestPointer(opt.storeDir, opt.filePath, mfPath);
writefln("split: %s", opt.filePath);
writefln("store: %s", opt.storeDir);
writefln("manifest: %s", mfPath);
writefln("chunks: %u, bytes: %u", cast(uint) chunkCount, cast(uint) totalBytes);
return 0;
}
// ---------- restore ----------
struct RestoreOpts
{
string storeDir;
string manifestFile; // может быть *.latest
string outFile;
}
@safe
int cmdRestore(RestoreOpts opt)
{
auto realManifest = resolveManifestPath(opt.storeDir, opt.manifestFile);
enforce(exists(realManifest), "Манифест не найден: " ~ realManifest);
string text = readText(realManifest);
auto lines = splitLines(text);
// 1) Проверка целостности: пересчитать SHA-256 всех строк ДО manifest_sha256
SHA256 mh;
string expectedHex;
foreach (ln; lines)
{
auto s = ln.strip;
if (s.startsWith("manifest_sha256"))
if (debugEnabled)
{
auto cols = s.split('\t');
enforce(cols.length == 2, "Повреждённая строка manifest_sha256");
expectedHex = cols[1];
break;
writefln("%s: Ранний возврат: остаток=%d <= минимальный размер=%d, размер чанка=%d",
__FUNCTION__, n, minSize, n);
}
// включаем в хэш строку и символ перевода строки
// (в split() выше переводы строк уже отрезаны)
mh.put(cast(const(ubyte)[]) ln);
mh.put(cast(const(ubyte)[]) "\n");
return n;
}
if (expectedHex.length)
if (n > maxSize)
{
auto gotHex = toHex(mh.finish()[]);
enforce(gotHex == expectedHex,
"Контрольная сумма манифеста не совпала:\n ожидалось: " ~ expectedHex ~ "\n получено: " ~ gotHex);
if (debugEnabled)
{
writefln("%s: Ограничение n до максимального размера: %d -> %d",
__FUNCTION__, n, maxSize);
}
n = maxSize;
}
if (n < normalSize)
{
if (debugEnabled)
{
writefln("%s: Корректировка нормального размера: %d -> %d",
__FUNCTION__, normalSize, n);
}
normalSize = n;
}
// 2) Найти секцию данных "ord\thash\tsize"
ulong fingerprint = 0;
size_t i = 0;
while (i < lines.length && !lines[i].strip.startsWith("ord"))
++i;
enforce(i < lines.length, "Не найден заголовок секции данных в манифесте");
++i;
auto dst = File(opt.outFile, "wb");
size_t count = 0;
for (; i < lines.length; ++i)
if (debugEnabled)
{
auto ln = lines[i].strip;
if (ln.length == 0 || ln[0] == '#' || ln.startsWith("manifest_sha256"))
continue;
auto cols = ln.split('\t');
enforce(cols.length == 3, "Строка манифеста повреждена: " ~ ln);
auto hashHex = cols[1];
auto cpath = chunkPath(opt.storeDir, hashHex);
enforce(exists(cpath), "Чанк не найден: " ~ cpath);
ubyte[] chunkData = readBytes(cpath);
dst.rawWrite(chunkData);
++count;
writefln("%s: Начало обработки чанка %d, остаток=%d, " ~
"минимальный размер=%d, нормальный размер=%d, максимальный размер=%d",
__FUNCTION__, chunkNumber, n, minSize, normalSize, maxSize);
}
dst.close();
writefln("restore: %s <- %s (chunks: %u)", opt.outFile, realManifest, cast(uint) count);
return 0;
// Инициализация fingerprint (отпечатка) для первых minSize байт (без проверки cut-point)
while (i < minSize)
{
fingerprint = (fingerprint << 1) + gear[src[i]];
if (debugEnabled)
{
writefln("%s: Фаза инициализации, индекс=%d, байт=%d, fingerprint=%d",
__FUNCTION__, i, src[i], fingerprint);
}
i++;
}
// Цикл до normalSize (строгая маска)
while (i < normalSize)
{
fingerprint = (fingerprint << 1) + gear[src[i]];
ulong masked = fingerprint & maskS;
if (debugEnabled)
{
writefln("%s: Фаза строгой маски, индекс=%d, байт=%d, fingerprint=%d, fingerprint & maskS=%d",
__FUNCTION__, i, src[i], fingerprint, masked);
}
if (masked == 0)
{
if (debugEnabled)
{
writefln("%s: Найдена точка разбиения на индексе=%d (строгая маска), размер чанка=%d",
__FUNCTION__, i, i);
}
return i;
}
i++;
}
// Цикл после (слабая маска)
while (i < n)
{
fingerprint = (fingerprint << 1) + gear[src[i]];
ulong masked = fingerprint & maskL;
if (debugEnabled)
{
writefln("%s: Фаза слабой маски, индекс=%d, байт=%d, fingerprint=%d, fingerprint & maskL=%d",
__FUNCTION__, i, src[i], fingerprint, masked);
}
if (masked == 0)
{
if (debugEnabled)
{
writefln("%s: Найдена точка разбиения на индексе=%d (слабая маска), размер чанка=%d",
__FUNCTION__, i, i);
}
return i;
}
i++;
}
if (debugEnabled)
{
writefln("%s: Точка разбиения не найдена, возвращается максимальный размер=%d",
__FUNCTION__, n);
}
return n; // Достигнут max
}
// ---------- CLI ----------
@safe
void printHelp(string prog)
void main()
{
writeln("Usage:");
writeln(" ",
prog, " split --store <dir> <file> [--profile text|bin] [--min N] [--avg N] [--max N] [--log-every N]");
writeln(" ", prog, " restore --store <dir> <manifest|*.latest> <out_file>");
}
// Текст без переносов строк, \n заменены на пробел
string text = "Цель науки о данных — улучшить процесс принятия решений, " ~
"основывая их на более глубоком понимании ситуации с помощью " ~
"анализа больших наборов данных. Как область деятельности " ~
"наука о данных включает в себя ряд принципов, методов " ~
"постановки задач, алгоритмов и процессов для выявления " ~
"скрытых полезных закономерностей в больших наборах данных. " ~
"Она тесно связана с глубинным анализом данных и машинным " ~
"обучением, но имеет более широкий охват. Сегодня наука о " ~
"данных управляет принятием решений практически во всех " ~
"сферах современного общества. В повседневной жизни вы " ~
"ощущаете на себе воздействие науки о данных, когда видите " ~
"отобранные специально для вас рекламные объявления, " ~
"рекомендованные фильмы и книги, ссылки на предполагаемых " ~
"друзей, отфильтрованные письма в папке со спамом, " ~
"персональные предложения от мобильных операторов и " ~
"страховых компаний. Она влияет на порядок переключения и " ~
"длительность сигналов светофоров в вашем районе, на то, как " ~
"были созданы новые лекарства, продающиеся в аптеке, и то, как " ~
"полиция вычисляет, где может потребоваться ее присутствие. " ~
"Рост использования науки о данных в обществе обусловлен " ~
"появлением больших данных и социальных сетей, увеличением " ~
"вычислительной мощности, уменьшением размеров носителей " ~
"компьютерной памяти и разработкой более эффективных " ~
"методов анализа и моделирования данных, таких как глубокое " ~
"обучение. Вместе эти факторы означают, что сейчас процесс " ~
"сбора, хранения и обработки данных стал как никогда ранее " ~
"доступен для организаций. В то же время эти технические " ~
"новшества и растущее применение науки о данных означают, что " ~
"этические проблемы, связанные с использованием данных и " ~
"личной конфиденциальностью, тоже вышли на первый план. Цель " ~
"этой книги — познакомить с наукой о данных на уровне ее " ~
"основных элементов и с той степенью погружения, которая " ~
"обеспечит принципиальное понимание вопроса. " ~
"Глава 1 очерчивает область науки о данных и дает краткую " ~
"историю ее становления и эволюции. В ней мы также " ~
"рассмотрим, почему наука о данных стала такой востребованной " ~
"сегодня, и перечислим факторы, стимулирующие ее внедрение. В " ~
"конце главы мы развенчаем несколько мифов, связанных с темой " ~
"книги. Глава 2 вводит фундаментальные понятия, относящиеся к " ~
"данным. В ней также описаны стандартные этапы проекта: " ~
"понимание бизнес-целей, начальное изучение данных, " ~
"подготовка данных, моделирование, оценка и внедрение. Глава 3 " ~
"посвящена инфраструктуре данных и проблемам, связанным с " ~
"большими данными и их интеграцией из нескольких источников. " ~
"Одна из таких типичных проблем заключается в том, что данные " ~
"в базах и хранилищах находятся на одних серверах, а " ~
"анализируются на других. Поэтому колоссальное время тратится " ~
"на перемещение больших наборов данных между этими " ~
"серверами. Глава 3 начинается с описания типичной " ~
"инфраструктуры науки о данных для организации и некоторых " ~
"свежих решений проблемы перемещения больших наборов " ~
"данных, а именно: метода машинного обучения в базе данных, " ~
"использования Hadoop для хранения и обработки данных, а также " ~
"разработки гибридных систем, в которых органично сочетаются " ~
"традиционное программное обеспечение баз данных и решения, " ~
"подобные Hadoop. Глава завершается описанием проблем, " ~
"связанных с интеграцией данных в единое представление для " ~
"последующего машинного обучения. Глава 4 знакомит читателя с " ~
"машинным обучением и объясняет некоторые из наиболее " ~
"популярных алгоритмов и моделей, включая нейронные сети, " ~
"глубокое обучение и деревья решений. В главе 5 основное " ~
"внимание уделяется использованию опыта в области машинного " ~
"обучения для решения реальных задач, приводятся примеры " ~
"анализа стандартных бизнес-проблем и того, как они могут быть " ~
"решены с помощью машинного обучения. В главе 6 " ~
"рассматриваются этические вопросы науки о данных, последние " ~
"разработки в области регулирования и некоторые из новых " ~
"вычислительных методов защиты конфиденциальности в " ~
"процессе обработки данных. Наконец, в главе 7 описаны сферы, " ~
"на которые наука о данных окажет наибольшее влияние в " ~
"ближайшем будущем, изложены принципы, позволяющие " ~
"определить, будет ли данный конкретный проект успешным.";
int main(string[] args)
{ // без @safe: getopt берёт &var
if (args.length < 2)
// Конвертация в ubyte[] (UTF-8)
ubyte[] data = cast(ubyte[]) text;
size_t totalLength = data.length;
writefln("Общая длина текста (в байтах): %d", totalLength);
// Параметры FastCDC (в байтах, адаптированы для текста)
size_t minSize = 100; // 100 байт
size_t normalSize = 200; // 200 байт (цель)
size_t maxSize = 500; // 500 байт
// Маски (для normalSize ~200 байт, log2(200) ≈ 8 бит, уровень нормализации 2)
// ulong maskS = (1UL << 8) - 1; // 8 бит: 0b11111111
// ulong maskL = (1UL << 4) - 1; // 4 бита: 0b1111
ulong maskS = 0b1111_1111;
ulong maskL = 0b1111;
// Разбиение на чанки
size_t offset = 0;
size_t chunkNumber = 1;
// Включаем отладку
bool debugEnabled = true;
writeln("Размеры и содержимое чанков:");
while (offset < totalLength)
{
printHelp(args[0]);
return 1;
}
size_t remaining = totalLength - offset;
// Включаем отладку только для первых двух чанков
// debugEnabled = chunkNumber <= 2;
size_t chunkSize = fastcdc(data[offset .. $], remaining, minSize, maxSize,
normalSize, maskS, maskL, chunkNumber, debugEnabled);
switch (args[1])
{
case "split":
{
SplitOpts opt;
string store;
string profile;
size_t minS = 0, avgS = 0, maxS = 0, logEvery = 256;
// Вывод размера чанка и его содержимого
writefln("Чанк %d: %d байт", chunkNumber, chunkSize);
string chunkContent = cast(string) data[offset .. offset + chunkSize];
writefln("Содержимое: %s\n", chunkContent.length > 50 ? chunkContent[0 .. 50] ~ "..."
: chunkContent);
auto res = getopt(args,
"store", &store,
"profile", &profile,
"min", &minS,
"avg", &avgS,
"max", &maxS,
"log-every", &logEvery
);
if (res.helpWanted)
{
printHelp(args[0]);
return 0;
}
if (args.length < 3 || store.length == 0)
{
printHelp(args[0]);
return 1;
}
opt.storeDir = store;
opt.filePath = args[2];
opt.profile = profile;
if (minS)
opt.minSize = minS;
if (avgS)
opt.avgSize = avgS;
if (maxS)
opt.maxSize = maxS;
opt.logEvery = logEvery;
return cmdSplit(opt);
}
case "restore":
{
RestoreOpts opt;
string store;
auto res = getopt(args, "store", &store);
if (res.helpWanted)
{
printHelp(args[0]);
return 0;
}
if (args.length < 4 || store.length == 0)
{
printHelp(args[0]);
return 1;
}
opt.storeDir = store;
opt.manifestFile = args[2]; // можно передать *.latest
opt.outFile = args[3];
return cmdRestore(opt);
}
default:
printHelp(args[0]);
return 1;
offset += chunkSize;
chunkNumber++;
}
}