diff --git a/.env.example b/.env.example index 4e8b20b..e3b6365 100644 --- a/.env.example +++ b/.env.example @@ -12,4 +12,3 @@ USER_CLIENT_CHALLENGE_EXPIRES= SESSION_UPGRADE_CHALLENGE_EXPIRES= LIBRARY_PATH= THUMBNAILS_PATH= -UPLOADS_PATH= diff --git a/docker-compose.yaml b/docker-compose.yaml index 3544f14..2015066 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -9,7 +9,6 @@ services: volumes: - ./data/library:/app/data/library - ./data/thumbnails:/app/data/thumbnails - - ./data/uploads:/app/data/uploads environment: # ArkVault - DATABASE_HOST=database @@ -21,7 +20,6 @@ services: - SESSION_UPGRADE_CHALLENGE_EXPIRES - LIBRARY_PATH=/app/data/library - THUMBNAILS_PATH=/app/data/thumbnails - - UPLOADS_PATH=/app/data/uploads # SvelteKit - ADDRESS_HEADER=${TRUST_PROXY:+X-Forwarded-For} - XFF_DEPTH=${TRUST_PROXY:-} diff --git a/src/lib/modules/file/upload.svelte.ts b/src/lib/modules/file/upload.svelte.ts index 18215a6..691f239 100644 --- a/src/lib/modules/file/upload.svelte.ts +++ b/src/lib/modules/file/upload.svelte.ts @@ -168,7 +168,7 @@ const requestFileUpload = limitFunction( ) => { state.status = "uploading"; - await uploadBlob(uploadId, file, dataKey, { + const { encContentHash } = await uploadBlob(uploadId, file, dataKey, { onProgress(s) { state.progress = s.progress; state.rate = s.rate; @@ -178,6 +178,7 @@ const requestFileUpload = limitFunction( const { file: fileId } = await trpc().upload.completeFileUpload.mutate({ uploadId, contentHmac: fileSigned, + encContentHash, }); if (thumbnailBuffer) { diff --git a/src/lib/modules/http.ts b/src/lib/modules/http.ts index 4116c18..92e50d0 100644 --- a/src/lib/modules/http.ts +++ b/src/lib/modules/http.ts @@ -12,11 +12,3 @@ export const parseRangeHeader = (value: string | null) => { export const getContentRangeHeader = (range?: { start: number; end: number; total: number }) => { return range && { "Content-Range": `bytes ${range.start}-${range.end}/${range.total}` }; }; - -export const 
parseContentDigestHeader = (value: string | null) => { - if (!value) return undefined; - - const firstDigest = value.split(",")[0]!.trim(); - const match = firstDigest.match(/^sha-256=:([A-Za-z0-9+/=]+):$/); - return match?.[1]; -}; diff --git a/src/lib/modules/upload.ts b/src/lib/modules/upload.ts index cab51b7..5b98b96 100644 --- a/src/lib/modules/upload.ts +++ b/src/lib/modules/upload.ts @@ -1,7 +1,8 @@ +import { sha256 } from "@noble/hashes/sha2.js"; import axios from "axios"; import pLimit from "p-limit"; import { ENCRYPTION_OVERHEAD, CHUNK_SIZE } from "$lib/constants"; -import { encryptChunk, digestMessage, encodeToBase64 } from "$lib/modules/crypto"; +import { encodeToBase64, encryptChunk } from "$lib/modules/crypto"; import { BoundedQueue } from "$lib/utils"; interface UploadStats { @@ -12,7 +13,6 @@ interface EncryptedChunk { index: number; data: ArrayBuffer; - hash: string; } const createSpeedMeter = (timeWindow = 3000, minInterval = 200, warmupPeriod = 500) => { @@ -68,27 +68,18 @@ }; }; -const encryptChunkData = async ( - chunk: Blob, - dataKey: CryptoKey, -): Promise<{ data: ArrayBuffer; hash: string }> => { - const encrypted = await encryptChunk(await chunk.arrayBuffer(), dataKey); - const hash = encodeToBase64(await digestMessage(encrypted)); - return { data: encrypted, hash }; +const encryptChunkData = async (chunk: Blob, dataKey: CryptoKey): Promise<ArrayBuffer> => { + return await encryptChunk(await chunk.arrayBuffer(), dataKey); }; const uploadEncryptedChunk = async ( uploadId: string, chunkIndex: number, encrypted: ArrayBuffer, - hash: string, onChunkProgress: (chunkIndex: number, loaded: number) => void, ) => { await axios.post(`/api/upload/${uploadId}/chunks/${chunkIndex + 1}`, encrypted, { - headers: { - "Content-Type": "application/octet-stream", - "Content-Digest": `sha-256=:${hash}:`, - }, + headers: { "Content-Type": "application/octet-stream" },
onUploadProgress(e) { onChunkProgress(chunkIndex, e.loaded ?? 0); }, @@ -112,6 +103,7 @@ export const uploadBlob = async ( const uploadedByChunk = new Array(totalChunks).fill(0); const speedMeter = createSpeedMeter(3000, 200); + const hash = sha256.create(); const emit = () => { if (!onProgress) return; @@ -136,8 +128,9 @@ export const uploadBlob = async ( try { for (let i = 0; i < totalChunks; i++) { const chunk = blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE); - const { data, hash } = await encryptChunkData(chunk, dataKey); - await queue.push({ index: i, data, hash }); + const data = await encryptChunkData(chunk, dataKey); + hash.update(new Uint8Array(data)); + await queue.push({ index: i, data }); } } catch (e) { encryptionError = e instanceof Error ? e : new Error(String(e)); @@ -158,7 +151,7 @@ export const uploadBlob = async ( const task = limit(async () => { try { - await uploadEncryptedChunk(uploadId, item.index, item.data, item.hash, onChunkProgress); + await uploadEncryptedChunk(uploadId, item.index, item.data, onChunkProgress); } finally { // @ts-ignore item.data = null; @@ -180,4 +173,5 @@ export const uploadBlob = async ( await Promise.all([encryptionProducer(), uploadConsumer()]); onProgress?.({ progress: 1, rate: speedMeter() }); + return { encContentHash: encodeToBase64(hash.digest()) }; }; diff --git a/src/lib/server/loadenv.ts b/src/lib/server/loadenv.ts index f8fd68f..3a805d8 100644 --- a/src/lib/server/loadenv.ts +++ b/src/lib/server/loadenv.ts @@ -26,5 +26,4 @@ export default { }, libraryPath: env.LIBRARY_PATH || "library", thumbnailsPath: env.THUMBNAILS_PATH || "thumbnails", - uploadsPath: env.UPLOADS_PATH || "uploads", }; diff --git a/src/lib/server/modules/filesystem.ts b/src/lib/server/modules/filesystem.ts index ade7d73..65cb9ec 100644 --- a/src/lib/server/modules/filesystem.ts +++ b/src/lib/server/modules/filesystem.ts @@ -1,10 +1,4 @@ -import { rm, unlink } from "fs/promises"; - -export const safeRecursiveRm = async (path: string | 
null | undefined) => { - if (path) { - await rm(path, { recursive: true }).catch(console.error); - } -}; +import { unlink } from "fs/promises"; export const safeUnlink = async (path: string | null | undefined) => { if (path) { diff --git a/src/lib/server/services/upload.ts b/src/lib/server/services/upload.ts index d654f42..6d1d61c 100644 --- a/src/lib/server/services/upload.ts +++ b/src/lib/server/services/upload.ts @@ -1,10 +1,9 @@ import { error } from "@sveltejs/kit"; -import { createHash } from "crypto"; -import { createWriteStream } from "fs"; +import { open } from "fs/promises"; import { Readable } from "stream"; import { ENCRYPTION_OVERHEAD, ENCRYPTED_CHUNK_SIZE } from "$lib/constants"; import { UploadRepo } from "$lib/server/db"; -import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem"; +import { safeUnlink } from "$lib/server/modules/filesystem"; const chunkLocks = new Set(); @@ -14,12 +13,61 @@ const isChunkUploaded = (bitmap: Buffer, chunkIndex: number) => { return !!byte && (byte & (1 << (chunkIndex % 8))) !== 0; // Postgres sucks }; +const writeChunkAtOffset = async ( + path: string, + encChunkStream: Readable, + chunkIndex: number, + isLastChunk: boolean, +) => { + const offset = (chunkIndex - 1) * ENCRYPTED_CHUNK_SIZE; + const file = await open(path, "r+"); + let written = 0; + + try { + for await (const chunk of encChunkStream) { + const buffer = Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk); + written += buffer.length; + if (written > ENCRYPTED_CHUNK_SIZE) { + throw new Error("Invalid chunk size"); + } + + let chunkOffset = 0; + while (chunkOffset < buffer.length) { + const { bytesWritten } = await file.write( + buffer, + chunkOffset, + buffer.length - chunkOffset, + offset + written - buffer.length + chunkOffset, + ); + if (bytesWritten <= 0) { + throw new Error("Failed to write chunk"); + } + chunkOffset += bytesWritten; + } + } + + if ( + (!isLastChunk && written !== ENCRYPTED_CHUNK_SIZE) || + (isLastChunk && (written <= ENCRYPTION_OVERHEAD || written > ENCRYPTED_CHUNK_SIZE)) + ) { + throw new Error("Invalid chunk size"); + } + + if (isLastChunk) { + await file.truncate(offset + written); + } + + return written; + } finally { + await file.close(); + } +}; + export const uploadChunk = async ( userId: number, sessionId: string, chunkIndex: number, encChunkStream: Readable, - encChunkHash: string, ) => { const lockKey = `${sessionId}/${chunkIndex}`; if (chunkLocks.has(lockKey)) { @@ -28,8 +76,6 @@ export const uploadChunk = async ( chunkLocks.add(lockKey); } - let filePath; - try { const session = await UploadRepo.getUploadSession(sessionId, userId); if (!session) { @@ -41,39 +87,10 @@ export const uploadChunk = async ( } const isLastChunk = chunkIndex === session.totalChunks; - filePath = `${session.path}/${chunkIndex}`; - - const hashStream = createHash("sha256"); - const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); - let writtenBytes = 0; - - for await (const chunk of encChunkStream) { - hashStream.update(chunk); - writeStream.write(chunk); - writtenBytes += chunk.length; - } - - await new Promise((resolve, reject) => { - writeStream.end((e: any) => (e ? 
reject(e) : resolve())); - }); - - if (hashStream.digest("base64") !== encChunkHash) { - throw new Error("Invalid checksum"); - } else if ( - (!isLastChunk && writtenBytes !== ENCRYPTED_CHUNK_SIZE) || - (isLastChunk && (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > ENCRYPTED_CHUNK_SIZE)) - ) { - throw new Error("Invalid chunk size"); - } - + await writeChunkAtOffset(session.path, encChunkStream, chunkIndex, isLastChunk); await UploadRepo.markChunkAsUploaded(sessionId, chunkIndex); } catch (e) { - await safeUnlink(filePath); - - if ( - e instanceof Error && - (e.message === "Invalid checksum" || e.message === "Invalid chunk size") - ) { + if (e instanceof Error && e.message === "Invalid chunk size") { error(400, "Invalid request body"); } throw e; @@ -84,5 +101,5 @@ export const uploadChunk = async ( export const cleanupExpiredUploadSessions = async () => { const paths = await UploadRepo.cleanupExpiredUploadSessions(); - await Promise.all(paths.map(safeRecursiveRm)); + await Promise.all(paths.map(safeUnlink)); }; diff --git a/src/routes/api/upload/[id]/chunks/[index]/+server.ts b/src/routes/api/upload/[id]/chunks/[index]/+server.ts index 3b2e85b..a8598e4 100644 --- a/src/routes/api/upload/[id]/chunks/[index]/+server.ts +++ b/src/routes/api/upload/[id]/chunks/[index]/+server.ts @@ -2,7 +2,6 @@ import { error, text } from "@sveltejs/kit"; import { Readable } from "stream"; import type { ReadableStream } from "stream/web"; import { z } from "zod"; -import { parseContentDigestHeader } from "$lib/modules/http"; import { authorize } from "$lib/server/modules/auth"; import { uploadChunk } from "$lib/server/services/upload"; import type { RequestHandler } from "./$types"; @@ -19,10 +18,7 @@ export const POST: RequestHandler = async ({ locals, params, request }) => { if (!zodRes.success) error(400, "Invalid path parameters"); const { id: sessionId, index: chunkIndex } = zodRes.data; - const encContentHash = 
parseContentDigestHeader(request.headers.get("Content-Digest")); - if (!encContentHash) { - error(400, "Invalid request headers"); - } else if (!request.body) { + if (!request.body) { error(400, "Invalid request body"); } @@ -31,7 +27,6 @@ export const POST: RequestHandler = async ({ locals, params, request }) => { sessionId, chunkIndex, Readable.fromWeb(request.body as ReadableStream), - encContentHash, ); return text("Chunk uploaded", { headers: { "Content-Type": "text/plain" } }); }; diff --git a/src/trpc/routers/upload.ts b/src/trpc/routers/upload.ts index 3b05cbb..3c2747b 100644 --- a/src/trpc/routers/upload.ts +++ b/src/trpc/routers/upload.ts @@ -1,7 +1,7 @@ import { TRPCError } from "@trpc/server"; import { createHash } from "crypto"; -import { createReadStream, createWriteStream } from "fs"; -import { copyFile, mkdir } from "fs/promises"; +import { createReadStream } from "fs"; +import { mkdir, open } from "fs/promises"; import mime from "mime"; import { dirname } from "path"; import { v4 as uuidv4 } from "uuid"; @@ -10,17 +10,30 @@ import { DirectoryIdSchema } from "$lib/schemas"; import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db"; import db from "$lib/server/db/kysely"; import env from "$lib/server/loadenv"; -import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem"; +import { safeUnlink } from "$lib/server/modules/filesystem"; import { router, roleProcedure } from "../init.server"; const UPLOADS_EXPIRES = 24 * 3600 * 1000; // 24 hours const sessionLocks = new Set(); -const generateSessionId = async () => { +const reserveUploadPath = async (path: string) => { + await mkdir(dirname(path), { recursive: true }); + const file = await open(path, "wx", 0o600); + await file.close(); +}; + +const generateFileUploadSession = async (userId: number) => { const id = uuidv4(); - const path = `${env.uploadsPath}/${id}`; - await mkdir(path, { recursive: true }); + const path = `${env.libraryPath}/${userId}/${uuidv4()}`; + 
await reserveUploadPath(path); + return { id, path }; +}; + +const generateThumbnailUploadSession = async (userId: number) => { + const id = uuidv4(); + const path = `${env.thumbnailsPath}/${userId}/${id}`; + await reserveUploadPath(path); return { id, path }; }; @@ -54,7 +67,7 @@ const uploadRouter = router({ throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" }); } - const { id, path } = await generateSessionId(); + const { id, path } = await generateFileUploadSession(ctx.session.userId); try { await UploadRepo.createFileUploadSession({ @@ -78,7 +91,7 @@ const uploadRouter = router({ }); return { uploadId: id }; } catch (e) { - await safeRecursiveRm(path); + await safeUnlink(path); if (e instanceof IntegrityError) { if (e.message === "Inactive MEK version") { @@ -96,6 +109,7 @@ const uploadRouter = router({ z.object({ uploadId: z.uuidv4(), contentHmac: z.base64().nonempty().optional(), + encContentHash: z.base64().nonempty(), }), ) .mutation(async ({ ctx, input }) => { @@ -106,8 +120,6 @@ const uploadRouter = router({ sessionLocks.add(uploadId); } - let filePath = ""; - try { const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); if (session?.type !== "file") { @@ -121,29 +133,24 @@ const uploadRouter = router({ throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } - filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`; - await mkdir(dirname(filePath), { recursive: true }); - const hashStream = createHash("sha256"); - const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 }); - for (let i = 1; i <= session.totalChunks; i++) { - for await (const chunk of createReadStream(`${session.path}/${i}`)) { - hashStream.update(chunk); - writeStream.write(chunk); - } + for await (const chunk of createReadStream(session.path)) { + hashStream.update(chunk); } - await new Promise((resolve, reject) => { - writeStream.end((e: any) => (e ? 
reject(e) : resolve())); - }); - const hash = hashStream.digest("base64"); + if (hash !== input.encContentHash) { + await UploadRepo.deleteUploadSession(db, uploadId); + await safeUnlink(session.path); + throw new TRPCError({ code: "CONFLICT", message: "Uploaded file corrupted" }); + } + const fileId = await db.transaction().execute(async (trx) => { const { id: fileId } = await FileRepo.registerFile(trx, { ...session, userId: ctx.session.userId, - path: filePath, + path: session.path, contentHmac: input.contentHmac ?? null, encContentHash: hash, encContentIv: null, @@ -152,11 +159,7 @@ const uploadRouter = router({ return fileId; }); - await safeRecursiveRm(session.path); return { file: fileId }; - } catch (e) { - await safeUnlink(filePath); - throw e; } finally { sessionLocks.delete(uploadId); } @@ -170,7 +173,7 @@ const uploadRouter = router({ }), ) .mutation(async ({ ctx, input }) => { - const { id, path } = await generateSessionId(); + const { id, path } = await generateThumbnailUploadSession(ctx.session.userId); try { await UploadRepo.createThumbnailUploadSession({ @@ -185,7 +188,7 @@ const uploadRouter = router({ }); return { uploadId: id }; } catch (e) { - await safeRecursiveRm(path); + await safeUnlink(path); if (e instanceof IntegrityError) { if (e.message === "File not found") { @@ -212,8 +215,6 @@ const uploadRouter = router({ sessionLocks.add(uploadId); } - let thumbnailPath = ""; - try { const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId); if (session?.type !== "thumbnail") { @@ -222,26 +223,20 @@ const uploadRouter = router({ throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" }); } - thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uploadId}`; - await mkdir(dirname(thumbnailPath), { recursive: true }); - await copyFile(`${session.path}/1`, thumbnailPath); - const oldThumbnailPath = await db.transaction().execute(async (trx) => { const oldPath = await MediaRepo.updateFileThumbnail( trx, 
ctx.session.userId, session.fileId, session.dekVersion, - thumbnailPath, + session.path, null, ); await UploadRepo.deleteUploadSession(trx, uploadId); return oldPath; }); - await Promise.all([safeUnlink(oldThumbnailPath), safeRecursiveRm(session.path)]); + await safeUnlink(oldThumbnailPath); } catch (e) { - await safeUnlink(thumbnailPath); - if (e instanceof IntegrityError && e.message === "Invalid DEK version") { // DEK rotated after this upload started throw new TRPCError({ code: "CONFLICT", message: e.message });