-
Notifications
You must be signed in to change notification settings - Fork 4
Implement file chunking for large session files #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f5b93ff
6538196
f4d90b0
afd28b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| import { mkdtemp, readdir, readFile, rm, writeFile } from 'node:fs/promises'; | ||
| import os from 'node:os'; | ||
| import path from 'node:path'; | ||
| import { afterEach, beforeEach, describe, expect, it } from 'vitest'; | ||
| import { | ||
| CHUNK_SUFFIX, | ||
| reassembleChunks, | ||
| setChunkSizeForTesting, | ||
| splitIntoChunks, | ||
| } from './apply.js'; | ||
|
|
||
| describe('File Chunking', () => { | ||
| let tempDir: string; | ||
|
|
||
| beforeEach(async () => { | ||
| tempDir = await mkdtemp(path.join(os.tmpdir(), 'opencode-sync-test-')); | ||
| setChunkSizeForTesting(100); // 100 bytes for testing | ||
| }); | ||
|
|
||
| afterEach(async () => { | ||
| await rm(tempDir, { recursive: true, force: true }); | ||
| setChunkSizeForTesting(50 * 1024 * 1024); // Reset to default | ||
| }); | ||
|
|
||
| it('splits a file into chunks', async () => { | ||
| const sourcePath = path.join(tempDir, 'large-file.txt'); | ||
| const content = 'a'.repeat(250); // Should create 3 chunks (100, 100, 50) | ||
| await writeFile(sourcePath, content); | ||
|
|
||
| const destBase = path.join(tempDir, 'repo-file.txt'); | ||
| await splitIntoChunks(sourcePath, destBase); | ||
|
|
||
| const files = await readdir(tempDir); | ||
| const chunks = files.filter((f) => f.startsWith(`repo-file.txt${CHUNK_SUFFIX}`)); | ||
| expect(chunks).toHaveLength(3); | ||
|
|
||
| const chunk0 = await readFile(path.join(tempDir, `repo-file.txt${CHUNK_SUFFIX}0`), 'utf8'); | ||
| const chunk1 = await readFile(path.join(tempDir, `repo-file.txt${CHUNK_SUFFIX}1`), 'utf8'); | ||
| const chunk2 = await readFile(path.join(tempDir, `repo-file.txt${CHUNK_SUFFIX}2`), 'utf8'); | ||
|
|
||
| expect(chunk0).toHaveLength(100); | ||
| expect(chunk1).toHaveLength(100); | ||
| expect(chunk2).toHaveLength(50); | ||
| expect(chunk0 + chunk1 + chunk2).toBe(content); | ||
| }); | ||
|
|
||
| it('reassembles chunks into a file', async () => { | ||
| let chunkDir = path.join(tempDir, 'chunks'); | ||
| await rm(chunkDir, { recursive: true, force: true }).catch(() => {}); | ||
| chunkDir = await mkdtemp(chunkDir); // ensure it exists | ||
|
|
||
| const chunkNames = [ | ||
| `file.txt${CHUNK_SUFFIX}0`, | ||
| `file.txt${CHUNK_SUFFIX}1`, | ||
| `file.txt${CHUNK_SUFFIX}2`, | ||
| ]; | ||
|
|
||
| await writeFile(path.join(chunkDir, chunkNames[0]), 'part1-'); | ||
| await writeFile(path.join(chunkDir, chunkNames[1]), 'part2-'); | ||
| await writeFile(path.join(chunkDir, chunkNames[2]), 'part3'); | ||
|
|
||
| const destPath = path.join(tempDir, 'reassembled.txt'); | ||
| await reassembleChunks(chunkDir, chunkNames, destPath); | ||
|
|
||
| const reassembledContent = await readFile(destPath, 'utf8'); | ||
| expect(reassembledContent).toBe('part1-part2-part3'); | ||
| }); | ||
|
|
||
| it('handles chunks out of order in the list', async () => { | ||
| let chunkDir = path.join(tempDir, 'chunks-unordered'); | ||
| chunkDir = await mkdtemp(chunkDir); | ||
|
|
||
| const chunkNames = [ | ||
| `file.txt${CHUNK_SUFFIX}1`, | ||
| `file.txt${CHUNK_SUFFIX}0`, | ||
| `file.txt${CHUNK_SUFFIX}2`, | ||
| ]; | ||
|
|
||
| await writeFile(path.join(chunkDir, `file.txt${CHUNK_SUFFIX}0`), 'A'); | ||
| await writeFile(path.join(chunkDir, `file.txt${CHUNK_SUFFIX}1`), 'B'); | ||
| await writeFile(path.join(chunkDir, `file.txt${CHUNK_SUFFIX}2`), 'C'); | ||
|
|
||
| const destPath = path.join(tempDir, 'reassembled-ordered.txt'); | ||
| await reassembleChunks(chunkDir, chunkNames, destPath); | ||
|
|
||
| const reassembledContent = await readFile(destPath, 'utf8'); | ||
| expect(reassembledContent).toBe('ABC'); | ||
| }); | ||
|
|
||
| it('removes stale chunks when splitting a file that got smaller', async () => { | ||
| const sourcePath = path.join(tempDir, 'smaller-file.txt'); | ||
| const destBase = path.join(tempDir, 'repo-smaller.txt'); | ||
|
|
||
| // First, split a large file into 3 chunks | ||
| setChunkSizeForTesting(10); | ||
| await writeFile(sourcePath, 'a'.repeat(25)); // 3 chunks: 10, 10, 5 | ||
| await splitIntoChunks(sourcePath, destBase); | ||
| let files = await readdir(tempDir); | ||
| expect(files.filter((f) => f.startsWith(`repo-smaller.txt${CHUNK_SUFFIX}`))).toHaveLength(3); | ||
|
|
||
| // Now, split a smaller file into 1 chunk | ||
| await writeFile(sourcePath, 'b'.repeat(5)); // 1 chunk: 5 | ||
| // In copyItem, we'd call removeChunks then splitIntoChunks (if still > CHUNK_SIZE) | ||
| // or just copyItem which calls removeChunks. | ||
| // Let's simulate the copyItem logic for large -> smaller | ||
| await rm(destBase, { force: true }); | ||
| // Simulate removeChunks which should be called | ||
| const dir = path.dirname(destBase); | ||
| const baseName = path.basename(destBase); | ||
| const entries = await readdir(dir); | ||
| for (const entry of entries) { | ||
| if (entry.startsWith(baseName + CHUNK_SUFFIX)) { | ||
| await rm(path.join(dir, entry), { force: true }); | ||
| } | ||
| } | ||
|
|
||
| await splitIntoChunks(sourcePath, destBase); | ||
| files = await readdir(tempDir); | ||
| expect(files.filter((f) => f.startsWith(`repo-smaller.txt${CHUNK_SUFFIX}`))).toHaveLength(1); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| import { Buffer } from 'node:buffer'; | ||
| import { promises as fs } from 'node:fs'; | ||
| import path from 'node:path'; | ||
|
|
||
|
|
@@ -39,12 +40,19 @@ interface ExtraPathManifest { | |
| entries: ExtraPathManifestEntry[]; | ||
| } | ||
|
|
||
// Files larger than CHUNK_SIZE are split into numbered chunk files when
// copied into the repo. 50MB default — presumably chosen to stay under the
// hosting provider's per-file limit; TODO confirm. Declared `let` (not
// `const`) only so tests can shrink it via setChunkSizeForTesting.
export let CHUNK_SIZE = 50 * 1024 * 1024; // 50MB

// Suffix inserted between the base file name and the chunk index,
// e.g. "session.json.ocsync-chunk.0".
export const CHUNK_SUFFIX = '.ocsync-chunk.';

// Test-only override for CHUNK_SIZE. Callers are responsible for restoring
// the default afterwards (see the test suite's afterEach hook).
export function setChunkSizeForTesting(size: number) {
  CHUNK_SIZE = size;
}
|
|
||
| export async function syncRepoToLocal( | ||
| plan: SyncPlan, | ||
| overrides: Record<string, unknown> | null | ||
| ): Promise<void> { | ||
| for (const item of plan.items) { | ||
| await copyItem(item.repoPath, item.localPath, item.type); | ||
| await copyItem(item.repoPath, item.localPath, item.type, false, false); | ||
| } | ||
|
|
||
| await applyExtraPaths(plan, plan.extraConfigs); | ||
|
|
@@ -98,7 +106,7 @@ export async function syncLocalToRepo( | |
| continue; | ||
| } | ||
|
|
||
| await copyItem(item.localPath, item.repoPath, item.type, true); | ||
| await copyItem(item.localPath, item.repoPath, item.type, true, true); | ||
| } | ||
|
|
||
| await writeExtraPathManifest(plan, plan.extraConfigs); | ||
|
|
@@ -109,22 +117,51 @@ async function copyItem( | |
| sourcePath: string, | ||
| destinationPath: string, | ||
| type: SyncItem['type'], | ||
| removeWhenMissing = false | ||
| removeWhenMissing = false, | ||
| toRepo = false | ||
| ): Promise<void> { | ||
| if (!(await pathExists(sourcePath))) { | ||
| const sourceExists = await pathExists(sourcePath); | ||
| const sourceChunks = type === 'file' ? await findChunks(sourcePath) : []; | ||
|
|
||
| if (!sourceExists && sourceChunks.length === 0) { | ||
| if (removeWhenMissing) { | ||
| await removePath(destinationPath); | ||
| if (toRepo) { | ||
| await removeChunks(destinationPath); | ||
| } | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| if (type === 'file') { | ||
| if (toRepo) { | ||
| const stat = await fs.stat(sourcePath); | ||
| if (stat.size > CHUNK_SIZE) { | ||
| await removePath(destinationPath); | ||
| await removeChunks(destinationPath); | ||
| await fs.mkdir(path.dirname(destinationPath), { recursive: true }); | ||
| await splitIntoChunks(sourcePath, destinationPath); | ||
| return; | ||
| } | ||
| await removeChunks(destinationPath); | ||
| } else { | ||
| if (sourceChunks.length > 0) { | ||
| await reassembleChunks(path.dirname(sourcePath), sourceChunks, destinationPath); | ||
| const firstChunkStat = await fs.stat(path.join(path.dirname(sourcePath), sourceChunks[0])); | ||
| await chmodIfExists(destinationPath, firstChunkStat.mode & 0o777); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| await copyFileWithMode(sourcePath, destinationPath); | ||
| return; | ||
| } | ||
|
|
||
| await removePath(destinationPath); | ||
| await copyDirRecursive(sourcePath, destinationPath); | ||
| if (toRepo) { | ||
| await removeChunks(destinationPath); | ||
| } | ||
| await copyDirRecursive(sourcePath, destinationPath, toRepo); | ||
| } | ||
|
|
||
| async function copyConfigForRepo( | ||
|
|
@@ -198,21 +235,59 @@ async function copyFileWithMode(sourcePath: string, destinationPath: string): Pr | |
| await chmodIfExists(destinationPath, stat.mode & 0o777); | ||
| } | ||
|
|
||
| async function copyDirRecursive(sourcePath: string, destinationPath: string): Promise<void> { | ||
| async function copyDirRecursive( | ||
| sourcePath: string, | ||
| destinationPath: string, | ||
| toRepo = false | ||
| ): Promise<void> { | ||
| const stat = await fs.stat(sourcePath); | ||
| await fs.mkdir(destinationPath, { recursive: true }); | ||
| const entries = await fs.readdir(sourcePath, { withFileTypes: true }); | ||
|
|
||
| const processedFiles = new Set<string>(); | ||
|
|
||
| if (!toRepo) { | ||
| const chunksByBaseFile = new Map<string, string[]>(); | ||
| for (const entry of entries) { | ||
| if (entry.isFile() && entry.name.includes(CHUNK_SUFFIX)) { | ||
| const baseName = entry.name.split(CHUNK_SUFFIX)[0]; | ||
| if (baseName) { | ||
| const chunks = chunksByBaseFile.get(baseName) ?? []; | ||
| chunks.push(entry.name); | ||
| chunksByBaseFile.set(baseName, chunks); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for (const [baseName, chunks] of chunksByBaseFile.entries()) { | ||
| const destFile = path.join(destinationPath, baseName); | ||
| await reassembleChunks(sourcePath, chunks, destFile); | ||
| const firstChunkStat = await fs.stat(path.join(sourcePath, chunks[0])); | ||
| await chmodIfExists(destFile, firstChunkStat.mode & 0o777); | ||
| for (const chunk of chunks) processedFiles.add(chunk); | ||
| } | ||
| } | ||
|
|
||
| for (const entry of entries) { | ||
| if (processedFiles.has(entry.name)) continue; | ||
|
|
||
| const entrySource = path.join(sourcePath, entry.name); | ||
| const entryDest = path.join(destinationPath, entry.name); | ||
|
|
||
| if (entry.isDirectory()) { | ||
| await copyDirRecursive(entrySource, entryDest); | ||
| await copyDirRecursive(entrySource, entryDest, toRepo); | ||
| continue; | ||
| } | ||
|
|
||
| if (entry.isFile()) { | ||
| if (toRepo) { | ||
| const fileStat = await fs.stat(entrySource); | ||
| if (fileStat.size > CHUNK_SIZE) { | ||
| await splitIntoChunks(entrySource, entryDest); | ||
| continue; | ||
| } | ||
| await removeChunks(entryDest); | ||
| } | ||
| await copyFileWithMode(entrySource, entryDest); | ||
| } | ||
| } | ||
|
|
@@ -244,9 +319,12 @@ async function applyExtraPaths(plan: SyncPlan, extra: ExtraPathPlan): Promise<vo | |
| const localPath = entry.sourcePath; | ||
| const entryType: ExtraPathType = entry.type ?? 'file'; | ||
|
|
||
| if (!(await pathExists(repoPath))) continue; | ||
| const repoExists = await pathExists(repoPath); | ||
| const repoChunks = entryType === 'file' ? await findChunks(repoPath) : []; | ||
|
|
||
| if (!repoExists && repoChunks.length === 0) continue; | ||
|
|
||
| await copyItem(repoPath, localPath, entryType); | ||
| await copyItem(repoPath, localPath, entryType, false, false); | ||
| await applyExtraPathModes(localPath, entry); | ||
| } | ||
| } | ||
|
|
@@ -271,7 +349,7 @@ async function writeExtraPathManifest(plan: SyncPlan, extra: ExtraPathPlan): Pro | |
| } | ||
| const stat = await fs.stat(sourcePath); | ||
| if (stat.isDirectory()) { | ||
| await copyDirRecursive(sourcePath, entry.repoPath); | ||
| await copyItem(sourcePath, entry.repoPath, 'dir', true, true); | ||
| const items = await collectExtraPathItems(sourcePath, sourcePath); | ||
| entries.push({ | ||
| sourcePath, | ||
|
|
@@ -283,7 +361,7 @@ async function writeExtraPathManifest(plan: SyncPlan, extra: ExtraPathPlan): Pro | |
| continue; | ||
| } | ||
| if (stat.isFile()) { | ||
| await copyFileWithMode(sourcePath, entry.repoPath); | ||
| await copyItem(sourcePath, entry.repoPath, 'file', true, true); | ||
| entries.push({ | ||
| sourcePath, | ||
| repoPath: path.relative(plan.repoRoot, entry.repoPath), | ||
|
|
@@ -374,6 +452,87 @@ function resolveExtraPathItem(basePath: string, relativePath: string): string | | |
| return resolvedPath; | ||
| } | ||
|
|
||
| export async function splitIntoChunks(sourcePath: string, destinationBase: string): Promise<void> { | ||
| const stat = await fs.stat(sourcePath); | ||
| const fd = await fs.open(sourcePath, 'r'); | ||
| try { | ||
| let chunkIndex = 0; | ||
| let bytesRead = 0; | ||
| const buffer = Buffer.alloc(Math.min(CHUNK_SIZE, 1024 * 1024)); | ||
|
|
||
| while (bytesRead < stat.size) { | ||
| const chunkPath = `${destinationBase}${CHUNK_SUFFIX}${chunkIndex}`; | ||
| const chunkFd = await fs.open(chunkPath, 'w'); | ||
| try { | ||
| let currentChunkBytes = 0; | ||
| while (currentChunkBytes < CHUNK_SIZE && bytesRead < stat.size) { | ||
| const toRead = Math.min( | ||
| buffer.length, | ||
| CHUNK_SIZE - currentChunkBytes, | ||
| stat.size - bytesRead | ||
| ); | ||
| const { bytesRead: n } = await fd.read(buffer, 0, toRead, bytesRead); | ||
| await chunkFd.write(buffer, 0, n); | ||
| bytesRead += n; | ||
| currentChunkBytes += n; | ||
| } | ||
| } finally { | ||
| await chunkFd.close(); | ||
| } | ||
| chunkIndex++; | ||
| } | ||
| } finally { | ||
| await fd.close(); | ||
| } | ||
| } | ||
|
|
||
| export async function reassembleChunks( | ||
| sourceDir: string, | ||
| chunkNames: string[], | ||
| destinationPath: string | ||
| ): Promise<void> { | ||
| await fs.mkdir(path.dirname(destinationPath), { recursive: true }); | ||
| const destFd = await fs.open(destinationPath, 'w'); | ||
| try { | ||
| const sortedChunks = [...chunkNames].sort((a, b) => { | ||
| const partsA = a.split(CHUNK_SUFFIX); | ||
| const partsB = b.split(CHUNK_SUFFIX); | ||
| const idxA = Number.parseInt(partsA[partsA.length - 1] ?? '0', 10); | ||
| const idxB = Number.parseInt(partsB[partsB.length - 1] ?? '0', 10); | ||
| return idxA - idxB; | ||
| }); | ||
|
|
||
| for (const chunkName of sortedChunks) { | ||
| const chunkPath = path.join(sourceDir, chunkName); | ||
| const chunkContent = await fs.readFile(chunkPath); | ||
| await destFd.write(chunkContent); | ||
| } | ||
| } finally { | ||
| await destFd.close(); | ||
| } | ||
| } | ||
|
Comment on lines
+489
to
+513
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation of To improve memory efficiency, I recommend using streams to read from each chunk file and write to the destination file. This avoids loading entire chunks into memory. The export async function reassembleChunks(
sourceDir: string,
chunkNames: string[],
destinationPath: string
): Promise<void> {
await fs.mkdir(path.dirname(destinationPath), { recursive: true });
const destFd = await fs.open(destinationPath, 'w');
try {
const sortedChunks = [...chunkNames].sort((a, b) => {
const partsA = a.split(CHUNK_SUFFIX);
const partsB = b.split(CHUNK_SUFFIX);
const idxA = Number.parseInt(partsA[partsA.length - 1] ?? '0', 10);
const idxB = Number.parseInt(partsB[partsB.length - 1] ?? '0', 10);
return idxA - idxB;
});
const buffer = Buffer.alloc(1024 * 1024); // 1MB buffer
for (const chunkName of sortedChunks) {
const chunkPath = path.join(sourceDir, chunkName);
const chunkFd = await fs.open(chunkPath, 'r');
try {
let readResult;
while ((readResult = await chunkFd.read(buffer, 0, buffer.length, null)).bytesRead > 0) {
await destFd.write(buffer, 0, readResult.bytesRead);
}
} finally {
await chunkFd.close();
}
}
} finally {
await destFd.close();
}
} |
||
|
|
||
| async function removeChunks(basePath: string): Promise<void> { | ||
| const dir = path.dirname(basePath); | ||
| const baseName = path.basename(basePath); | ||
| if (!(await pathExists(dir))) return; | ||
|
|
||
| const entries = await fs.readdir(dir); | ||
| for (const entry of entries) { | ||
| if (entry.startsWith(baseName + CHUNK_SUFFIX)) { | ||
| await fs.rm(path.join(dir, entry), { force: true }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async function findChunks(basePath: string): Promise<string[]> { | ||
| const dir = path.dirname(basePath); | ||
| const baseName = path.basename(basePath); | ||
| if (!(await pathExists(dir))) return []; | ||
| const entries = await fs.readdir(dir); | ||
| return entries.filter((e) => e.startsWith(baseName + CHUNK_SUFFIX)); | ||
| } | ||
|
|
||
| function isDeepEqual(left: unknown, right: unknown): boolean { | ||
| if (left === right) return true; | ||
| if (typeof left !== typeof right) return false; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case is confusing. Its name, 'removes stale chunks when splitting a file that got smaller', suggests it tests the cleanup of old chunks. However, the test itself manually performs this cleanup (lines 108-115) rather than testing the component responsible for it. The actual logic under test is simply that
`splitIntoChunks` creates one chunk for a file smaller than `CHUNK_SIZE`. The current structure, which simulates implementation details of another function (
`copyItem`), makes the test brittle and hard to understand. A clearer approach would be to have a separate, simpler test for this behavior. If the goal is to test the stale chunk removal, that should be done by testing the public function responsible for that orchestration (
`syncLocalToRepo` or an exported `copyItem`). I suggest simplifying this test to focus only on
splitIntoChunks's behavior with small files.