diff --git a/src/lib/modules/upload.ts b/src/lib/modules/upload.ts
index 231b54b..83f4fd3 100644
--- a/src/lib/modules/upload.ts
+++ b/src/lib/modules/upload.ts
@@ -80,12 +80,12 @@ export const uploadBlob = async (
   const limit = pLimit(options?.concurrency ?? 4);

   await Promise.all(
-    Array.from({ length: totalChunks }, (_, chunkIndex) =>
+    Array.from({ length: totalChunks }, (_, i) =>
       limit(() =>
         uploadChunk(
           uploadId,
-          chunkIndex,
-          blob.slice(chunkIndex * CHUNK_SIZE, (chunkIndex + 1) * CHUNK_SIZE),
+          i + 1, // 1-based chunk index
+          blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE),
           dataKey,
           onChunkProgress,
         ),
diff --git a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
index be6a900..26e6ae8 100644
--- a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
+++ b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
@@ -21,8 +21,14 @@ export const up = async (db: Kysely<any>) => {
     .addColumn("type", "text", (col) => col.notNull())
     .addColumn("user_id", "integer", (col) => col.references("user.id").notNull())
     .addColumn("path", "text", (col) => col.notNull())
+    .addColumn("bitmap", "bytea", (col) => col.notNull())
     .addColumn("total_chunks", "integer", (col) => col.notNull())
-    .addColumn("uploaded_chunks", sql`integer[]`, (col) => col.notNull().defaultTo(sql`'{}'`))
+    .addColumn("uploaded_chunks", "integer", (col) =>
+      col
+        .generatedAlwaysAs(sql`bit_count(bitmap)`)
+        .stored()
+        .notNull(),
+    )
     .addColumn("expires_at", "timestamp(3)", (col) => col.notNull())
     .addColumn("parent_id", "integer", (col) => col.references("directory.id"))
     .addColumn("master_encryption_key_version", "integer")
@@ -46,6 +52,11 @@ export const up = async (db: Kysely<any>) => {
       "hmac_secret_key",
       ["user_id", "version"],
     )
+    .addCheckConstraint("upload_session_ck01", sql`uploaded_chunks <= total_chunks`)
+    .addCheckConstraint(
+      "upload_session_ck02",
+      sql`length(bitmap) = ceil(total_chunks / 8.0)::integer`,
+    )
     .execute();
 };
diff --git a/src/lib/server/db/schema/upload.ts b/src/lib/server/db/schema/upload.ts
index e20227d..7aefc5d 100644
--- a/src/lib/server/db/schema/upload.ts
+++ b/src/lib/server/db/schema/upload.ts
@@ -6,8 +6,9 @@ interface UploadSessionTable {
   type: "file" | "thumbnail" | "migration";
   user_id: number;
   path: string;
+  bitmap: Buffer;
   total_chunks: number;
-  uploaded_chunks: Generated<number[]>;
+  uploaded_chunks: Generated<number>;
   expires_at: Date;

   // For file uploads
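Note on the migration above: `uploaded_chunks` is no longer an `integer[]` grown with `array_append`; it is a stored generated column computed as `bit_count(bitmap)` (`bit_count(bytea)` is available from PostgreSQL 14). The two check constraints pin down the invariants: the count can never exceed `total_chunks`, and the bitmap is always exactly `ceil(total_chunks / 8)` bytes. A minimal TypeScript sketch of the same sizing rule, mirroring what the repository code below does with `Buffer.alloc` (the helper name is illustrative, not part of this PR):

```ts
// Sketch: client-side mirror of upload_session_ck02. Buffer.alloc zero-fills,
// so a fresh bitmap starts out with bit_count(bitmap) = 0 chunks uploaded.
const bitmapBytes = (totalChunks: number): number => Math.ceil(totalChunks / 8);

console.log(bitmapBytes(1)); // 1 byte covers chunks 1..8
console.log(bitmapBytes(8)); // 1
console.log(bitmapBytes(9)); // 2
```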
diff --git a/src/lib/server/db/upload.ts b/src/lib/server/db/upload.ts
index 876c150..db19cbf 100644
--- a/src/lib/server/db/upload.ts
+++ b/src/lib/server/db/upload.ts
@@ -7,8 +7,9 @@ interface BaseUploadSession {
   id: string;
   userId: number;
   path: string;
+  bitmap: Buffer;
   totalChunks: number;
-  uploadedChunks: number[];
+  uploadedChunks: number;
   expiresAt: Date;
 }

@@ -37,7 +38,7 @@ interface MigrationUploadSession extends BaseUploadSession {
 }

 export const createFileUploadSession = async (
-  params: Omit<FileUploadSession, "uploadedChunks">,
+  params: Omit<FileUploadSession, "bitmap" | "uploadedChunks">,
 ) => {
   await db.transaction().execute(async (trx) => {
     const mek = await trx
@@ -73,6 +74,7 @@ export const createFileUploadSession = async (
         type: "file",
         user_id: params.userId,
         path: params.path,
+        bitmap: Buffer.alloc(Math.ceil(params.totalChunks / 8)),
         total_chunks: params.totalChunks,
         expires_at: params.expiresAt,
         parent_id: params.parentId !== "root" ? params.parentId : null,
@@ -90,7 +92,7 @@ export const createFileUploadSession = async (
 };

 export const createThumbnailUploadSession = async (
-  params: Omit<ThumbnailUploadSession, "uploadedChunks">,
+  params: Omit<ThumbnailUploadSession, "bitmap" | "uploadedChunks">,
 ) => {
   await db.transaction().execute(async (trx) => {
     const file = await trx
@@ -114,6 +116,7 @@ export const createThumbnailUploadSession = async (
         type: "thumbnail",
         user_id: params.userId,
         path: params.path,
+        bitmap: Buffer.alloc(Math.ceil(params.totalChunks / 8)),
         total_chunks: params.totalChunks,
         expires_at: params.expiresAt,
         file_id: params.fileId,
@@ -124,7 +127,7 @@ export const createThumbnailUploadSession = async (
 };

 export const createMigrationUploadSession = async (
-  params: Omit<MigrationUploadSession, "uploadedChunks">,
+  params: Omit<MigrationUploadSession, "bitmap" | "uploadedChunks">,
 ) => {
   await db.transaction().execute(async (trx) => {
     const file = await trx
@@ -148,6 +151,7 @@ export const createMigrationUploadSession = async (
         type: "migration",
         user_id: params.userId,
         path: params.path,
+        bitmap: Buffer.alloc(Math.ceil(params.totalChunks / 8)),
         total_chunks: params.totalChunks,
         expires_at: params.expiresAt,
         file_id: params.fileId,
@@ -173,6 +177,7 @@ export const getUploadSession = async (sessionId: string, userId: number) => {
     id: session.id,
     userId: session.user_id,
     path: session.path,
+    bitmap: session.bitmap,
     totalChunks: session.total_chunks,
     uploadedChunks: session.uploaded_chunks,
     expiresAt: session.expires_at,
@@ -192,6 +197,7 @@ export const getUploadSession = async (sessionId: string, userId: number) => {
     id: session.id,
     userId: session.user_id,
     path: session.path,
+    bitmap: session.bitmap,
     totalChunks: session.total_chunks,
     uploadedChunks: session.uploaded_chunks,
     expiresAt: session.expires_at,
@@ -204,6 +210,7 @@ export const getUploadSession = async (sessionId: string, userId: number) => {
     id: session.id,
     userId: session.user_id,
     path: session.path,
+    bitmap: session.bitmap,
     totalChunks: session.total_chunks,
     uploadedChunks: session.uploaded_chunks,
     expiresAt: session.expires_at,
@@ -215,7 +222,9 @@
 export const markChunkAsUploaded = async (sessionId: string, chunkIndex: number) => {
   await db
     .updateTable("upload_session")
-    .set({ uploaded_chunks: sql`array_append(uploaded_chunks, ${chunkIndex})` })
+    .set({
+      bitmap: sql`set_bit(${sql.ref("bitmap")}, ${chunkIndex - 1}, 1)`,
+    })
     .where("id", "=", sessionId)
     .execute();
 };
diff --git a/src/lib/server/services/upload.ts b/src/lib/server/services/upload.ts
index 1f7043b..d654f42 100644
--- a/src/lib/server/services/upload.ts
+++ b/src/lib/server/services/upload.ts
@@ -8,6 +8,12 @@ import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem";

 const chunkLocks = new Set<string>();

+const isChunkUploaded = (bitmap: Buffer, chunkIndex: number) => {
+  chunkIndex -= 1;
+  const byte = bitmap[Math.floor(chunkIndex / 8)];
+  return !!byte && (byte & (1 << (chunkIndex % 8))) !== 0; // Postgres numbers bytea bits LSB-first within each byte
+};
+
 export const uploadChunk = async (
   userId: number,
   sessionId: string,
@@ -28,13 +34,13 @@ export const uploadChunk = async (
   const session = await UploadRepo.getUploadSession(sessionId, userId);
   if (!session) {
     error(404, "Invalid upload id");
-  } else if (chunkIndex >= session.totalChunks) {
+  } else if (chunkIndex > session.totalChunks) {
     error(400, "Invalid chunk index");
-  } else if (session.uploadedChunks.includes(chunkIndex)) {
+  } else if (isChunkUploaded(session.bitmap, chunkIndex)) {
     error(409, "Chunk already uploaded");
   }

-  const isLastChunk = chunkIndex === session.totalChunks - 1;
+  const isLastChunk = chunkIndex === session.totalChunks;
   filePath = `${session.path}/${chunkIndex}`;

   const hashStream = createHash("sha256");
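The bit numbering deserves a note: Postgres's `set_bit(bytea, n, 1)`, used in `markChunkAsUploaded` above, addresses bits LSB-first within each byte, and `isChunkUploaded` assumes the same layout, which is why both sides convert the 1-based chunk index with `chunkIndex - 1` and split it into byte and bit with `/ 8` and `% 8`. A self-contained sanity check, assuming Node's `Buffer` (the `setBit` helper emulates the Postgres function and is not part of this PR):

```ts
// Emulates Postgres set_bit(bytea, n, 1): byte = floor(n / 8), bit = n % 8, LSB-first.
const setBit = (bitmap: Buffer, bitIndex: number): Buffer => {
  const copy = Buffer.from(bitmap);
  copy[Math.floor(bitIndex / 8)] |= 1 << (bitIndex % 8);
  return copy;
};

// Same logic as the server-side helper in the diff above; the !!byte guard
// also returns false for reads past the end of the bitmap.
const isChunkUploaded = (bitmap: Buffer, chunkIndex: number): boolean => {
  chunkIndex -= 1;
  const byte = bitmap[Math.floor(chunkIndex / 8)];
  return !!byte && (byte & (1 << (chunkIndex % 8))) !== 0;
};

let bitmap = Buffer.alloc(2); // 10 chunks -> ceil(10 / 8) = 2 bytes
bitmap = setBit(bitmap, 9 - 1); // mark chunk 9 (1-based) -> bit index 8
console.log(isChunkUploaded(bitmap, 9)); // true
console.log(isChunkUploaded(bitmap, 10)); // false
```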
diff --git a/src/lib/services/file.ts b/src/lib/services/file.ts
index 2f37f52..b8db243 100644
--- a/src/lib/services/file.ts
+++ b/src/lib/services/file.ts
@@ -1,5 +1,4 @@
 import { getAllFileInfos } from "$lib/indexedDB/filesystem";
-import { encodeToBase64, digestMessage } from "$lib/modules/crypto";
 import {
   getFileCache,
   storeFileCache,
@@ -7,6 +6,7 @@ import {
   downloadFile,
   deleteFileThumbnailCache,
 } from "$lib/modules/file";
+import { uploadBlob } from "$lib/modules/upload";
 import { trpc } from "$trpc/client";

 export const requestFileDownload = async (
@@ -24,41 +24,24 @@ export const requestFileDownload = async (

 export const requestFileThumbnailUpload = async (
   fileId: number,
+  thumbnail: Blob,
+  dataKey: CryptoKey,
   dataKeyVersion: Date,
-  thumbnailEncrypted: { ciphertext: ArrayBuffer; iv: ArrayBuffer },
 ) => {
-  const { uploadId } = await trpc().upload.startFileThumbnailUpload.mutate({
-    file: fileId,
-    dekVersion: dataKeyVersion,
-  });
+  try {
+    const { uploadId } = await trpc().upload.startFileThumbnailUpload.mutate({
+      file: fileId,
+      dekVersion: dataKeyVersion,
+    });

-  // Prepend IV to ciphertext (consistent with file download format)
-  const ivAndCiphertext = new Uint8Array(
-    thumbnailEncrypted.iv.byteLength + thumbnailEncrypted.ciphertext.byteLength,
-  );
-  ivAndCiphertext.set(new Uint8Array(thumbnailEncrypted.iv), 0);
-  ivAndCiphertext.set(
-    new Uint8Array(thumbnailEncrypted.ciphertext),
-    thumbnailEncrypted.iv.byteLength,
-  );
+    await uploadBlob(uploadId, thumbnail, dataKey);

-  const chunkHash = encodeToBase64(await digestMessage(ivAndCiphertext));
-
-  const response = await fetch(`/api/upload/${uploadId}/chunks/0`, {
-    method: "POST",
-    headers: {
-      "Content-Type": "application/octet-stream",
-      "Content-Digest": `sha-256=:${chunkHash}:`,
-    },
-    body: ivAndCiphertext,
-  });
-
-  if (!response.ok) {
-    throw new Error(`Thumbnail upload failed: ${response.status} ${response.statusText}`);
+    await trpc().upload.completeFileThumbnailUpload.mutate({ uploadId });
+    return true;
+  } catch {
+    // TODO: Error Handling
+    return false;
   }
-
-  await trpc().upload.completeFileThumbnailUpload.mutate({ uploadId });
-  return response;
 };

 export const requestDeletedFilesCleanup = async () => {
diff --git a/src/routes/(fullscreen)/file/[id]/service.ts b/src/routes/(fullscreen)/file/[id]/service.ts
index ea3e49c..598418b 100644
--- a/src/routes/(fullscreen)/file/[id]/service.ts
+++ b/src/routes/(fullscreen)/file/[id]/service.ts
@@ -1,4 +1,3 @@
-import { encryptData } from "$lib/modules/crypto";
 import { storeFileThumbnailCache } from "$lib/modules/file";
 import { prepareFileDecryption, getDecryptedFileUrl } from "$lib/serviceWorker";
 import { requestFileThumbnailUpload } from "$lib/services/file";
@@ -33,12 +32,10 @@ export const requestThumbnailUpload = async (
   dataKey: CryptoKey,
   dataKeyVersion: Date,
 ) => {
-  const thumbnailBuffer = await thumbnail.arrayBuffer();
-  const thumbnailEncrypted = await encryptData(thumbnailBuffer, dataKey);
-  const res = await requestFileThumbnailUpload(fileId, dataKeyVersion, thumbnailEncrypted);
-  if (!res.ok) return false;
+  const res = await requestFileThumbnailUpload(fileId, thumbnail, dataKey, dataKeyVersion);
+  if (!res) return false;

-  storeFileThumbnailCache(fileId, thumbnailBuffer); // Intended
+  void thumbnail.arrayBuffer().then((buffer) => storeFileThumbnailCache(fileId, buffer));

   return true;
 };
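With encryption moved out of the thumbnail call sites, `uploadBlob` is now the single chunked-upload path for files and thumbnails alike. Based on the code this PR deletes (IV-prefixed ciphertext, an RFC 9530 `Content-Digest` header), a per-chunk request presumably looks like the sketch below; AES-GCM, the 12-byte IV, and the helper shape are assumptions, not a confirmed reading of `src/lib/modules/upload.ts`:

```ts
// Hypothetical per-chunk request, reconstructed from the deleted inline code.
const uploadChunkSketch = async (
  uploadId: string,
  chunkIndex: number, // 1-based after this PR
  chunk: Blob,
  dataKey: CryptoKey,
) => {
  const iv = crypto.getRandomValues(new Uint8Array(12)); // IV length is an assumption
  const ciphertext = await crypto.subtle.encrypt(
    { name: "AES-GCM", iv }, // cipher choice is an assumption
    dataKey,
    await chunk.arrayBuffer(),
  );

  // Prepend IV to ciphertext, matching the deleted code's wire format.
  const body = new Uint8Array(iv.byteLength + ciphertext.byteLength);
  body.set(iv, 0);
  body.set(new Uint8Array(ciphertext), iv.byteLength);

  // Content-Digest: sha-256=:<base64 digest>:, as in the deleted code.
  const digest = new Uint8Array(await crypto.subtle.digest("SHA-256", body));
  const hash = btoa(String.fromCharCode(...digest));

  const response = await fetch(`/api/upload/${uploadId}/chunks/${chunkIndex}`, {
    method: "POST",
    headers: {
      "Content-Type": "application/octet-stream",
      "Content-Digest": `sha-256=:${hash}:`,
    },
    body,
  });
  if (!response.ok) throw new Error(`Chunk ${chunkIndex} failed: ${response.status}`);
};
```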
diff --git a/src/routes/(fullscreen)/settings/thumbnail/service.ts b/src/routes/(fullscreen)/settings/thumbnail/service.ts
index 381ed53..83b2890 100644
--- a/src/routes/(fullscreen)/settings/thumbnail/service.ts
+++ b/src/routes/(fullscreen)/settings/thumbnail/service.ts
@@ -1,10 +1,9 @@
 import { limitFunction } from "p-limit";
 import { SvelteMap } from "svelte/reactivity";
-import { encryptData } from "$lib/modules/crypto";
 import { storeFileThumbnailCache } from "$lib/modules/file";
 import type { FileInfo } from "$lib/modules/filesystem";
 import { Scheduler } from "$lib/modules/scheduler";
-import { generateThumbnail as doGenerateThumbnail } from "$lib/modules/thumbnail";
+import { generateThumbnail } from "$lib/modules/thumbnail";
 import { requestFileDownload, requestFileThumbnailUpload } from "$lib/services/file";

 export type GenerationStatus =
@@ -31,33 +30,25 @@ export const clearThumbnailGenerationStatuses = () => {
   }
 };

-const generateThumbnail = limitFunction(
-  async (fileId: number, fileBuffer: ArrayBuffer, fileType: string, dataKey: CryptoKey) => {
-    statuses.set(fileId, "generating");
-
-    const thumbnail = await doGenerateThumbnail(new Blob([fileBuffer], { type: fileType }));
-    if (!thumbnail) return null;
-
-    const thumbnailBuffer = await thumbnail.arrayBuffer();
-    const thumbnailEncrypted = await encryptData(thumbnailBuffer, dataKey);
-    statuses.set(fileId, "upload-pending");
-    return { plaintext: thumbnailBuffer, ...thumbnailEncrypted };
-  },
-  { concurrency: 4 },
-);
-
 const requestThumbnailUpload = limitFunction(
-  async (
-    fileId: number,
-    dataKeyVersion: Date,
-    thumbnail: { plaintext: ArrayBuffer; ciphertext: ArrayBuffer; iv: ArrayBuffer },
-  ) => {
-    statuses.set(fileId, "uploading");
+  async (fileInfo: FileInfo, fileBuffer: ArrayBuffer) => {
+    statuses.set(fileInfo.id, "generating");

-    const res = await requestFileThumbnailUpload(fileId, dataKeyVersion, thumbnail);
-    if (!res.ok) return false;
-    statuses.set(fileId, "uploaded");
-    storeFileThumbnailCache(fileId, thumbnail.plaintext); // Intended
+    const thumbnail = await generateThumbnail(
+      new Blob([fileBuffer], { type: fileInfo.contentType }),
+    );
+    if (!thumbnail) return false;
+
+    const res = await requestFileThumbnailUpload(
+      fileInfo.id,
+      thumbnail,
+      fileInfo.dataKey?.key!,
+      fileInfo.dataKey?.version!,
+    );
+    if (!res) return false;
+
+    statuses.set(fileInfo.id, "uploaded");
+    void thumbnail.arrayBuffer().then((buffer) => storeFileThumbnailCache(fileInfo.id, buffer));
     return true;
   },
   { concurrency: 4 },
@@ -81,16 +72,7 @@ export const requestThumbnailGeneration = async (fileInfo: FileInfo) => {
       return file.byteLength;
     },
     async () => {
-      const thumbnail = await generateThumbnail(
-        fileInfo.id,
-        file!,
-        fileInfo.contentType,
-        fileInfo.dataKey?.key!,
-      );
-      if (
-        !thumbnail ||
-        !(await requestThumbnailUpload(fileInfo.id, fileInfo.dataKey?.version!, thumbnail))
-      ) {
+      if (!(await requestThumbnailUpload(fileInfo, file!))) {
         statuses.set(fileInfo.id, "error");
       }
     },
diff --git a/src/routes/api/upload/[id]/chunks/[index]/+server.ts b/src/routes/api/upload/[id]/chunks/[index]/+server.ts
index 689d313..179030e 100644
--- a/src/routes/api/upload/[id]/chunks/[index]/+server.ts
+++ b/src/routes/api/upload/[id]/chunks/[index]/+server.ts
@@ -13,7 +13,7 @@ export const POST: RequestHandler = async ({ locals, params, request }) => {
   const zodRes = z
     .object({
       id: z.uuidv4(),
-      index: z.coerce.number().int().nonnegative(),
+      index: z.coerce.number().int().positive(),
     })
     .safeParse(params);
   if (!zodRes.success) error(400, "Invalid path parameters");
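The route schema change is the flip side of the 1-based indexing: `index` must now be a positive integer, so chunk `0` is rejected at the parameter-validation layer before any session lookup happens. A quick standalone illustration of the new rule (not project code):

```ts
import { z } from "zod";

// Chunk indices are 1-based after this PR, so 0 must fail validation.
const index = z.coerce.number().int().positive();

console.log(index.safeParse("0").success); // false
console.log(index.safeParse("1").success); // true
console.log(index.safeParse("1.5").success); // false (not an integer)
```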
"Invalid path parameters"); diff --git a/src/trpc/routers/upload.ts b/src/trpc/routers/upload.ts index adc0a3e..3289aad 100644 --- a/src/trpc/routers/upload.ts +++ b/src/trpc/routers/upload.ts @@ -150,7 +150,7 @@ const uploadRouter = router({ (!session.hskVersion && input.contentHmac) ) { throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content HMAC" }); - } else if (session.uploadedChunks.length < session.totalChunks) { + } else if (session.uploadedChunks < session.totalChunks) { throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } @@ -160,7 +160,7 @@ const uploadRouter = router({ const hashStream = createHash("sha256"); const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); - for (let i = 0; i < session.totalChunks; i++) { + for (let i = 1; i <= session.totalChunks; i++) { for await (const chunk of createReadStream(`${session.path}/${i}`)) { hashStream.update(chunk); writeStream.write(chunk); @@ -215,13 +215,13 @@ const uploadRouter = router({ const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); if (!session || session.type !== "thumbnail") { throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" }); - } else if (session.uploadedChunks.length < session.totalChunks) { + } else if (session.uploadedChunks < session.totalChunks) { throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uploadId}`; await mkdir(dirname(thumbnailPath), { recursive: true }); - await rename(`${session.path}/0`, thumbnailPath); + await rename(`${session.path}/1`, thumbnailPath); const oldThumbnailPath = await db.transaction().execute(async (trx) => { const oldPath = await MediaRepo.updateFileThumbnail( @@ -305,7 +305,7 @@ const uploadRouter = router({ const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); if (!session || session.type !== "migration") { throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" }); - } else if (session.uploadedChunks.length < session.totalChunks) { + } else if (session.uploadedChunks < session.totalChunks) { throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } @@ -315,7 +315,7 @@ const uploadRouter = router({ const hashStream = createHash("sha256"); const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); - for (let i = 0; i < session.totalChunks; i++) { + for (let i = 1; i <= session.totalChunks; i++) { for await (const chunk of createReadStream(`${session.path}/${i}`)) { hashStream.update(chunk); writeStream.write(chunk);