diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 1795ce3..b816f7f 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -7,7 +7,7 @@ import { cleanupExpiredSessions, cleanupExpiredSessionUpgradeChallenges, } from "$lib/server/db/session"; -import { cleanupExpiredUploadSessions } from "$lib/server/db/upload"; +import { cleanupExpiredUploadSessions } from "$lib/server/services/upload"; import { authenticate, setAgentInfo } from "$lib/server/middlewares"; export const init: ServerInit = async () => { diff --git a/src/lib/constants/upload.ts b/src/lib/constants/upload.ts index 99d94bb..57934d6 100644 --- a/src/lib/constants/upload.ts +++ b/src/lib/constants/upload.ts @@ -2,5 +2,5 @@ export const AES_GCM_IV_SIZE = 12; export const AES_GCM_TAG_SIZE = 16; export const ENCRYPTION_OVERHEAD = AES_GCM_IV_SIZE + AES_GCM_TAG_SIZE; -export const CHUNK_SIZE = 4 * 1024 * 1024; +export const CHUNK_SIZE = 4 * 1024 * 1024; // 4 MiB export const ENCRYPTED_CHUNK_SIZE = CHUNK_SIZE + ENCRYPTION_OVERHEAD; diff --git a/src/lib/server/db/media.ts b/src/lib/server/db/media.ts index c4d2a34..3e165c0 100644 --- a/src/lib/server/db/media.ts +++ b/src/lib/server/db/media.ts @@ -14,54 +14,53 @@ interface FileThumbnail extends Thumbnail { } export const updateFileThumbnail = async ( + trx: typeof db, userId: number, fileId: number, dekVersion: Date, path: string, encContentIv: string | null, ) => { - return await db.transaction().execute(async (trx) => { - const file = await trx - .selectFrom("file") - .select("data_encryption_key_version") - .where("id", "=", fileId) - .where("user_id", "=", userId) - .limit(1) - .forUpdate() - .executeTakeFirst(); - if (!file) { - throw new IntegrityError("File not found"); - } else if (file.data_encryption_key_version.getTime() !== dekVersion.getTime()) { - throw new IntegrityError("Invalid DEK version"); - } + const file = await trx + .selectFrom("file") + .select("data_encryption_key_version") + .where("id", "=", fileId) + .where("user_id", "=", userId) + .limit(1) + .forUpdate() + .executeTakeFirst(); + if (!file) { + throw new IntegrityError("File not found"); + } else if (file.data_encryption_key_version.getTime() !== dekVersion.getTime()) { + throw new IntegrityError("Invalid DEK version"); + } - const thumbnail = await trx - .selectFrom("thumbnail") - .select("path as oldPath") - .where("file_id", "=", fileId) - .limit(1) - .forUpdate() - .executeTakeFirst(); - const now = new Date(); + const thumbnail = await trx + .selectFrom("thumbnail") + .select("path as oldPath") + .where("file_id", "=", fileId) + .limit(1) + .forUpdate() + .executeTakeFirst(); + const now = new Date(); - await trx - .insertInto("thumbnail") - .values({ - file_id: fileId, + await trx + .insertInto("thumbnail") + .values({ + file_id: fileId, + path, + updated_at: now, + encrypted_content_iv: encContentIv, + }) + .onConflict((oc) => + oc.column("file_id").doUpdateSet({ path, updated_at: now, encrypted_content_iv: encContentIv, - }) - .onConflict((oc) => - oc.column("file_id").doUpdateSet({ - path, - updated_at: now, - encrypted_content_iv: encContentIv, - }), - ) - .execute(); - return thumbnail?.oldPath ?? null; - }); + }), + ) + .execute(); + return thumbnail?.oldPath ?? null; }; export const getFileThumbnail = async (userId: number, fileId: number) => { diff --git a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts index cf18c05..be6a900 100644 --- a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts +++ b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts @@ -17,9 +17,10 @@ export const up = async (db: Kysely) => { // upload.ts await db.schema .createTable("upload_session") - .addColumn("id", "uuid", (col) => col.primaryKey().defaultTo(sql`gen_random_uuid()`)) + .addColumn("id", "uuid", (col) => col.primaryKey()) .addColumn("type", "text", (col) => col.notNull()) .addColumn("user_id", "integer", (col) => col.references("user.id").notNull()) + .addColumn("path", "text", (col) => col.notNull()) .addColumn("total_chunks", "integer", (col) => col.notNull()) .addColumn("uploaded_chunks", sql`integer[]`, (col) => col.notNull().defaultTo(sql`'{}'`)) .addColumn("expires_at", "timestamp(3)", (col) => col.notNull()) diff --git a/src/lib/server/db/schema/upload.ts b/src/lib/server/db/schema/upload.ts index 369c385..fccde36 100644 --- a/src/lib/server/db/schema/upload.ts +++ b/src/lib/server/db/schema/upload.ts @@ -2,9 +2,10 @@ import type { Generated } from "kysely"; import type { Ciphertext } from "./utils"; interface UploadSessionTable { - id: Generated; + id: string; type: "file" | "thumbnail"; user_id: number; + path: string; total_chunks: number; uploaded_chunks: Generated; expires_at: Date; diff --git a/src/lib/server/db/upload.ts b/src/lib/server/db/upload.ts index 4c8da24..d506191 100644 --- a/src/lib/server/db/upload.ts +++ b/src/lib/server/db/upload.ts @@ -6,6 +6,7 @@ import type { Ciphertext } from "./schema"; interface BaseUploadSession { id: string; userId: number; + path: string; totalChunks: number; uploadedChunks: number[]; expiresAt: Date; @@ -31,9 +32,9 @@ interface ThumbnailUploadSession extends BaseUploadSession { } export const createFileUploadSession = async ( - params: Omit, + params: Omit, ) => { - return await db.transaction().execute(async (trx) => { + await db.transaction().execute(async (trx) => { const mek = await trx .selectFrom("master_encryption_key") .select("version") @@ -60,11 +61,13 @@ export const createFileUploadSession = async ( } } - const { sessionId } = await trx + await trx .insertInto("upload_session") .values({ + id: params.id, type: "file", user_id: params.userId, + path: params.path, total_chunks: params.totalChunks, expires_at: params.expiresAt, parent_id: params.parentId !== "root" ? params.parentId : null, @@ -77,16 +80,14 @@ export const createFileUploadSession = async ( encrypted_created_at: params.encCreatedAt, encrypted_last_modified_at: params.encLastModifiedAt, }) - .returning("id as sessionId") - .executeTakeFirstOrThrow(); - return { id: sessionId }; + .execute(); }); }; export const createThumbnailUploadSession = async ( - params: Omit, + params: Omit, ) => { - return await db.transaction().execute(async (trx) => { + await db.transaction().execute(async (trx) => { const file = await trx .selectFrom("file") .select("data_encryption_key_version") @@ -101,19 +102,19 @@ export const createThumbnailUploadSession = async ( throw new IntegrityError("Invalid DEK version"); } - const { sessionId } = await trx + await trx .insertInto("upload_session") .values({ + id: params.id, type: "thumbnail", user_id: params.userId, - total_chunks: 1, + path: params.path, + total_chunks: params.totalChunks, expires_at: params.expiresAt, file_id: params.fileId, data_encryption_key_version: params.dekVersion, }) - .returning("id as sessionId") - .executeTakeFirstOrThrow(); - return { id: sessionId }; + .execute(); }); }; @@ -126,14 +127,14 @@ export const getUploadSession = async (sessionId: string, userId: number) => { .where("expires_at", ">", new Date()) .limit(1) .executeTakeFirst(); - - if (!session) return null; - - if (session.type === "file") { + if (!session) { + return null; + } else if (session.type === "file") { return { type: "file", id: session.id, userId: session.user_id, + path: session.path, totalChunks: session.total_chunks, uploadedChunks: session.uploaded_chunks, expiresAt: session.expires_at, @@ -152,6 +153,7 @@ export const getUploadSession = async (sessionId: string, userId: number) => { type: "thumbnail", id: session.id, userId: session.user_id, + path: session.path, totalChunks: session.total_chunks, uploadedChunks: session.uploaded_chunks, expiresAt: session.expires_at, @@ -176,8 +178,8 @@ export const deleteUploadSession = async (trx: typeof db, sessionId: string) => export const cleanupExpiredUploadSessions = async () => { const sessions = await db .deleteFrom("upload_session") - .where("expires_at", "<", new Date()) - .returning("id") + .where("expires_at", "<=", new Date()) + .returning("path") .execute(); - return sessions.map(({ id }) => id); + return sessions.map(({ path }) => path); }; diff --git a/src/lib/server/modules/filesystem.ts b/src/lib/server/modules/filesystem.ts index b87fd65..ade7d73 100644 --- a/src/lib/server/modules/filesystem.ts +++ b/src/lib/server/modules/filesystem.ts @@ -1,7 +1,10 @@ -import { unlink } from "fs/promises"; -import env from "$lib/server/loadenv"; +import { rm, unlink } from "fs/promises"; -export const getChunkDirectoryPath = (sessionId: string) => `${env.uploadsPath}/${sessionId}`; +export const safeRecursiveRm = async (path: string | null | undefined) => { + if (path) { + await rm(path, { recursive: true }).catch(console.error); + } +}; export const safeUnlink = async (path: string | null | undefined) => { if (path) { diff --git a/src/lib/server/services/upload.ts b/src/lib/server/services/upload.ts index 1be250d..1f7043b 100644 --- a/src/lib/server/services/upload.ts +++ b/src/lib/server/services/upload.ts @@ -2,9 +2,9 @@ import { error } from "@sveltejs/kit"; import { createHash } from "crypto"; import { createWriteStream } from "fs"; import { Readable } from "stream"; -import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants"; +import { ENCRYPTION_OVERHEAD, ENCRYPTED_CHUNK_SIZE } from "$lib/constants"; import { UploadRepo } from "$lib/server/db"; -import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem"; +import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem"; const chunkLocks = new Set(); @@ -17,12 +17,12 @@ export const uploadChunk = async ( ) => { const lockKey = `${sessionId}/${chunkIndex}`; if (chunkLocks.has(lockKey)) { - error(409, "Chunk already uploaded"); // TODO: Message + error(409, "Chunk upload already in progress"); } else { chunkLocks.add(lockKey); } - const filePath = `${getChunkDirectoryPath(sessionId)}/${chunkIndex}`; + let filePath; try { const session = await UploadRepo.getUploadSession(sessionId, userId); @@ -35,15 +35,16 @@ export const uploadChunk = async ( } const isLastChunk = chunkIndex === session.totalChunks - 1; + filePath = `${session.path}/${chunkIndex}`; - let writtenBytes = 0; const hashStream = createHash("sha256"); const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); + let writtenBytes = 0; for await (const chunk of encChunkStream) { - writtenBytes += chunk.length; hashStream.update(chunk); writeStream.write(chunk); + writtenBytes += chunk.length; } await new Promise((resolve, reject) => { @@ -53,9 +54,8 @@ export const uploadChunk = async ( if (hashStream.digest("base64") !== encChunkHash) { throw new Error("Invalid checksum"); } else if ( - (!isLastChunk && writtenBytes !== CHUNK_SIZE + ENCRYPTION_OVERHEAD) || - (isLastChunk && - (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > CHUNK_SIZE + ENCRYPTION_OVERHEAD)) + (!isLastChunk && writtenBytes !== ENCRYPTED_CHUNK_SIZE) || + (isLastChunk && (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > ENCRYPTED_CHUNK_SIZE)) ) { throw new Error("Invalid chunk size"); } @@ -75,3 +75,8 @@ export const uploadChunk = async ( chunkLocks.delete(lockKey); } }; + +export const cleanupExpiredUploadSessions = async () => { + const paths = await UploadRepo.cleanupExpiredUploadSessions(); + await Promise.all(paths.map(safeRecursiveRm)); +}; diff --git a/src/trpc/routers/upload.ts b/src/trpc/routers/upload.ts index 7a1680b..168e957 100644 --- a/src/trpc/routers/upload.ts +++ b/src/trpc/routers/upload.ts @@ -1,7 +1,7 @@ import { TRPCError } from "@trpc/server"; import { createHash } from "crypto"; import { createReadStream, createWriteStream } from "fs"; -import { mkdir, rename, rm } from "fs/promises"; +import { mkdir, rename } from "fs/promises"; import mime from "mime"; import { dirname } from "path"; import { v4 as uuidv4 } from "uuid"; @@ -10,10 +10,17 @@ import { DirectoryIdSchema } from "$lib/schemas"; import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db"; import db from "$lib/server/db/kysely"; import env from "$lib/server/loadenv"; -import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem"; +import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem"; import { router, roleProcedure } from "../init.server"; -const uploadLocks = new Set(); +const sessionLocks = new Set(); + +const generateSessionId = async () => { + const id = uuidv4(); + const path = `${env.uploadsPath}/${id}`; + await mkdir(path, { recursive: true }); + return { id, path }; +}; const uploadRouter = router({ startFileUpload: roleProcedure["activeClient"] @@ -45,9 +52,13 @@ const uploadRouter = router({ throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" }); } + const { id, path } = await generateSessionId(); + try { - const { id: sessionId } = await UploadRepo.createFileUploadSession({ + await UploadRepo.createFileUploadSession({ + id, userId: ctx.session.userId, + path, totalChunks: input.chunks, expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours parentId: input.parent, @@ -63,9 +74,10 @@ const uploadRouter = router({ : null, encLastModifiedAt: { ciphertext: input.lastModifiedAt, iv: input.lastModifiedAtIv }, }); - await mkdir(getChunkDirectoryPath(sessionId), { recursive: true }); - return { uploadId: sessionId }; + return { uploadId: id }; } catch (e) { + await safeRecursiveRm(path); + if (e instanceof IntegrityError) { if (e.message === "Inactive MEK version") { throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid MEK version" }); @@ -85,16 +97,22 @@ const uploadRouter = router({ }), ) .mutation(async ({ ctx, input }) => { + const { id, path } = await generateSessionId(); + try { - const { id: sessionId } = await UploadRepo.createThumbnailUploadSession({ + await UploadRepo.createThumbnailUploadSession({ + id, userId: ctx.session.userId, + path, + totalChunks: 1, // Up to 4 MiB expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours fileId: input.file, dekVersion: input.dekVersion, }); - await mkdir(getChunkDirectoryPath(sessionId), { recursive: true }); - return { uploadId: sessionId }; + return { uploadId: id }; } catch (e) { + await safeRecursiveRm(path); + if (e instanceof IntegrityError) { if (e.message === "File not found") { throw new TRPCError({ code: "NOT_FOUND", message: "File not found" }); @@ -115,14 +133,13 @@ const uploadRouter = router({ ) .mutation(async ({ ctx, input }) => { const { uploadId } = input; - if (uploadLocks.has(uploadId)) { - throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" }); + if (sessionLocks.has(uploadId)) { + throw new TRPCError({ code: "CONFLICT", message: "Completion already in progress" }); } else { - uploadLocks.add(uploadId); + sessionLocks.add(uploadId); } - const filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`; - await mkdir(dirname(filePath), { recursive: true }); + let filePath = ""; try { const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); @@ -132,17 +149,19 @@ const uploadRouter = router({ (session.hskVersion && !input.contentHmac) || (!session.hskVersion && input.contentHmac) ) { - throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content hmac" }); + throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content HMAC" }); } else if (session.uploadedChunks.length < session.totalChunks) { - throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" }); + throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } - const chunkDirectoryPath = getChunkDirectoryPath(uploadId); + filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`; + await mkdir(dirname(filePath), { recursive: true }); + const hashStream = createHash("sha256"); const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); for (let i = 0; i < session.totalChunks; i++) { - for await (const chunk of createReadStream(`${chunkDirectoryPath}/${i}`)) { + for await (const chunk of createReadStream(`${session.path}/${i}`)) { hashStream.update(chunk); writeStream.write(chunk); } @@ -166,13 +185,13 @@ const uploadRouter = router({ return fileId; }); - await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e)); + await safeRecursiveRm(session.path); return { file: fileId }; } catch (e) { await safeUnlink(filePath); throw e; } finally { - uploadLocks.delete(uploadId); + sessionLocks.delete(uploadId); } }), @@ -184,44 +203,39 @@ const uploadRouter = router({ ) .mutation(async ({ ctx, input }) => { const { uploadId } = input; - if (uploadLocks.has(uploadId)) { - throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" }); + if (sessionLocks.has(uploadId)) { + throw new TRPCError({ code: "CONFLICT", message: "Completion already in progress" }); } else { - uploadLocks.add(uploadId); + sessionLocks.add(uploadId); } - const thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uuidv4()}`; - await mkdir(dirname(thumbnailPath), { recursive: true }); + let thumbnailPath = ""; try { const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); if (!session || session.type !== "thumbnail") { throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" }); } else if (session.uploadedChunks.length < session.totalChunks) { - throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" }); + throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } - const chunkDirectoryPath = getChunkDirectoryPath(uploadId); - const chunkPath = `${chunkDirectoryPath}/0`; + thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uploadId}`; + await mkdir(dirname(thumbnailPath), { recursive: true }); + await rename(`${session.path}/0`, thumbnailPath); - // Move chunk file to thumbnail path (IV is prepended to the content) - await rename(chunkPath, thumbnailPath); - - // Update thumbnail in database (null IV since it's prepended to the file) - const oldPath = await MediaRepo.updateFileThumbnail( - ctx.session.userId, - session.fileId, - session.dekVersion, - thumbnailPath, - null, - ); - safeUnlink(oldPath); // Intended - - await db.transaction().execute(async (trx) => { + const oldThumbnailPath = await db.transaction().execute(async (trx) => { + const oldPath = await MediaRepo.updateFileThumbnail( + trx, + ctx.session.userId, + session.fileId, + session.dekVersion, + thumbnailPath, + null, + ); await UploadRepo.deleteUploadSession(trx, uploadId); + return oldPath; }); - - await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e)); + await Promise.all([safeUnlink(oldThumbnailPath), safeRecursiveRm(session.path)]); } catch (e) { await safeUnlink(thumbnailPath); if (e instanceof IntegrityError) { @@ -233,7 +247,7 @@ const uploadRouter = router({ } throw e; } finally { - uploadLocks.delete(uploadId); + sessionLocks.delete(uploadId); } }), });