diff --git a/package.json b/package.json
index 17dad8d..952d53f 100644
--- a/package.json
+++ b/package.json
@@ -56,7 +56,6 @@
     "vite": "^7.3.0"
   },
   "dependencies": {
-    "@fastify/busboy": "^3.2.0",
    "@trpc/server": "^11.8.1",
    "argon2": "^0.44.0",
    "kysely": "^0.28.9",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 025aacd..f4c8e80 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -8,9 +8,6 @@ importers:

  .:
    dependencies:
-      '@fastify/busboy':
-        specifier: ^3.2.0
-        version: 3.2.0
      '@trpc/server':
        specifier: ^11.8.1
        version: 11.8.1(typescript@5.9.3)
@@ -373,9 +370,6 @@ packages:
    resolution: {integrity: sha512-43/qtrDUokr7LJqoF2c3+RInu/t4zfrpYdoSDfYyhg52rwLV6TnOvdG4fXm7IkSB3wErkcmJS9iEhjVtOSEjjA==}
    engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0}

-  '@fastify/busboy@3.2.0':
-    resolution: {integrity: sha512-m9FVDXU3GT2ITSe0UaMA5rU3QkfC/UXtCU8y0gSN/GugTqtVldOBWIB5V6V3sbmenVZUIpU6f+mPEO2+m5iTaA==}
-
  '@humanfs/core@0.19.1':
    resolution: {integrity: sha512-5DyQ4+1JEUzejeK1JGICcideyfUbGixgS9jNgex5nqkW+cY7WZhxBigmieN5Qnw9ZosSNVC9KQKyb+GUaGyKUA==}
    engines: {node: '>=18.18.0'}
@@ -2180,8 +2174,6 @@ snapshots:
      '@eslint/core': 0.17.0
      levn: 0.4.1

-  '@fastify/busboy@3.2.0': {}
-
  '@humanfs/core@0.19.1': {}

  '@humanfs/node@0.16.7':
diff --git a/src/lib/modules/file/upload.svelte.ts b/src/lib/modules/file/upload.svelte.ts
index ac3010e..eaa35df 100644
--- a/src/lib/modules/file/upload.svelte.ts
+++ b/src/lib/modules/file/upload.svelte.ts
@@ -1,4 +1,3 @@
-import axios from "axios";
 import ExifReader from "exifreader";
 import pLimit, { limitFunction } from "p-limit";
 import { CHUNK_SIZE } from "$lib/constants";
@@ -15,7 +14,6 @@ import {
 } from "$lib/modules/crypto";
 import { Scheduler } from "$lib/modules/scheduler";
 import { generateThumbnail, generateThumbnailFromFile } from "$lib/modules/thumbnail";
-import type { FileThumbnailUploadRequest } from "$lib/server/schemas";
 import type { MasterKey, HmacSecret } from "$lib/stores";
 import { trpc } from "$trpc/client";
 import type { RouterInputs } from "$trpc/router.server";
@@ -194,17 +192,55 @@ const encryptFile = limitFunction(
   { concurrency: 4 },
 );

+const uploadThumbnail = async (
+  fileId: number,
+  thumbnailEncrypted: { ciphertext: ArrayBuffer; iv: ArrayBuffer },
+  dataKeyVersion: Date,
+) => {
+  const { uploadId } = await trpc().upload.startFileThumbnailUpload.mutate({
+    file: fileId,
+    dekVersion: dataKeyVersion,
+  });
+
+  const ivAndCiphertext = new Uint8Array(
+    thumbnailEncrypted.iv.byteLength + thumbnailEncrypted.ciphertext.byteLength,
+  );
+  ivAndCiphertext.set(new Uint8Array(thumbnailEncrypted.iv), 0);
+  ivAndCiphertext.set(
+    new Uint8Array(thumbnailEncrypted.ciphertext),
+    thumbnailEncrypted.iv.byteLength,
+  );
+
+  const chunkHash = encodeToBase64(await digestMessage(ivAndCiphertext));
+
+  const response = await fetch(`/api/upload/${uploadId}/chunks/0`, {
+    method: "POST",
+    headers: {
+      "Content-Type": "application/octet-stream",
+      "Content-Digest": `sha-256=:${chunkHash}:`,
+    },
+    body: ivAndCiphertext,
+  });
+
+  if (!response.ok) {
+    throw new Error(`Thumbnail upload failed: ${response.status} ${response.statusText}`);
+  }
+
+  await trpc().upload.completeFileThumbnailUpload.mutate({ uploadId });
+};
+
 const requestFileUpload = limitFunction(
   async (
     state: FileUploadState,
-    metadata: RouterInputs["file"]["startUpload"],
+    metadata: RouterInputs["upload"]["startFileUpload"],
     chunksEncrypted: { chunkEncrypted: ArrayBuffer; chunkEncryptedHash: string }[],
     fileSigned: string | undefined,
-    thumbnailForm: FormData | null,
+    thumbnailData: { ciphertext: ArrayBuffer; iv: ArrayBuffer; plaintext: ArrayBuffer } | null,
+    dataKeyVersion: Date,
   ) => {
     state.status = "uploading";

-    const { uploadId } = await trpc().file.startUpload.mutate(metadata);
+    const { uploadId } = await trpc().upload.startFileUpload.mutate(metadata);

     // Upload chunks with progress tracking
     const totalBytes = chunksEncrypted.reduce((sum, c) => sum + c.chunkEncrypted.byteLength, 0);
@@ -214,7 +250,7 @@ const requestFileUpload = limitFunction(
     for (let i = 0; i < chunksEncrypted.length; i++) {
       const { chunkEncrypted, chunkEncryptedHash } = chunksEncrypted[i]!;

-      const response = await fetch(`/api/file/upload/${uploadId}/chunks/${i}`, {
+      const response = await fetch(`/api/upload/${uploadId}/chunks/${i}`, {
         method: "POST",
         headers: {
           "Content-Type": "application/octet-stream",
@@ -241,15 +277,15 @@ const requestFileUpload = limitFunction(
     }

     // Complete upload
-    const { file: fileId } = await trpc().file.completeUpload.mutate({
+    const { file: fileId } = await trpc().upload.completeFileUpload.mutate({
       uploadId,
       contentHmac: fileSigned,
     });

     // Upload thumbnail if exists
-    if (thumbnailForm) {
+    if (thumbnailData) {
       try {
-        await axios.post(`/api/file/${fileId}/thumbnail/upload`, thumbnailForm);
+        await uploadThumbnail(fileId, thumbnailData, dataKeyVersion);
       } catch (e) {
         // TODO: Error handling for thumbnail upload
         console.error(e);
@@ -258,7 +294,7 @@ const requestFileUpload = limitFunction(

     state.status = "uploaded";

-    return { fileId };
+    return { fileId, thumbnailBuffer: thumbnailData?.plaintext };
   },
   { concurrency: 1 },
 );
@@ -296,7 +332,7 @@ const uploadFileStreaming = async (
     lastModifiedAtIv: lastModifiedAtEncrypted.iv,
   };

-  const { uploadId } = await trpc().file.startUpload.mutate(metadata);
+  const { uploadId } = await trpc().upload.startFileUpload.mutate(metadata);

   // Stream file, encrypt, and upload with concurrency limit
   const reader = file.stream().getReader();
@@ -315,7 +351,7 @@ const uploadFileStreaming = async (
     chunkHash: string,
     originalChunkSize: number,
   ) => {
-    const response = await fetch(`/api/file/upload/${uploadId}/chunks/${index}`, {
+    const response = await fetch(`/api/upload/${uploadId}/chunks/${index}`, {
       method: "POST",
       headers: {
         "Content-Type": "application/octet-stream",
@@ -370,7 +406,7 @@ const uploadFileStreaming = async (

   await Promise.all(uploadPromises);

-  const { file: fileId } = await trpc().file.completeUpload.mutate({
+  const { file: fileId } = await trpc().upload.completeFileUpload.mutate({
     uploadId,
     contentHmac: fileSigned,
   });
@@ -383,16 +419,7 @@ const uploadFileStreaming = async (
       const thumbnailBuffer = await thumbnail.arrayBuffer();
       const thumbnailEncrypted = await encryptData(thumbnailBuffer, dataKey);

-      const thumbnailForm = new FormData();
-      thumbnailForm.set(
-        "metadata",
-        JSON.stringify({
-          dekVersion: dataKeyVersion.toISOString(),
-          contentIv: encodeToBase64(thumbnailEncrypted.iv),
-        } satisfies FileThumbnailUploadRequest),
-      );
-      thumbnailForm.set("content", new Blob([thumbnailEncrypted.ciphertext]));
-      await axios.post(`/api/file/${fileId}/thumbnail/upload`, thumbnailForm);
+      await uploadThumbnail(fileId, thumbnailEncrypted, dataKeyVersion);
     }
   } catch (e) {
     // Thumbnail upload failure is not critical
@@ -465,27 +492,15 @@ export const uploadFile = async (
       lastModifiedAtIv: lastModifiedAtEncrypted.iv,
     };
-    let thumbnailForm = null;
-    if (thumbnail) {
-      thumbnailForm = new FormData();
-      thumbnailForm.set(
-        "metadata",
-        JSON.stringify({
-          dekVersion: dataKeyVersion.toISOString(),
-          contentIv: encodeToBase64(thumbnail.iv),
-        } satisfies FileThumbnailUploadRequest),
-      );
-      thumbnailForm.set("content", new Blob([thumbnail.ciphertext]));
-    }
-
-    const { fileId } = await requestFileUpload(
+    const { fileId, thumbnailBuffer } = await requestFileUpload(
       state,
       metadata,
       chunksEncrypted,
       fileSigned,
-      thumbnailForm,
+      thumbnail ?? null,
+      dataKeyVersion,
     );

-    return { fileId, fileBuffer, thumbnailBuffer: thumbnail?.plaintext };
+    return { fileId, fileBuffer, thumbnailBuffer };
   } catch (e) {
     state.status = "error";
     throw e;
diff --git a/src/lib/server/db/media.ts b/src/lib/server/db/media.ts
index 209e256..c4d2a34 100644
--- a/src/lib/server/db/media.ts
+++ b/src/lib/server/db/media.ts
@@ -6,7 +6,7 @@ interface Thumbnail {
   id: number;
   path: string;
   updatedAt: Date;
-  encContentIv: string;
+  encContentIv: string | null;
 }

 interface FileThumbnail extends Thumbnail {
@@ -18,7 +18,7 @@ export const updateFileThumbnail = async (
   fileId: number,
   dekVersion: Date,
   path: string,
-  encContentIv: string,
+  encContentIv: string | null,
 ) => {
   return await db.transaction().execute(async (trx) => {
     const file = await trx
diff --git a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
index fe8abd4..cf18c05 100644
--- a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
+++ b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
@@ -8,23 +8,31 @@ export const up = async (db: Kysely<any>) => {
     .alterColumn("encrypted_content_iv", (col) => col.dropNotNull())
     .execute();

+  // media.ts
+  await db.schema
+    .alterTable("thumbnail")
+    .alterColumn("encrypted_content_iv", (col) => col.dropNotNull())
+    .execute();
+
   // upload.ts
   await db.schema
     .createTable("upload_session")
     .addColumn("id", "uuid", (col) => col.primaryKey().defaultTo(sql`gen_random_uuid()`))
+    .addColumn("type", "text", (col) => col.notNull())
     .addColumn("user_id", "integer", (col) => col.references("user.id").notNull())
     .addColumn("total_chunks", "integer", (col) => col.notNull())
     .addColumn("uploaded_chunks", sql`integer[]`, (col) => col.notNull().defaultTo(sql`'{}'`))
     .addColumn("expires_at", "timestamp(3)", (col) => col.notNull())
     .addColumn("parent_id", "integer", (col) => col.references("directory.id"))
-    .addColumn("master_encryption_key_version", "integer", (col) => col.notNull())
-    .addColumn("encrypted_data_encryption_key", "text", (col) => col.notNull())
-    .addColumn("data_encryption_key_version", "timestamp(3)", (col) => col.notNull())
+    .addColumn("master_encryption_key_version", "integer")
+    .addColumn("encrypted_data_encryption_key", "text")
+    .addColumn("data_encryption_key_version", "timestamp(3)")
     .addColumn("hmac_secret_key_version", "integer")
-    .addColumn("content_type", "text", (col) => col.notNull())
-    .addColumn("encrypted_name", "json", (col) => col.notNull())
+    .addColumn("content_type", "text")
+    .addColumn("encrypted_name", "json")
     .addColumn("encrypted_created_at", "json")
-    .addColumn("encrypted_last_modified_at", "json", (col) => col.notNull())
+    .addColumn("encrypted_last_modified_at", "json")
+    .addColumn("file_id", "integer", (col) => col.references("file.id"))
     .addForeignKeyConstraint(
       "upload_session_fk01",
       ["user_id", "master_encryption_key_version"],
@@ -43,6 +51,10 @@
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 export const down = async (db: Kysely<any>) => {
   await db.schema.dropTable("upload_session").execute();
+  await db.schema
+    .alterTable("thumbnail")
+    .alterColumn("encrypted_content_iv", (col) => col.setNotNull())
+    .execute();
   await db.schema
     .alterTable("file")
     .alterColumn("encrypted_content_iv", (col) => col.setNotNull())
diff --git a/src/lib/server/db/schema/media.ts b/src/lib/server/db/schema/media.ts
index ebfbf29..1fef90b 100644
--- a/src/lib/server/db/schema/media.ts
+++ b/src/lib/server/db/schema/media.ts
@@ -7,7 +7,7 @@ interface ThumbnailTable {
   category_id: number | null;
   path: string;
   updated_at: Date;
-  encrypted_content_iv: string; // Base64
+  encrypted_content_iv: string | null; // Base64
 }

 declare module "./index" {
diff --git a/src/lib/server/db/schema/upload.ts b/src/lib/server/db/schema/upload.ts
index 3372955..26eaac2 100644
--- a/src/lib/server/db/schema/upload.ts
+++ b/src/lib/server/db/schema/upload.ts
@@ -3,20 +3,25 @@ import type { Ciphertext } from "./util";

 interface UploadSessionTable {
   id: Generated<string>;
+  type: "file" | "thumbnail";
   user_id: number;
   total_chunks: number;
   uploaded_chunks: Generated<number[]>;
   expires_at: Date;
+
+  // For file uploads
   parent_id: number | null;
-  master_encryption_key_version: number;
-  encrypted_data_encryption_key: string; // Base64
-  data_encryption_key_version: Date;
+  master_encryption_key_version: number | null;
+  encrypted_data_encryption_key: string | null; // Base64
+  data_encryption_key_version: Date | null;
   hmac_secret_key_version: number | null;
-  content_type: string;
-  encrypted_name: Ciphertext;
+  content_type: string | null;
+  encrypted_name: Ciphertext | null;
   encrypted_created_at: Ciphertext | null;
-  encrypted_last_modified_at: Ciphertext;
+  encrypted_last_modified_at: Ciphertext | null;
+
+  // For thumbnail uploads
+  file_id: number | null;
 }

 declare module "./index" {
diff --git a/src/lib/server/db/upload.ts b/src/lib/server/db/upload.ts
index 935dc80..4c8da24 100644
--- a/src/lib/server/db/upload.ts
+++ b/src/lib/server/db/upload.ts
@@ -3,13 +3,16 @@ import { IntegrityError } from "./error";
 import db from "./kysely";
 import type { Ciphertext } from "./schema";

-interface UploadSession {
+interface BaseUploadSession {
   id: string;
   userId: number;
   totalChunks: number;
   uploadedChunks: number[];
   expiresAt: Date;
+}

+interface FileUploadSession extends BaseUploadSession {
+  type: "file";
   parentId: DirectoryId;
   mekVersion: number;
   encDek: string;
@@ -21,7 +24,15 @@ interface UploadSession {
   encLastModifiedAt: Ciphertext;
 }

-export const createUploadSession = async (params: Omit<UploadSession, "id" | "uploadedChunks">) => {
+interface ThumbnailUploadSession extends BaseUploadSession {
+  type: "thumbnail";
+  fileId: number;
+  dekVersion: Date;
+}
+
+export const createFileUploadSession = async (
+  params: Omit<FileUploadSession, "id" | "type" | "uploadedChunks">,
+) => {
   return await db.transaction().execute(async (trx) => {
     const mek = await trx
       .selectFrom("master_encryption_key")
@@ -52,6 +63,7 @@ export const createUploadSession = async (params: Omit<UploadSession, "id" | "uploadedChunks">) => {
     const { sessionId } = await trx
       .insertInto("upload_session")
       .values({
+        type: "file",
         user_id: params.userId,
         total_chunks: params.totalChunks,
         expires_at: params.expiresAt,
@@ -73,6 +85,41 @@ export const createUploadSession = async (params: Omit<UploadSession, "id" | "uploadedChunks">) => {
     return { id: sessionId };
   });
 };

+export const createThumbnailUploadSession = async (
+  params: Omit<ThumbnailUploadSession, "id" | "type" | "uploadedChunks" | "totalChunks">,
+) => {
+  return await db.transaction().execute(async (trx) => {
+    const file = await trx
+      .selectFrom("file")
+      .select("data_encryption_key_version")
+      .where("id", "=", params.fileId)
+      .where("user_id", "=", params.userId)
+      .limit(1)
+      .forUpdate()
+      .executeTakeFirst();
+    if (!file) {
+      throw new IntegrityError("File not found");
+    } else if (file.data_encryption_key_version.getTime() !== params.dekVersion.getTime()) {
+      throw new IntegrityError("Invalid DEK version");
+    }
+
+    const { sessionId } = await trx
+      .insertInto("upload_session")
+      .values({
+        type: "thumbnail",
+        user_id: params.userId,
+        total_chunks: 1,
+        expires_at: params.expiresAt,
+        file_id: params.fileId,
+        data_encryption_key_version: params.dekVersion,
+      })
+      .returning("id as sessionId")
+      .executeTakeFirstOrThrow();
+    return { id: sessionId };
+  });
+};
+
 export const getUploadSession = async (sessionId: string, userId: number) => {
   const session = await db
     .selectFrom("upload_session")
@@ -80,24 +126,39 @@ export const getUploadSession = async (sessionId: string, userId: number) => {
     .where("expires_at", ">", new Date())
     .limit(1)
     .executeTakeFirst();
-  return session
-    ? ({
-        id: session.id,
-        userId: session.user_id,
-        totalChunks: session.total_chunks,
-        uploadedChunks: session.uploaded_chunks,
-        expiresAt: session.expires_at,
-        parentId: session.parent_id ?? "root",
-        mekVersion: session.master_encryption_key_version,
-        encDek: session.encrypted_data_encryption_key,
-        dekVersion: session.data_encryption_key_version,
-        hskVersion: session.hmac_secret_key_version,
-        contentType: session.content_type,
-        encName: session.encrypted_name,
-        encCreatedAt: session.encrypted_created_at,
-        encLastModifiedAt: session.encrypted_last_modified_at,
-      } satisfies UploadSession)
-    : null;
+
+  if (!session) return null;
+
+  if (session.type === "file") {
+    return {
+      type: "file",
+      id: session.id,
+      userId: session.user_id,
+      totalChunks: session.total_chunks,
+      uploadedChunks: session.uploaded_chunks,
+      expiresAt: session.expires_at,
+      parentId: session.parent_id ?? "root",
+      mekVersion: session.master_encryption_key_version!,
+      encDek: session.encrypted_data_encryption_key!,
+      dekVersion: session.data_encryption_key_version!,
+      hskVersion: session.hmac_secret_key_version,
+      contentType: session.content_type!,
+      encName: session.encrypted_name!,
+      encCreatedAt: session.encrypted_created_at,
+      encLastModifiedAt: session.encrypted_last_modified_at!,
+    } satisfies FileUploadSession;
+  } else {
+    return {
+      type: "thumbnail",
+      id: session.id,
+      userId: session.user_id,
+      totalChunks: session.total_chunks,
+      uploadedChunks: session.uploaded_chunks,
+      expiresAt: session.expires_at,
+      fileId: session.file_id!,
+      dekVersion: session.data_encryption_key_version!,
+    } satisfies ThumbnailUploadSession;
+  }
 };

 export const markChunkAsUploaded = async (sessionId: string, chunkIndex: number) => {
diff --git a/src/lib/server/services/file.ts b/src/lib/server/services/file.ts
index 9df6430..0d67303 100644
--- a/src/lib/server/services/file.ts
+++ b/src/lib/server/services/file.ts
@@ -1,17 +1,8 @@
 import { error } from "@sveltejs/kit";
-import { createHash } from "crypto";
-import { createReadStream, createWriteStream } from "fs";
-import { mkdir, stat } from "fs/promises";
-import { dirname } from "path";
+import { createReadStream } from "fs";
+import { stat } from "fs/promises";
 import { Readable } from "stream";
-import { pipeline } from "stream/promises";
-import { v4 as uuidv4 } from "uuid";
-import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants";
-import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
-import env from "$lib/server/loadenv";
-import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
-
-const uploadLocks = new Set<string>();
+import { FileRepo, MediaRepo } from "$lib/server/db";

 const createEncContentStream = async (
   path: string,
@@ -77,110 +68,7 @@ export const getFileThumbnailStream = async (

   return createEncContentStream(
     thumbnail.path,
-    Buffer.from(thumbnail.encContentIv, "base64"),
+    thumbnail.encContentIv ? Buffer.from(thumbnail.encContentIv, "base64") : undefined,
     range,
   );
 };
-
-export const uploadFileThumbnail = async (
-  userId: number,
-  fileId: number,
-  dekVersion: Date,
-  encContentIv: string,
-  encContentStream: Readable,
-) => {
-  const path = `${env.thumbnailsPath}/${userId}/${uuidv4()}`;
-  await mkdir(dirname(path), { recursive: true });
-
-  try {
-    await pipeline(encContentStream, createWriteStream(path, { flags: "wx", mode: 0o600 }));
-
-    const oldPath = await MediaRepo.updateFileThumbnail(
-      userId,
-      fileId,
-      dekVersion,
-      path,
-      encContentIv,
-    );
-    safeUnlink(oldPath); // Intended
-  } catch (e) {
-    await safeUnlink(path);
-
-    if (e instanceof IntegrityError) {
-      if (e.message === "File not found") {
-        error(404, "File not found");
-      } else if (e.message === "Invalid DEK version") {
-        error(400, "Mismatched DEK version");
-      }
-    }
-    throw e;
-  }
-};
-
-export const uploadChunk = async (
-  userId: number,
-  sessionId: string,
-  chunkIndex: number,
-  encChunkStream: Readable,
-  encChunkHash: string,
-) => {
-  const lockKey = `${sessionId}/${chunkIndex}`;
-  if (uploadLocks.has(lockKey)) {
-    error(409, "Chunk already uploaded"); // TODO: Message
-  } else {
-    uploadLocks.add(lockKey);
-  }
-
-  const filePath = `${getChunkDirectoryPath(sessionId)}/${chunkIndex}`;
-
-  try {
-    const session = await UploadRepo.getUploadSession(sessionId, userId);
-    if (!session) {
-      error(404, "Invalid upload id");
-    } else if (chunkIndex >= session.totalChunks) {
-      error(400, "Invalid chunk index");
-    } else if (session.uploadedChunks.includes(chunkIndex)) {
-      error(409, "Chunk already uploaded");
-    }
-
-    const isLastChunk = chunkIndex === session.totalChunks - 1;
-
-    let writtenBytes = 0;
-    const hashStream = createHash("sha256");
-    const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
-
-    for await (const chunk of encChunkStream) {
-      writtenBytes += chunk.length;
-      hashStream.update(chunk);
-      writeStream.write(chunk);
-    }
-
-    await new Promise<void>((resolve, reject) => {
-      writeStream.end((e: any) => (e ? reject(e) : resolve()));
-    });
-
-    if (hashStream.digest("base64") !== encChunkHash) {
-      throw new Error("Invalid checksum");
-    } else if (
-      (!isLastChunk && writtenBytes !== CHUNK_SIZE + ENCRYPTION_OVERHEAD) ||
-      (isLastChunk &&
-        (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > CHUNK_SIZE + ENCRYPTION_OVERHEAD))
-    ) {
-      throw new Error("Invalid chunk size");
-    }
-
-    await UploadRepo.markChunkAsUploaded(sessionId, chunkIndex);
-  } catch (e) {
-    await safeUnlink(filePath);
-
-    if (
-      e instanceof Error &&
-      (e.message === "Invalid checksum" || e.message === "Invalid chunk size")
-    ) {
-      error(400, "Invalid request body");
-    }
-    throw e;
-  } finally {
-    uploadLocks.delete(lockKey);
-  }
-};
diff --git a/src/lib/server/services/upload.ts b/src/lib/server/services/upload.ts
new file mode 100644
index 0000000..1be250d
--- /dev/null
+++ b/src/lib/server/services/upload.ts
@@ -0,0 +1,77 @@
+import { error } from "@sveltejs/kit";
+import { createHash } from "crypto";
+import { createWriteStream } from "fs";
+import { Readable } from "stream";
+import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants";
+import { UploadRepo } from "$lib/server/db";
+import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
+
+const chunkLocks = new Set<string>();
+
+export const uploadChunk = async (
+  userId: number,
+  sessionId: string,
+  chunkIndex: number,
+  encChunkStream: Readable,
+  encChunkHash: string,
+) => {
+  const lockKey = `${sessionId}/${chunkIndex}`;
+  if (chunkLocks.has(lockKey)) {
+    error(409, "Chunk already uploaded"); // TODO: Message
+  } else {
+    chunkLocks.add(lockKey);
+  }
+
+  const filePath = `${getChunkDirectoryPath(sessionId)}/${chunkIndex}`;
+
+  try {
+    const session = await UploadRepo.getUploadSession(sessionId, userId);
+    if (!session) {
+      error(404, "Invalid upload id");
+    } else if (chunkIndex >= session.totalChunks) {
+      error(400, "Invalid chunk index");
+    } else if (session.uploadedChunks.includes(chunkIndex)) {
+      error(409, "Chunk already uploaded");
+    }
+
+    const isLastChunk = chunkIndex === session.totalChunks - 1;
+
+    let writtenBytes = 0;
+    const hashStream = createHash("sha256");
+    const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
+
+    for await (const chunk of encChunkStream) {
+      writtenBytes += chunk.length;
+      hashStream.update(chunk);
+      writeStream.write(chunk);
+    }
+
+    await new Promise<void>((resolve, reject) => {
+      writeStream.end((e: any) => (e ? reject(e) : resolve()));
+    });
+
+    if (hashStream.digest("base64") !== encChunkHash) {
+      throw new Error("Invalid checksum");
+    } else if (
+      (!isLastChunk && writtenBytes !== CHUNK_SIZE + ENCRYPTION_OVERHEAD) ||
+      (isLastChunk &&
+        (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > CHUNK_SIZE + ENCRYPTION_OVERHEAD))
+    ) {
+      throw new Error("Invalid chunk size");
+    }
+
+    await UploadRepo.markChunkAsUploaded(sessionId, chunkIndex);
+  } catch (e) {
+    await safeUnlink(filePath);
+
+    if (
+      e instanceof Error &&
+      (e.message === "Invalid checksum" || e.message === "Invalid chunk size")
+    ) {
+      error(400, "Invalid request body");
+    }
+    throw e;
+  } finally {
+    chunkLocks.delete(lockKey);
+  }
+};
diff --git a/src/lib/services/file.ts b/src/lib/services/file.ts
index 5f95f42..2f37f52 100644
--- a/src/lib/services/file.ts
+++ b/src/lib/services/file.ts
@@ -1,5 +1,5 @@
 import { getAllFileInfos } from "$lib/indexedDB/filesystem";
-import { encodeToBase64 } from "$lib/modules/crypto";
+import { encodeToBase64, digestMessage } from "$lib/modules/crypto";
 import {
   getFileCache,
   storeFileCache,
@@ -7,7 +7,6 @@ import {
   downloadFile,
   deleteFileThumbnailCache,
 } from "$lib/modules/file";
-import type { FileThumbnailUploadRequest } from "$lib/server/schemas";
 import { trpc } from "$trpc/client";

 export const requestFileDownload = async (
@@ -28,17 +27,38 @@ export const requestFileThumbnailUpload = async (
   dataKeyVersion: Date,
   thumbnailEncrypted: { ciphertext: ArrayBuffer; iv: ArrayBuffer },
 ) => {
-  const form = new FormData();
-  form.set(
-    "metadata",
-    JSON.stringify({
-      dekVersion: dataKeyVersion.toISOString(),
-      contentIv: encodeToBase64(thumbnailEncrypted.iv),
-    } satisfies FileThumbnailUploadRequest),
-  );
-  form.set("content", new Blob([thumbnailEncrypted.ciphertext]));
+  const { uploadId } = await trpc().upload.startFileThumbnailUpload.mutate({
+    file: fileId,
+    dekVersion: dataKeyVersion,
+  });

-  return await fetch(`/api/file/${fileId}/thumbnail/upload`, { method: "POST", body: form });
+  // Prepend IV to ciphertext (consistent with file download format)
+  const ivAndCiphertext = new Uint8Array(
+    thumbnailEncrypted.iv.byteLength + thumbnailEncrypted.ciphertext.byteLength,
+  );
+  ivAndCiphertext.set(new Uint8Array(thumbnailEncrypted.iv), 0);
+  ivAndCiphertext.set(
+    new Uint8Array(thumbnailEncrypted.ciphertext),
+    thumbnailEncrypted.iv.byteLength,
+  );
+
+  const chunkHash = encodeToBase64(await digestMessage(ivAndCiphertext));
+
+  const response = await fetch(`/api/upload/${uploadId}/chunks/0`, {
+    method: "POST",
+    headers: {
+      "Content-Type": "application/octet-stream",
+      "Content-Digest": `sha-256=:${chunkHash}:`,
+    },
+    body: ivAndCiphertext,
+  });
+
+  if (!response.ok) {
+    throw new Error(`Thumbnail upload failed: ${response.status} ${response.statusText}`);
+  }
+
+  await trpc().upload.completeFileThumbnailUpload.mutate({ uploadId });
+  return response;
 };

 export const requestDeletedFilesCleanup = async () => {
diff --git a/src/routes/api/file/[id]/thumbnail/upload/+server.ts b/src/routes/api/file/[id]/thumbnail/upload/+server.ts
deleted file mode 100644
index 62dfe42..0000000
--- a/src/routes/api/file/[id]/thumbnail/upload/+server.ts
+++ /dev/null
@@ -1,74 +0,0 @@
-import Busboy from "@fastify/busboy";
-import { error, text } from "@sveltejs/kit";
-import { Readable, Writable } from "stream";
-import { z } from "zod";
-import { authorize } from "$lib/server/modules/auth";
-import { fileThumbnailUploadRequest, type FileThumbnailUploadRequest } from "$lib/server/schemas";
-import { uploadFileThumbnail } from "$lib/server/services/file";
-import type { RequestHandler } from "./$types";
-
-export const POST: RequestHandler = async ({ locals, params, request }) => {
-  const { userId } = await authorize(locals, "activeClient");
-
-  const zodRes = z
-    .object({
-      id: z.coerce.number().int().positive(),
-    })
-    .safeParse(params);
-  if (!zodRes.success) error(400, "Invalid path parameters");
-  const { id } = zodRes.data;
-
-  const contentType = request.headers.get("Content-Type");
-  if (!contentType?.startsWith("multipart/form-data") || !request.body) {
-    error(400, "Invalid request body");
-  }
-
-  return new Promise<Response>((resolve, reject) => {
-    const bb = Busboy({ headers: { "content-type": contentType } });
-    const handler =
-      <T extends unknown[]>(f: (...args: T) => Promise<void>) =>
-      (...args: T) => {
-        f(...args).catch(reject);
-      };
-
-    let metadata: FileThumbnailUploadRequest | null = null;
-    let content: Readable | null = null;
-    bb.on(
-      "field",
-      handler(async (fieldname, val) => {
-        if (fieldname === "metadata") {
-          // Ignore subsequent metadata fields
-          if (!metadata) {
-            const zodRes = fileThumbnailUploadRequest.safeParse(JSON.parse(val));
-            if (!zodRes.success) error(400, "Invalid request body");
-            metadata = zodRes.data;
-          }
-        } else {
-          error(400, "Invalid request body");
-        }
-      }),
-    );
-    bb.on(
-      "file",
-      handler(async (fieldname, file) => {
-        if (fieldname !== "content") error(400, "Invalid request body");
-        if (!metadata || content) error(400, "Invalid request body");
-        content = file;
-
-        await uploadFileThumbnail(
-          userId,
-          id,
-          new Date(metadata.dekVersion),
-          metadata.contentIv,
-          content,
-        );
-        resolve(text("Thumbnail uploaded", { headers: { "Content-Type": "text/plain" } }));
-      }),
-    );
-    bb.on("error", (e) => {
-      content?.emit("error", e) ?? reject(e);
-    });
-
-    request.body!.pipeTo(Writable.toWeb(bb)).catch(() => {}); // busboy will handle the error
-  });
-};
diff --git a/src/routes/api/file/upload/[id]/chunks/[index]/+server.ts b/src/routes/api/upload/[id]/chunks/[index]/+server.ts
similarity index 96%
rename from src/routes/api/file/upload/[id]/chunks/[index]/+server.ts
rename to src/routes/api/upload/[id]/chunks/[index]/+server.ts
index c44e425..47d6397 100644
--- a/src/routes/api/file/upload/[id]/chunks/[index]/+server.ts
+++ b/src/routes/api/upload/[id]/chunks/[index]/+server.ts
@@ -2,7 +2,7 @@ import { error, text } from "@sveltejs/kit";
 import { Readable } from "stream";
 import { z } from "zod";
 import { authorize } from "$lib/server/modules/auth";
-import { uploadChunk } from "$lib/server/services/file";
+import { uploadChunk } from "$lib/server/services/upload";
 import type { RequestHandler } from "./$types";

 export const POST: RequestHandler = async ({ locals, params, request }) => {
diff --git a/src/trpc/router.server.ts b/src/trpc/router.server.ts
index 64d25c7..d343fa6 100644
--- a/src/trpc/router.server.ts
+++ b/src/trpc/router.server.ts
@@ -9,6 +9,7 @@ import {
   fileRouter,
   hskRouter,
   mekRouter,
+  uploadRouter,
   userRouter,
 } from "./routers";

@@ -20,6 +21,7 @@ export const appRouter = router({
   file: fileRouter,
   hsk: hskRouter,
   mek: mekRouter,
+  upload: uploadRouter,
   user: userRouter,
 });

diff --git a/src/trpc/routers/file.ts b/src/trpc/routers/file.ts
index eaf42ca..294300c 100644
--- a/src/trpc/routers/file.ts
+++ b/src/trpc/routers/file.ts
@@ -1,20 +1,9 @@
 import { TRPCError } from "@trpc/server";
-import { createHash } from "crypto";
-import { createReadStream, createWriteStream } from "fs";
-import { mkdir, rm } from "fs/promises";
-import mime from "mime";
-import { dirname } from "path";
-import { v4 as uuidv4 } from "uuid";
 import { z } from "zod";
-import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
-import db from "$lib/server/db/kysely";
-import env from "$lib/server/loadenv";
-import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
-import { directoryIdSchema } from "$lib/server/schemas";
+import { FileRepo, MediaRepo, IntegrityError } from "$lib/server/db";
+import { safeUnlink } from "$lib/server/modules/filesystem";
 import { router, roleProcedure } from "../init.server";

-const uploadLocks = new Set<string>();
-
 const fileRouter = router({
   get: roleProcedure["activeClient"]
     .input(
@@ -171,137 +160,6 @@ const fileRouter = router({

       return { updatedAt: thumbnail.updatedAt };
     }),
-
-  startUpload: roleProcedure["activeClient"]
-    .input(
-      z.object({
-        chunks: z.int().positive(),
-        parent: directoryIdSchema,
-        mekVersion: z.int().positive(),
-        dek: z.base64().nonempty(),
-        dekVersion: z.date(),
-        hskVersion: z.int().positive().optional(),
-        contentType: z
-          .string()
-          .trim()
-          .nonempty()
-          .refine((value) => mime.getExtension(value) !== null),
-        name: z.base64().nonempty(),
-        nameIv: z.base64().nonempty(),
-        createdAt: z.base64().nonempty().optional(),
-        createdAtIv: z.base64().nonempty().optional(),
-        lastModifiedAt: z.base64().nonempty(),
-        lastModifiedAtIv: z.base64().nonempty(),
-      }),
-    )
-    .mutation(async ({ ctx, input }) => {
-      const oneMinuteAgo = new Date(Date.now() - 60 * 1000);
-      const oneMinuteLater = new Date(Date.now() + 60 * 1000);
-      if (input.dekVersion <= oneMinuteAgo || input.dekVersion >= oneMinuteLater) {
-        throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" });
-      }
-
-      try {
-        const { id: sessionId } = await UploadRepo.createUploadSession({
-          userId: ctx.session.userId,
-          totalChunks: input.chunks,
-          expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
-          parentId: input.parent,
-          mekVersion: input.mekVersion,
-          encDek: input.dek,
-          dekVersion: input.dekVersion,
-          hskVersion: input.hskVersion ?? null,
-          contentType: input.contentType,
-          encName: { ciphertext: input.name, iv: input.nameIv },
-          encCreatedAt:
-            input.createdAt && input.createdAtIv
-              ? { ciphertext: input.createdAt, iv: input.createdAtIv }
-              : null,
-          encLastModifiedAt: { ciphertext: input.lastModifiedAt, iv: input.lastModifiedAtIv },
-        });
-        await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
-        return { uploadId: sessionId };
-      } catch (e) {
-        if (e instanceof IntegrityError) {
-          if (e.message === "Inactive MEK version") {
-            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid MEK version" });
-          } else if (e.message === "Inactive HSK version") {
-            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid HSK version" });
-          }
-        }
-        throw e;
-      }
-    }),
-
-  completeUpload: roleProcedure["activeClient"]
-    .input(
-      z.object({
-        uploadId: z.uuidv4(),
-        contentHmac: z.base64().nonempty().optional(),
-      }),
-    )
-    .mutation(async ({ ctx, input }) => {
-      const { uploadId } = input;
-      if (uploadLocks.has(uploadId)) {
-        throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" }); // TODO: Message
-      } else {
-        uploadLocks.add(uploadId);
-      }
-
-      const filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`;
-      await mkdir(dirname(filePath), { recursive: true });
-
-      try {
-        const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
-        if (!session) {
-          throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" });
-        } else if (
-          (session.hskVersion && !input.contentHmac) ||
-          (!session.hskVersion && input.contentHmac)
-        ) {
-          throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content hmac" }); // TODO: message
-        } else if (session.uploadedChunks.length < session.totalChunks) {
-          throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" }); // TODO: Message
-        }
-
-        const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
-        const hashStream = createHash("sha256");
-        const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
-
-        for (let i = 0; i < session.totalChunks; i++) {
-          for await (const chunk of createReadStream(`${chunkDirectoryPath}/${i}`)) {
-            hashStream.update(chunk);
-            writeStream.write(chunk);
-          }
-        }
-
-        await new Promise<void>((resolve, reject) => {
-          writeStream.end((e: any) => (e ? reject(e) : resolve()));
-        });
-
-        const hash = hashStream.digest("base64");
-        const fileId = await db.transaction().execute(async (trx) => {
-          const { id: fileId } = await FileRepo.registerFile(trx, {
-            ...session,
-            userId: ctx.session.userId,
-            path: filePath,
-            contentHmac: input.contentHmac ?? null,
-            encContentHash: hash,
-            encContentIv: null,
-          });
-          await UploadRepo.deleteUploadSession(trx, uploadId);
-          return fileId;
-        });
-
-        await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
-        return { file: fileId };
-      } catch (e) {
-        await safeUnlink(filePath);
-        throw e;
-      } finally {
-        uploadLocks.delete(uploadId);
-      }
-    }),
 });

 export default fileRouter;
diff --git a/src/trpc/routers/index.ts b/src/trpc/routers/index.ts
index ab5b6a0..5c8df24 100644
--- a/src/trpc/routers/index.ts
+++ b/src/trpc/routers/index.ts
@@ -5,4 +5,5 @@ export { default as directoryRouter } from "./directory";
 export { default as fileRouter } from "./file";
 export { default as hskRouter } from "./hsk";
 export { default as mekRouter } from "./mek";
+export { default as uploadRouter } from "./upload";
 export { default as userRouter } from "./user";
diff --git a/src/trpc/routers/upload.ts b/src/trpc/routers/upload.ts
new file mode 100644
index 0000000..08d483f
--- /dev/null
+++ b/src/trpc/routers/upload.ts
@@ -0,0 +1,241 @@
+import { TRPCError } from "@trpc/server";
+import { createHash } from "crypto";
+import { createReadStream, createWriteStream } from "fs";
+import { mkdir, rename, rm } from "fs/promises";
+import mime from "mime";
+import { dirname } from "path";
+import { v4 as uuidv4 } from "uuid";
+import { z } from "zod";
+import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
+import db from "$lib/server/db/kysely";
+import env from "$lib/server/loadenv";
+import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
+import { directoryIdSchema } from "$lib/server/schemas";
+import { router, roleProcedure } from "../init.server";
+
+const uploadLocks = new Set<string>();
+
+const uploadRouter = router({
+  startFileUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        chunks: z.int().positive(),
+        parent: directoryIdSchema,
+        mekVersion: z.int().positive(),
+        dek: z.base64().nonempty(),
+        dekVersion: z.date(),
+        hskVersion: z.int().positive().optional(),
+        contentType: z
+          .string()
+          .trim()
+          .nonempty()
+          .refine((value) => mime.getExtension(value) !== null),
+        name: z.base64().nonempty(),
+        nameIv: z.base64().nonempty(),
+        createdAt: z.base64().nonempty().optional(),
+        createdAtIv: z.base64().nonempty().optional(),
+        lastModifiedAt: z.base64().nonempty(),
+        lastModifiedAtIv: z.base64().nonempty(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      const oneMinuteAgo = new Date(Date.now() - 60 * 1000);
+      const oneMinuteLater = new Date(Date.now() + 60 * 1000);
+      if (input.dekVersion <= oneMinuteAgo || input.dekVersion >= oneMinuteLater) {
+        throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" });
+      }
+
+      try {
+        const { id: sessionId } = await UploadRepo.createFileUploadSession({
+          userId: ctx.session.userId,
+          totalChunks: input.chunks,
+          expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
+          parentId: input.parent,
+          mekVersion: input.mekVersion,
+          encDek: input.dek,
+          dekVersion: input.dekVersion,
+          hskVersion: input.hskVersion ?? null,
+          contentType: input.contentType,
+          encName: { ciphertext: input.name, iv: input.nameIv },
+          encCreatedAt:
+            input.createdAt && input.createdAtIv
+              ? { ciphertext: input.createdAt, iv: input.createdAtIv }
+              : null,
+          encLastModifiedAt: { ciphertext: input.lastModifiedAt, iv: input.lastModifiedAtIv },
+        });
+        await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
+        return { uploadId: sessionId };
+      } catch (e) {
+        if (e instanceof IntegrityError) {
+          if (e.message === "Inactive MEK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid MEK version" });
+          } else if (e.message === "Inactive HSK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid HSK version" });
+          }
+        }
+        throw e;
+      }
+    }),
+
+  startFileThumbnailUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        file: z.int().positive(),
+        dekVersion: z.date(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      try {
+        const { id: sessionId } = await UploadRepo.createThumbnailUploadSession({
+          userId: ctx.session.userId,
+          expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
+          fileId: input.file,
+          dekVersion: input.dekVersion,
+        });
+        await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
+        return { uploadId: sessionId };
+      } catch (e) {
+        if (e instanceof IntegrityError) {
+          if (e.message === "File not found") {
+            throw new TRPCError({ code: "NOT_FOUND", message: "File not found" });
+          } else if (e.message === "Invalid DEK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Mismatched DEK version" });
+          }
+        }
+        throw e;
+      }
+    }),
+
+  completeFileUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        uploadId: z.uuidv4(),
+        contentHmac: z.base64().nonempty().optional(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      const { uploadId } = input;
+      if (uploadLocks.has(uploadId)) {
+        throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" });
+      } else {
+        uploadLocks.add(uploadId);
+      }
+
+      const filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`;
+      await mkdir(dirname(filePath), { recursive: true });
+
+      try {
+        const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
+        if (!session || session.type !== "file") {
+          throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" });
+        } else if (
+          (session.hskVersion && !input.contentHmac) ||
+          (!session.hskVersion && input.contentHmac)
+        ) {
+          throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content hmac" });
+        } else if (session.uploadedChunks.length < session.totalChunks) {
+          throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" });
+        }
+
+        const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
+        const hashStream = createHash("sha256");
+        const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
+
+        for (let i = 0; i < session.totalChunks; i++) {
+          for await (const chunk of createReadStream(`${chunkDirectoryPath}/${i}`)) {
+            hashStream.update(chunk);
+            writeStream.write(chunk);
+          }
+        }
+
+        await new Promise<void>((resolve, reject) => {
+          writeStream.end((e: any) => (e ? reject(e) : resolve()));
+        });
+
+        const hash = hashStream.digest("base64");
+        const fileId = await db.transaction().execute(async (trx) => {
+          const { id: fileId } = await FileRepo.registerFile(trx, {
+            ...session,
+            userId: ctx.session.userId,
+            path: filePath,
+            contentHmac: input.contentHmac ?? null,
+            encContentHash: hash,
+            encContentIv: null,
+          });
+          await UploadRepo.deleteUploadSession(trx, uploadId);
+          return fileId;
+        });
+
+        await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
+        return { file: fileId };
+      } catch (e) {
+        await safeUnlink(filePath);
+        throw e;
+      } finally {
+        uploadLocks.delete(uploadId);
+      }
+    }),
+
+  completeFileThumbnailUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        uploadId: z.uuidv4(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      const { uploadId } = input;
+      if (uploadLocks.has(uploadId)) {
+        throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" });
+      } else {
+        uploadLocks.add(uploadId);
+      }
+
+      const thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uuidv4()}`;
+      await mkdir(dirname(thumbnailPath), { recursive: true });
+
+      try {
+        const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
+        if (!session || session.type !== "thumbnail") {
+          throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" });
+        } else if (session.uploadedChunks.length < session.totalChunks) {
+          throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" });
+        }
+
+        const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
+        const chunkPath = `${chunkDirectoryPath}/0`;
+
+        // Move chunk file to thumbnail path (IV is prepended to the content)
+        await rename(chunkPath, thumbnailPath);
+
+        // Update thumbnail in database (null IV since it's prepended to the file)
+        const oldPath = await MediaRepo.updateFileThumbnail(
+          ctx.session.userId,
+          session.fileId,
+          session.dekVersion,
+          thumbnailPath,
+          null,
+        );
+        safeUnlink(oldPath); // Intended
+
+        await db.transaction().execute(async (trx) => {
+          await UploadRepo.deleteUploadSession(trx, uploadId);
+        });
+
+        await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
+      } catch (e) {
+        await safeUnlink(thumbnailPath);
+        if (e instanceof IntegrityError) {
+          if (e.message === "File not found") {
+            throw new TRPCError({ code: "NOT_FOUND", message: "File not found" });
+          } else if (e.message === "Invalid DEK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Mismatched DEK version" });
+          }
+        }
+        throw e;
+      } finally {
+        uploadLocks.delete(uploadId);
+      }
+    }),
+});
+
+export default uploadRouter;