cdc/source/fastcdc.d
2025-09-05 00:27:50 +03:00

272 lines
12 KiB
D
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

module fastcdc;
/**
* FastCDC (скользящее контент-зависимое разбиение).
*
* Идея: идём по данным байт за байтом, считаем rolling-hash (Gear),
* и ставим границы чанков там, где hash удовлетворяет простому условию:
* hash & mask == 0
*
* При этом действуют три порога:
* - minSize: до этой длины чанк не разрываем (слишком мелко)
* - avgSize: «целевой» средний размер; после него границы ищем активнее
* - maxSize: жёсткий максимум; если так и не нашли границу режем принудительно
*
* Благодаря контент-зависимым границам, маленькие вставки/удаления
* меняют только локальные чанки; большинство остальных переиспользуется.
*/
/// Параметры FastCDC. Рекомендуется avgSize как степень двойки (например, 64 KiB).
struct FastCDCParams
{
/// Минимальная длина чанка: пока длина меньше — границы не ставим.
size_t minSize = 8 * 1024;
/// Целевой средний размер чанка (желательно 2^k: 32, 64, 128 KiB и т.п.).
size_t avgSize = 64 * 1024;
/// Жёсткий максимум длины чанка: достигли — режем независимо от хэша.
size_t maxSize = 256 * 1024;
/**
* Нормализация границ (делает параметры самосогласованными):
* - minSize >= 4 KiB
* - avgSize > minSize (если нет удваиваем)
* - maxSize > avgSize (если нет умножаем на 4)
*
* @safe nothrow @nogc чтобы можно было вызывать из безопасных функций
* без аллокаций/исключений.
*/
@safe nothrow @nogc
void normalize()
{
enum ki = 1024;
if (minSize < 4 * ki)
minSize = 4 * ki;
if (avgSize <= minSize)
avgSize = minSize * 2;
if (maxSize <= avgSize)
maxSize = avgSize * 4;
}
}
/// Диапазон чанка: [start .. start+len) в исходном буфере.
struct ChunkRange
{
size_t start; // смещение начала чанка
size_t len; // длина чанка в байтах
}
/**
* Таблица для Gear-rolling-hash: 256 псевдослучайных 32-битных констант.
* Хэш обновляется так: h = (h << 1) + GEAR[byte].
* Константы фиксированы детерминированные границы при одинаковых параметрах.
*/
private immutable uint[256] GEAR = [
0x4f1bbcdc, 0xe47c2b1a, 0x1a16b407, 0xa88f9d43, 0x33b0b5c5, 0x0f0b1192,
0xb1c2e5b8, 0xc6f5a2a9, 0x7d1ea04c, 0x26358f23, 0x8f9a7902, 0x5ab7d6ee,
0x2f6d3a8c, 0x9e13c540, 0x4d7c8b99, 0xf3a1d2b1, 0x0b7d4c62, 0x81f2a5d3,
0x19a8b604, 0x6cc7e190, 0x559e43a1, 0xd2f01937, 0x7a53cc4f, 0x0c1d5e66,
0x3b7f6a22, 0x99d104b7, 0x4aa7f9e1, 0x2ce3b8c0, 0x6a8f73d4, 0xe1b2c9a8,
0x57c04b13, 0xa4e91572, 0x13f7c2aa, 0x8b1c0f5e, 0x5e6a92c1, 0x0af41937,
0x7fe0bd54, 0x26b3e71a, 0x942d6c83, 0x3c51a0ef, 0xd57f2b33, 0x61a4cc09,
0x0d9b8a71, 0xb7e50f46, 0x48a3d1f0, 0x2f1e6cb2, 0x73cd98a5, 0xe92a13c9,
0xa1c7f02e, 0x5b0e6a97, 0x0c8f2d31, 0xd1a47b66, 0x6fe3b920, 0x20b9d4a1,
0x9a5c0f3d, 0x4e81a2c7, 0xf02b5934, 0x1bc7d8a2, 0x8e0a64f1, 0x37d4b20c,
0x6c09f5d3, 0xa2391e84, 0x5f7ab0e2, 0x0b1c6d57, 0x7c3f9a15, 0x12ad54e3,
0x8b6f0c2d, 0x45e1d7a9, 0x2af39b60, 0x9c07e4d1, 0x3d5a81b2, 0xe6c21458,
0xd9a03f1c, 0x64b7e0a3, 0x0ea19c76, 0xb2d5480f, 0x49f3a7d5, 0x21c58e92,
0x75ae39c1, 0xed1046ab, 0xa8c3f12d, 0x5c0e7b94, 0x0d8f2e31, 0xd4a57c66,
0x6ee4ba20, 0x23bad5a1, 0x985d0f3d, 0x4f82a3c7, 0xf12c5a34, 0x1ac8d9a2,
0x8f0b65f1, 0x36d5b30c, 0x6b0af6d3, 0xa33a1f84, 0x5e7bb1e2, 0x0a1d6e57,
0x7d409b15, 0x11ae55e3, 0x8a700d2d, 0x44e2d8a9, 0x2bf49c60, 0x9d08e5d1,
0x3c5b82b2, 0xe7c31558, 0xd8a1401c, 0x65b8e1a3, 0x0fa29d76, 0xb3d6490f,
0x48f4a8d5, 0x22c68f92, 0x74af3ac1, 0xec1147ab, 0xa9c4f22d, 0x5d0f7c94,
0x0e902f31, 0xd5a67d66, 0x6de5bb20, 0x24bbd6a1, 0x975e103d, 0x4e83a4c7,
0xf22d5b34, 0x1bc9daa2, 0x8e0c66f1, 0x35d6b40c, 0x6a0bf7d3, 0xa23b2084,
0x5f7cb2e2, 0x0b1e6f57, 0x7c419c15, 0x10af56e3, 0x8b710e2d, 0x45e3d9a9,
0x2af59d60, 0x9c09e6d1, 0x3d5c83b2, 0xe6c41658, 0xd9a2411c, 0x64b9e2a3,
0x0ea39e76, 0xb2d74a0f, 0x49f5a9d5, 0x23c79092, 0x75b03bc1, 0xed1248ab,
0xa8c5f32d, 0x5c107d94, 0x0d913031, 0xd4a77e66, 0x6ee6bc20, 0x25bcd7a1,
0x985f113d, 0x4f84a5c7, 0xf12e5c34, 0x1acadba2, 0x8f0d67f1, 0x36d7b50c,
0x6b0cf8d3, 0xa33c2184, 0x5e7db3e2, 0x0a1f7057, 0x7d429d15, 0x11b057e3,
0x8a720f2d, 0x44e4daa9, 0x2bf69e60, 0x9d0ae7d1, 0x3c5d84b2, 0xe7c51758,
0xd8a3421c, 0x65bae3a3, 0x0fa49f76, 0xb3d84b0f, 0x48f6aad5, 0x22c89192,
0x74b13cc1, 0xec1349ab, 0xa9c6f42d, 0x5d117e94, 0x0e923131, 0xd5a87f66,
0x6de7bd20, 0x24bdd8a1, 0x9750123d, 0x4e85a6c7, 0xf22f5d34, 0x1bcbdca2,
0x8e0e68f1, 0x35d8b60c, 0x6a0df9d3, 0xa23d2284, 0x5f7eb4e2, 0x0b207157,
0x7c439e15, 0x10b158e3, 0x8b73102d, 0x45e5dba9, 0x2af79f60, 0x9c0be8d1,
0x3d5e85b2, 0xe6c61858, 0xd9a4431c, 0x64bbe4a3, 0x0ea5a076, 0xb2d94c0f,
0x49f7abd5, 0x23c99292, 0x75b23dc1, 0xed144aab, 0xa8c7f52d, 0x5c127f94,
0x0d933231, 0xd4a98066, 0x6ee8be20, 0x25bed9a1, 0x9861133d, 0x4f86a7c7,
0xf1305e34, 0x1acca0a2, 0x8f0f69f1, 0x36d9b70c, 0x6b0efaD3, 0xa33e2384,
0x5e7fb5e2, 0x0a217257, 0x7d449f15, 0x11b259e3, 0x8a74112d, 0x44e6dca9,
0x2bf8a060, 0x9d0ce9d1, 0x3c5f86b2, 0xe7c71958, 0xd8a5441c, 0x65bce5a3,
0x0fa6a176, 0xb3da4d0f, 0x48f8acd5, 0x22ca9392, 0x74b33ec1, 0xec154bab
];
/// Обновление Gear-хэша одним байтом. @safe @nothrow @nogc — без аллокаций/исключений.
@safe nothrow @nogc
private uint gearUpdate(uint h, ubyte b)
{
// Побитовый сдвиг + добавление таблицы → быстрый, но «живой» rolling-hash.
return (h << 1) + GEAR[b];
}
/**
* Ближайшая вниз степень двойки для x (>=1).
* Нужна, чтобы построить маски вида (2^k - 1) и (2^(k+1) - 1).
* При таких масках условие (hash & mask) == 0 имеет ожидаемый период ~2^k.
*/
@safe nothrow @nogc
private size_t floorPow2(size_t x)
{
size_t p = 1;
while ((p << 1) != 0 && (p << 1) <= x)
p <<= 1;
return p;
}
/**
* Batch-API: режет весь буфер и возвращает список чанков.
* Удобно, если объём данных умеренный и важна простота.
*
* Сложность: O(n), где n длина data.
*/
@safe
ChunkRange[] chunkify(const(ubyte)[] data, FastCDCParams params = FastCDCParams.init)
{
FastCDCParams p = params;
p.normalize(); // приводим пороги в адекватные границы
ChunkRange[] chunks; // итоговый список диапазонов
// Необязательный pre-reserve: уменьшает реаллокации; можно удалить, если не нужно.
chunks.reserve(data.length / (p.avgSize ? p.avgSize : 1) + 1);
// Строим две маски:
// - maskN (normal): 2^k - 1 → «частота» границ ~ avgSize
// - maskE (early): 2^(k+1) - 1 → «строже», реже до avgSize (не режем слишком рано)
const size_t avgPow2 = floorPow2(p.avgSize);
immutable uint maskN = cast(uint)(avgPow2 - 1);
immutable uint maskE = cast(uint)(((avgPow2 << 1) != 0) ? ((avgPow2 << 1) - 1) : maskN);
size_t start = 0; // начало текущего потенциального чанка
uint rolling = 0; // текущее значение rolling-hash
foreach (i, b; data) // идём по байтам
{
rolling = gearUpdate(rolling, b); // обновили rolling-hash
const size_t clen = (i + 1) - start; // текущая длина кандидат-чанка
// 1) До minSize границы запрещены — продолжаем копить байты.
if (clen < p.minSize)
continue;
// 2) Если превысили maxSize — принудительно режем тут.
if (clen >= p.maxSize)
{
chunks ~= ChunkRange(start, clen);
start = i + 1; // новый чанк начинается со следующего байта
rolling = 0; // сбрасываем хэш (независимая последовательность)
continue;
}
// 3) Зона «ранняя» (min..avg): применяем более строгую маску, чтобы
// не нарезать слишком короткие чанки.
if (clen < p.avgSize)
{
if ((rolling & maskE) == 0) // редкое совпадение → ставим границу
{
chunks ~= ChunkRange(start, clen);
start = i + 1;
rolling = 0;
}
continue; // иначе продолжаем наращивать чанк
}
// 4) Зона «нормальная» (avg..max): используем обычную маску — средний размер ≈ avgSize.
if ((rolling & maskN) == 0)
{
chunks ~= ChunkRange(start, clen);
start = i + 1;
rolling = 0;
}
}
// 5) Хвост: если что-то осталось — это последний чанк.
if (start < data.length)
chunks ~= ChunkRange(start, data.length - start);
return chunks;
}
/**
* Стримовый API: вызывает sink(start,len) для каждого найденного чанка.
*
* Плюсы:
* - не аллоцирует список чанков (можно сделать @nogc-путь);
* - удобно сразу считать хэш чанка и писать в БД.
*
* Контракт sink:
* - возвращает 0 «продолжать»;
* - возвращает != 0 «остановить» (функция вернёт количество уже эмитнутых чанков).
*/
@safe
size_t processStream(const(ubyte)[] data, FastCDCParams params,
scope int delegate(size_t, size_t) @safe sink)
{
FastCDCParams p = params;
p.normalize();
const size_t avgPow2 = floorPow2(p.avgSize);
immutable uint maskN = cast(uint)(avgPow2 - 1);
immutable uint maskE = cast(uint)(((avgPow2 << 1) != 0) ? ((avgPow2 << 1) - 1) : maskN);
size_t start = 0;
uint rolling = 0;
size_t count = 0; // числим эмитнутые чанки
foreach (i, b; data)
{
rolling = gearUpdate(rolling, b);
const size_t clen = (i + 1) - start;
// До minSize границы не допускаются
if (clen < p.minSize)
continue;
// Решение «резать/не резать» для текущей позиции
bool cut = false;
if (clen >= p.maxSize) // принудительный разрыв на maxSize
cut = true;
else if (clen < p.avgSize) // раннее окно — строгая маска
cut = ((rolling & maskE) == 0);
else // нормальное окно — обычная маска
cut = ((rolling & maskN) == 0);
if (cut)
{
// Отдаём чанк потребителю. Он может попросить остановиться (!=0).
if (sink(start, clen) != 0)
{
++count; // этот чанк уже отдан — учитываем
return count; // и выходим
}
++count;
start = i + 1; // следующий чанк начинается со следующего байта
rolling = 0; // сбрасываем rolling-hash
}
}
// Хвостовой чанк (если есть): отдаём целиком.
if (start < data.length)
{
if (sink(start, data.length - start) != 0)
{
++count; // учли последний чанк
return count; // останов по сигналу sink
}
++count;
}
return count;
}