diff --git a/.dockerignore b/.dockerignore
index 4f68a3b..6d312ec 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -12,6 +12,7 @@ node_modules
 /data
 /library
 /thumbnails
+/uploads

 # OS
 .DS_Store
diff --git a/.env.example b/.env.example
index e3b6365..4e8b20b 100644
--- a/.env.example
+++ b/.env.example
@@ -12,3 +12,4 @@ USER_CLIENT_CHALLENGE_EXPIRES=
 SESSION_UPGRADE_CHALLENGE_EXPIRES=
 LIBRARY_PATH=
 THUMBNAILS_PATH=
+UPLOADS_PATH=
diff --git a/.gitignore b/.gitignore
index 5078fa8..a200c74 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@ node_modules
 /data
 /library
 /thumbnails
+/uploads

 # OS
 .DS_Store
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2015066..a624d9f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -20,6 +20,7 @@ services:
       - SESSION_UPGRADE_CHALLENGE_EXPIRES
       - LIBRARY_PATH=/app/data/library
       - THUMBNAILS_PATH=/app/data/thumbnails
+      - UPLOADS_PATH=/app/data/uploads
       # SvelteKit
       - ADDRESS_HEADER=${TRUST_PROXY:+X-Forwarded-For}
       - XFF_DEPTH=${TRUST_PROXY:-}
diff --git a/src/hooks.server.ts b/src/hooks.server.ts
index 6f94a7e..1795ce3 100644
--- a/src/hooks.server.ts
+++ b/src/hooks.server.ts
@@ -7,6 +7,7 @@ import {
   cleanupExpiredSessions,
   cleanupExpiredSessionUpgradeChallenges,
 } from "$lib/server/db/session";
+import { cleanupExpiredUploadSessions } from "$lib/server/db/upload";
 import { authenticate, setAgentInfo } from "$lib/server/middlewares";

 export const init: ServerInit = async () => {
@@ -16,6 +17,7 @@ export const init: ServerInit = async () => {
     cleanupExpiredUserClientChallenges();
     cleanupExpiredSessions();
     cleanupExpiredSessionUpgradeChallenges();
+    cleanupExpiredUploadSessions();
   });
 };
diff --git a/src/lib/constants/index.ts b/src/lib/constants/index.ts
new file mode 100644
index 0000000..ab6125a
--- /dev/null
+++ b/src/lib/constants/index.ts
@@ -0,0 +1 @@
+export * from "./upload";
diff --git a/src/lib/constants/upload.ts b/src/lib/constants/upload.ts
new file mode 100644
index 0000000..337700d
--- /dev/null
+++ b/src/lib/constants/upload.ts
@@ -0,0 +1,5 @@
+export const CHUNK_SIZE = 4 * 1024 * 1024;
+
+export const AES_GCM_IV_SIZE = 12;
+export const AES_GCM_TAG_SIZE = 16;
+export const ENCRYPTION_OVERHEAD = AES_GCM_IV_SIZE + AES_GCM_TAG_SIZE;
diff --git a/src/lib/modules/crypto/aes.ts b/src/lib/modules/crypto/aes.ts
index c911d26..67f6a9f 100644
--- a/src/lib/modules/crypto/aes.ts
+++ b/src/lib/modules/crypto/aes.ts
@@ -1,4 +1,11 @@
-import { encodeString, decodeString, encodeToBase64, decodeFromBase64 } from "./util";
+import { AES_GCM_IV_SIZE } from "$lib/constants";
+import {
+  encodeString,
+  decodeString,
+  encodeToBase64,
+  decodeFromBase64,
+  concatenateBuffers,
+} from "./util";

 export const generateMasterKey = async () => {
   return {
@@ -86,7 +93,7 @@ export const encryptData = async (data: BufferSource, dataKey: CryptoKey) => {
     dataKey,
     data,
   );
-  return { ciphertext, iv: encodeToBase64(iv.buffer) };
+  return { ciphertext, iv: iv.buffer };
 };

 export const decryptData = async (
@@ -106,9 +113,22 @@
 export const encryptString = async (plaintext: string, dataKey: CryptoKey) => {
   const { ciphertext, iv } = await encryptData(encodeString(plaintext), dataKey);
-  return { ciphertext: encodeToBase64(ciphertext), iv };
+  return { ciphertext: encodeToBase64(ciphertext), iv: encodeToBase64(iv) };
 };

 export const decryptString = async (ciphertext: string, iv: string, dataKey: CryptoKey) => {
   return decodeString(await decryptData(decodeFromBase64(ciphertext), iv, dataKey));
 };
+
+export const encryptChunk = async (chunk: ArrayBuffer, dataKey: CryptoKey) => {
+  const { ciphertext, iv } = await encryptData(chunk, dataKey);
+  return concatenateBuffers(iv, ciphertext).buffer;
+};
+
+export const decryptChunk = async (encryptedChunk: ArrayBuffer, dataKey: CryptoKey) => {
+  return await decryptData(
+    encryptedChunk.slice(AES_GCM_IV_SIZE),
+    encryptedChunk.slice(0, AES_GCM_IV_SIZE),
+    dataKey,
+  );
+};
diff --git a/src/lib/modules/file/download.svelte.ts b/src/lib/modules/file/download.svelte.ts
index 97f42ea..d438e3f 100644
--- a/src/lib/modules/file/download.svelte.ts
+++ b/src/lib/modules/file/download.svelte.ts
@@ -1,6 +1,7 @@
 import axios from "axios";
 import { limitFunction } from "p-limit";
-import { decryptData } from "$lib/modules/crypto";
+import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants";
+import { decryptChunk, concatenateBuffers } from "$lib/modules/crypto";

 export interface FileDownloadState {
   id: number;
@@ -62,15 +63,24 @@ const requestFileDownload = limitFunction(
 );

 const decryptFile = limitFunction(
-  async (state: FileDownloadState, fileEncrypted: ArrayBuffer, dataKey: CryptoKey) => {
+  async (
+    state: FileDownloadState,
+    fileEncrypted: ArrayBuffer,
+    encryptedChunkSize: number,
+    dataKey: CryptoKey,
+  ) => {
     state.status = "decrypting";

-    const fileBuffer = await decryptData(
-      fileEncrypted.slice(12),
-      fileEncrypted.slice(0, 12),
-      dataKey,
-    );
+    const chunks: ArrayBuffer[] = [];
+    let offset = 0;
+    while (offset < fileEncrypted.byteLength) {
+      const nextOffset = Math.min(offset + encryptedChunkSize, fileEncrypted.byteLength);
+      chunks.push(await decryptChunk(fileEncrypted.slice(offset, nextOffset), dataKey));
+      offset = nextOffset;
+    }
+
+    const fileBuffer = concatenateBuffers(...chunks).buffer;

     state.status = "decrypted";
     state.result = fileBuffer;
     return fileBuffer;
@@ -78,7 +88,7 @@
   { concurrency: 4 },
 );

-export const downloadFile = async (id: number, dataKey: CryptoKey) => {
+export const downloadFile = async (id: number, dataKey: CryptoKey, isLegacy: boolean) => {
   downloadingFiles.push({
     id,
     status: "download-pending",
@@ -86,7 +96,13 @@
   });
   const state = downloadingFiles.at(-1)!;

   try {
-    return await decryptFile(state, await requestFileDownload(state, id), dataKey);
+    const fileEncrypted = await requestFileDownload(state, id);
+    return await decryptFile(
+      state,
+      fileEncrypted,
+      isLegacy ? fileEncrypted.byteLength : CHUNK_SIZE + ENCRYPTION_OVERHEAD,
+      dataKey,
+    );
   } catch (e) {
     state.status = "error";
     throw e;
diff --git a/src/lib/modules/file/upload.svelte.ts b/src/lib/modules/file/upload.svelte.ts
index a632eb5..2bb6c7c 100644
--- a/src/lib/modules/file/upload.svelte.ts
+++ b/src/lib/modules/file/upload.svelte.ts
@@ -1,24 +1,23 @@
 import axios from "axios";
 import ExifReader from "exifreader";
 import { limitFunction } from "p-limit";
+import { CHUNK_SIZE } from "$lib/constants";
 import {
   encodeToBase64,
   generateDataKey,
   wrapDataKey,
   encryptData,
   encryptString,
+  encryptChunk,
   digestMessage,
   signMessageHmac,
 } from "$lib/modules/crypto";
 import { Scheduler } from "$lib/modules/scheduler";
 import { generateThumbnail } from "$lib/modules/thumbnail";
-import type {
-  FileThumbnailUploadRequest,
-  FileUploadRequest,
-  FileUploadResponse,
-} from "$lib/server/schemas";
+import type { FileThumbnailUploadRequest } from "$lib/server/schemas";
 import type { MasterKey, HmacSecret } from "$lib/stores";
 import { trpc } from "$trpc/client";
+import type { RouterInputs } from "$trpc/router.server";

 export interface FileUploadState {
   name: string;
@@ -110,6 +109,23 @@ const extractExifDateTime = (fileBuffer: ArrayBuffer) => {
   return new Date(utcDate - offsetMs);
 };

+const encryptChunks = async (fileBuffer: ArrayBuffer, dataKey: CryptoKey) => {
+  const chunksEncrypted: { chunkEncrypted: ArrayBuffer; chunkEncryptedHash: string }[] = [];
+  let offset = 0;
+
+  while (offset < fileBuffer.byteLength) {
+    const nextOffset = Math.min(offset + CHUNK_SIZE, fileBuffer.byteLength);
+    const chunkEncrypted = await encryptChunk(fileBuffer.slice(offset, nextOffset), dataKey);
+    chunksEncrypted.push({
+      chunkEncrypted: chunkEncrypted,
+      chunkEncryptedHash: encodeToBase64(await digestMessage(chunkEncrypted)),
+    });
+    offset = nextOffset;
+  }
+
+  return chunksEncrypted;
+};
+
 const encryptFile = limitFunction(
   async (state: FileUploadState, file: File, fileBuffer: ArrayBuffer, masterKey: MasterKey) => {
     state.status = "encrypting";
@@ -123,9 +139,7 @@
     const { dataKey, dataKeyVersion } = await generateDataKey();
     const dataKeyWrapped = await wrapDataKey(dataKey, masterKey.key);
-
-    const fileEncrypted = await encryptData(fileBuffer, dataKey);
-    const fileEncryptedHash = encodeToBase64(await digestMessage(fileEncrypted.ciphertext));
+    const chunksEncrypted = await encryptChunks(fileBuffer, dataKey);

     const nameEncrypted = await encryptString(file.name, dataKey);
     const createdAtEncrypted =
@@ -142,8 +156,7 @@
       dataKeyWrapped,
       dataKeyVersion,
       fileType,
-      fileEncrypted,
-      fileEncryptedHash,
+      chunksEncrypted,
       nameEncrypted,
       createdAtEncrypted,
       lastModifiedAtEncrypted,
@@ -154,30 +167,70 @@
 );

 const requestFileUpload = limitFunction(
-  async (state: FileUploadState, form: FormData, thumbnailForm: FormData | null) => {
+  async (
+    state: FileUploadState,
+    metadata: RouterInputs["file"]["startUpload"],
+    chunksEncrypted: { chunkEncrypted: ArrayBuffer; chunkEncryptedHash: string }[],
+    fileSigned: string | undefined,
+    thumbnailForm: FormData | null,
+  ) => {
     state.status = "uploading";

-    const res = await axios.post("/api/file/upload", form, {
-      onUploadProgress: ({ progress, rate, estimated }) => {
-        state.progress = progress;
-        state.rate = rate;
-        state.estimated = estimated;
-      },
-    });
-    const { file }: FileUploadResponse = res.data;
+    const { uploadId } = await trpc().file.startUpload.mutate(metadata);

+    // Upload chunks with progress tracking
+    const totalBytes = chunksEncrypted.reduce((sum, c) => sum + c.chunkEncrypted.byteLength, 0);
+    let uploadedBytes = 0;
+    const startTime = Date.now();
+
+    for (let i = 0; i < chunksEncrypted.length; i++) {
+      const { chunkEncrypted, chunkEncryptedHash } = chunksEncrypted[i]!;
+
+      const response = await fetch(`/api/file/upload/${uploadId}/chunks/${i}`, {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/octet-stream",
+          "Content-Digest": `sha-256=:${chunkEncryptedHash}:`,
+        },
+        body: chunkEncrypted,
+      });
+
+      if (!response.ok) {
+        throw new Error(`Chunk upload failed: ${response.status} ${response.statusText}`);
+      }
+
+      uploadedBytes += chunkEncrypted.byteLength;
+
+      // Calculate progress, rate, estimated
+      const elapsed = (Date.now() - startTime) / 1000; // seconds
+      const rate = uploadedBytes / elapsed; // bytes per second
+      const remaining = totalBytes - uploadedBytes;
+      const estimated = rate > 0 ? remaining / rate : undefined;
+
+      state.progress = uploadedBytes / totalBytes;
+      state.rate = rate;
+      state.estimated = estimated;
+    }
+
+    // Complete upload
+    const { file: fileId } = await trpc().file.completeUpload.mutate({
+      uploadId,
+      contentHmac: fileSigned,
+    });
+
+    // Upload thumbnail if exists
     if (thumbnailForm) {
       try {
-        await axios.post(`/api/file/${file}/thumbnail/upload`, thumbnailForm);
+        await axios.post(`/api/file/${fileId}/thumbnail/upload`, thumbnailForm);
       } catch (e) {
-        // TODO
+        // TODO: Error handling for thumbnail upload
         console.error(e);
       }
     }

     state.status = "uploaded";
-    return { fileId: file };
+    return { fileId };
   },
   { concurrency: 1 },
 );
@@ -215,36 +268,28 @@ export const uploadFile = async (
     dataKeyWrapped,
     dataKeyVersion,
     fileType,
-    fileEncrypted,
-    fileEncryptedHash,
+    chunksEncrypted,
     nameEncrypted,
     createdAtEncrypted,
     lastModifiedAtEncrypted,
     thumbnail,
   } = await encryptFile(state, file, fileBuffer, masterKey);

-  const form = new FormData();
-  form.set(
-    "metadata",
-    JSON.stringify({
-      parent: parentId,
-      mekVersion: masterKey.version,
-      dek: dataKeyWrapped,
-      dekVersion: dataKeyVersion.toISOString(),
-      hskVersion: hmacSecret.version,
-      contentHmac: fileSigned,
-      contentType: fileType,
-      contentIv: fileEncrypted.iv,
-      name: nameEncrypted.ciphertext,
-      nameIv: nameEncrypted.iv,
-      createdAt: createdAtEncrypted?.ciphertext,
-      createdAtIv: createdAtEncrypted?.iv,
-      lastModifiedAt: lastModifiedAtEncrypted.ciphertext,
-      lastModifiedAtIv: lastModifiedAtEncrypted.iv,
-    } satisfies FileUploadRequest),
-  );
-  form.set("content", new Blob([fileEncrypted.ciphertext]));
-  form.set("checksum", fileEncryptedHash);
+  const metadata = {
+    chunks: chunksEncrypted.length,
+    parent: parentId,
+    mekVersion: masterKey.version,
+    dek: dataKeyWrapped,
+    dekVersion: dataKeyVersion,
+    hskVersion: hmacSecret.version,
+    contentType: fileType,
+    name: nameEncrypted.ciphertext,
+    nameIv: nameEncrypted.iv,
+    createdAt: createdAtEncrypted?.ciphertext,
+    createdAtIv: createdAtEncrypted?.iv,
+    lastModifiedAt: lastModifiedAtEncrypted.ciphertext,
+    lastModifiedAtIv: lastModifiedAtEncrypted.iv,
+  };

   let thumbnailForm = null;
   if (thumbnail) {
@@ -253,13 +298,19 @@
       "metadata",
       JSON.stringify({
         dekVersion: dataKeyVersion.toISOString(),
-        contentIv: thumbnail.iv,
+        contentIv: encodeToBase64(thumbnail.iv),
       } satisfies FileThumbnailUploadRequest),
     );
     thumbnailForm.set("content", new Blob([thumbnail.ciphertext]));
   }

-    const { fileId } = await requestFileUpload(state, form, thumbnailForm);
+    const { fileId } = await requestFileUpload(
+      state,
+      metadata,
+      chunksEncrypted,
+      fileSigned,
+      thumbnailForm,
+    );
     return { fileId, fileBuffer, thumbnailBuffer: thumbnail?.plaintext };
   } catch (e) {
     state.status = "error";
diff --git a/src/lib/modules/filesystem/file.ts b/src/lib/modules/filesystem/file.ts
index daf7fd6..d80a872 100644
--- a/src/lib/modules/filesystem/file.ts
+++ b/src/lib/modules/filesystem/file.ts
@@ -47,6 +47,7 @@ const cache = new FilesystemCache({
     return storeToIndexedDB({
       id,
+      isLegacy: file.isLegacy,
       parentId: file.parent,
       dataKey: metadata.dataKey,
       contentType: file.contentType,
@@ -115,6 +116,7 @@
     return {
       id,
       exists: true as const,
+      isLegacy: metadataRaw.isLegacy,
       parentId: metadataRaw.parent,
       contentType: metadataRaw.contentType,
       categories,
diff --git a/src/lib/modules/filesystem/types.ts b/src/lib/modules/filesystem/types.ts
index abac40c..f4ce9cf 100644
--- a/src/lib/modules/filesystem/types.ts
+++ b/src/lib/modules/filesystem/types.ts
@@ -28,6 +28,7 @@ export type SubDirectoryInfo = Omit;
diff --git a/src/lib/server/db/file.ts b/src/lib/server/db/file.ts
--- a/src/lib/server/db/file.ts
+++ b/src/lib/server/db/file.ts
-
 interface File {
   id: number;
   parentId: DirectoryId;
@@ -28,15 +26,13 @@ interface File {
   hskVersion: number | null;
   contentHmac: string | null;
   contentType: string;
-  encContentIv: string;
+  encContentIv: string | null;
   encContentHash: string;
   encName: Ciphertext;
   encCreatedAt: Ciphertext | null;
   encLastModifiedAt: Ciphertext;
 }

-export type NewFile = Omit;
-
 interface FileCategory {
   id: number;
   parentId: CategoryId;
@@ -46,7 +42,7 @@
   encName: Ciphertext;
 }

-export const registerDirectory = async (params: NewDirectory) => {
+export const registerDirectory = async (params: Omit) => {
   await db.transaction().execute(async (trx) => {
     const mek = await trx
       .selectFrom("master_encryption_key")
       .select("version")
@@ -214,69 +210,41 @@ export const unregisterDirectory = async (userId: number, directoryId: number) =
   });
 };

-export const registerFile = async (params: NewFile) => {
+export const registerFile = async (trx: typeof db, params: Omit) => {
   if ((params.hskVersion && !params.contentHmac) || (!params.hskVersion && params.contentHmac)) {
     throw new Error("Invalid arguments");
   }

-  return await db.transaction().execute(async (trx) => {
-    const mek = await trx
-      .selectFrom("master_encryption_key")
-      .select("version")
-      .where("user_id", "=", params.userId)
-      .where("state", "=", "active")
-      .limit(1)
-      .forUpdate()
-      .executeTakeFirst();
-    if (mek?.version !== params.mekVersion) {
-      throw new IntegrityError("Inactive MEK version");
-    }
-
-    if (params.hskVersion) {
-      const hsk = await trx
-        .selectFrom("hmac_secret_key")
-        .select("version")
-        .where("user_id", "=", params.userId)
-        .where("state", "=", "active")
-        .limit(1)
-        .forUpdate()
-        .executeTakeFirst();
-      if (hsk?.version !== params.hskVersion) {
-        throw new IntegrityError("Inactive HSK version");
-      }
-    }
-
-    const { fileId } = await trx
-      .insertInto("file")
-      .values({
-        parent_id: params.parentId !== "root" ? params.parentId : null,
-        user_id: params.userId,
-        path: params.path,
-        master_encryption_key_version: params.mekVersion,
-        encrypted_data_encryption_key: params.encDek,
-        data_encryption_key_version: params.dekVersion,
-        hmac_secret_key_version: params.hskVersion,
-        content_hmac: params.contentHmac,
-        content_type: params.contentType,
-        encrypted_content_iv: params.encContentIv,
-        encrypted_content_hash: params.encContentHash,
-        encrypted_name: params.encName,
-        encrypted_created_at: params.encCreatedAt,
-        encrypted_last_modified_at: params.encLastModifiedAt,
-      })
-      .returning("id as fileId")
-      .executeTakeFirstOrThrow();
-    await trx
-      .insertInto("file_log")
-      .values({
-        file_id: fileId,
-        timestamp: new Date(),
-        action: "create",
-        new_name: params.encName,
-      })
-      .execute();
-    return { id: fileId };
-  });
+  const { fileId } = await trx
+    .insertInto("file")
+    .values({
+      parent_id: params.parentId !== "root" ? params.parentId : null,
+      user_id: params.userId,
+      path: params.path,
+      master_encryption_key_version: params.mekVersion,
+      encrypted_data_encryption_key: params.encDek,
+      data_encryption_key_version: params.dekVersion,
+      hmac_secret_key_version: params.hskVersion,
+      content_hmac: params.contentHmac,
+      content_type: params.contentType,
+      encrypted_content_iv: params.encContentIv,
+      encrypted_content_hash: params.encContentHash,
+      encrypted_name: params.encName,
+      encrypted_created_at: params.encCreatedAt,
+      encrypted_last_modified_at: params.encLastModifiedAt,
+    })
+    .returning("id as fileId")
+    .executeTakeFirstOrThrow();
+  await trx
+    .insertInto("file_log")
+    .values({
+      file_id: fileId,
+      timestamp: new Date(),
+      action: "create",
+      new_name: params.encName,
+    })
+    .execute();
+  return { id: fileId };
 };

 export const getAllFilesByParent = async (userId: number, parentId: DirectoryId) => {
diff --git a/src/lib/server/db/index.ts b/src/lib/server/db/index.ts
index 5c21deb..140cf7d 100644
--- a/src/lib/server/db/index.ts
+++ b/src/lib/server/db/index.ts
@@ -5,6 +5,7 @@ export * as HskRepo from "./hsk";
 export * as MediaRepo from "./media";
 export * as MekRepo from "./mek";
 export * as SessionRepo from "./session";
+export * as UploadRepo from "./upload";
 export * as UserRepo from "./user";

 export * from "./error";
diff --git a/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
new file mode 100644
index 0000000..fe8abd4
--- /dev/null
+++ b/src/lib/server/db/migrations/1768062380-AddChunkedUpload.ts
@@ -0,0 +1,50 @@
+import { Kysely, sql } from "kysely";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export const up = async (db: Kysely<any>) => {
+  // file.ts
+  await db.schema
+    .alterTable("file")
+    .alterColumn("encrypted_content_iv", (col) => col.dropNotNull())
+    .execute();
+
+  // upload.ts
+  await db.schema
+    .createTable("upload_session")
+    .addColumn("id", "uuid", (col) => col.primaryKey().defaultTo(sql`gen_random_uuid()`))
+    .addColumn("user_id", "integer", (col) => col.references("user.id").notNull())
+    .addColumn("total_chunks", "integer", (col) => col.notNull())
+    .addColumn("uploaded_chunks", sql`integer[]`, (col) => col.notNull().defaultTo(sql`'{}'`))
+    .addColumn("expires_at", "timestamp(3)", (col) => col.notNull())
+    .addColumn("parent_id", "integer", (col) => col.references("directory.id"))
+    .addColumn("master_encryption_key_version", "integer", (col) => col.notNull())
+    .addColumn("encrypted_data_encryption_key", "text", (col) => col.notNull())
+    .addColumn("data_encryption_key_version", "timestamp(3)", (col) => col.notNull())
+    .addColumn("hmac_secret_key_version", "integer")
+    .addColumn("content_type", "text", (col) => col.notNull())
+    .addColumn("encrypted_name", "json", (col) => col.notNull())
+    .addColumn("encrypted_created_at", "json")
+    .addColumn("encrypted_last_modified_at", "json", (col) => col.notNull())
+    .addForeignKeyConstraint(
+      "upload_session_fk01",
+      ["user_id", "master_encryption_key_version"],
+      "master_encryption_key",
+      ["user_id", "version"],
+    )
+    .addForeignKeyConstraint(
+      "upload_session_fk02",
+      ["user_id", "hmac_secret_key_version"],
+      "hmac_secret_key",
+      ["user_id", "version"],
+    )
+    .execute();
+};
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export const down = async (db: Kysely<any>) => {
+  await db.schema.dropTable("upload_session").execute();
+  await db.schema
+    .alterTable("file")
+    .alterColumn("encrypted_content_iv", (col) => col.setNotNull())
+    .execute();
+};
diff --git a/src/lib/server/db/migrations/index.ts b/src/lib/server/db/migrations/index.ts
index f58c2d0..ca3310a 100644
--- a/src/lib/server/db/migrations/index.ts
+++ b/src/lib/server/db/migrations/index.ts
@@ -1,9 +1,11 @@
 import * as Initial1737357000 from "./1737357000-Initial";
 import * as AddFileCategory1737422340 from "./1737422340-AddFileCategory";
 import * as AddThumbnail1738409340 from "./1738409340-AddThumbnail";
+import * as AddChunkedUpload1768062380 from "./1768062380-AddChunkedUpload";

 export default {
   "1737357000-Initial": Initial1737357000,
   "1737422340-AddFileCategory": AddFileCategory1737422340,
   "1738409340-AddThumbnail": AddThumbnail1738409340,
+  "1768062380-AddChunkedUpload": AddChunkedUpload1768062380,
 };
diff --git a/src/lib/server/db/schema/file.ts b/src/lib/server/db/schema/file.ts
index a1bf9bd..663aacd 100644
--- a/src/lib/server/db/schema/file.ts
+++ b/src/lib/server/db/schema/file.ts
@@ -30,7 +30,7 @@ interface FileTable {
   hmac_secret_key_version: number | null;
   content_hmac: string | null; // Base64
   content_type: string;
-  encrypted_content_iv: string; // Base64
+  encrypted_content_iv: string | null; // Base64
   encrypted_content_hash: string; // Base64
   encrypted_name: Ciphertext;
   encrypted_created_at: Ciphertext | null;
diff --git a/src/lib/server/db/schema/index.ts b/src/lib/server/db/schema/index.ts
index 4e427fb..dcc340b 100644
--- a/src/lib/server/db/schema/index.ts
+++ b/src/lib/server/db/schema/index.ts
@@ -5,6 +5,7 @@ export * from "./hsk";
 export * from "./media";
 export * from "./mek";
 export * from "./session";
+export * from "./upload";
 export * from "./user";
 export * from "./util";
diff --git a/src/lib/server/db/schema/upload.ts b/src/lib/server/db/schema/upload.ts
new file mode 100644
index 0000000..3372955
--- /dev/null
+++ b/src/lib/server/db/schema/upload.ts
@@ -0,0 +1,26 @@
+import type { Generated } from "kysely";
+import type { Ciphertext } from "./util";
+
+interface UploadSessionTable {
+  id: Generated<string>;
+  user_id: number;
+  total_chunks: number;
+  uploaded_chunks: Generated<number[]>;
+  expires_at: Date;
+
+  parent_id: number | null;
+  master_encryption_key_version: number;
+  encrypted_data_encryption_key: string; // Base64
+  data_encryption_key_version: Date;
+  hmac_secret_key_version: number | null;
+  content_type: string;
+  encrypted_name: Ciphertext;
+  encrypted_created_at: Ciphertext | null;
+  encrypted_last_modified_at: Ciphertext;
+}
+
+declare module "./index" {
+  interface Database {
+    upload_session: UploadSessionTable;
+  }
+}
diff --git a/src/lib/server/db/upload.ts b/src/lib/server/db/upload.ts
new file mode 100644
index 0000000..935dc80
--- /dev/null
+++ b/src/lib/server/db/upload.ts
@@ -0,0 +1,122 @@
+import { sql } from "kysely";
+import { IntegrityError } from "./error";
+import db from "./kysely";
+import type { Ciphertext } from "./schema";
+
+interface UploadSession {
+  id: string;
+  userId: number;
+  totalChunks: number;
+  uploadedChunks: number[];
+  expiresAt: Date;
+
+  parentId: DirectoryId;
+  mekVersion: number;
+  encDek: string;
+  dekVersion: Date;
+  hskVersion: number | null;
+  contentType: string;
+  encName: Ciphertext;
+  encCreatedAt: Ciphertext | null;
+  encLastModifiedAt: Ciphertext;
+}
+
+export const createUploadSession = async (params: Omit) => {
+  return await db.transaction().execute(async (trx) => {
+    const mek = await trx
+      .selectFrom("master_encryption_key")
+      .select("version")
+      .where("user_id", "=", params.userId)
+      .where("state", "=", "active")
+      .limit(1)
+      .forUpdate()
+      .executeTakeFirst();
+    if (mek?.version !== params.mekVersion) {
+      throw new IntegrityError("Inactive MEK version");
+    }
+
+    if (params.hskVersion) {
+      const hsk = await trx
+        .selectFrom("hmac_secret_key")
+        .select("version")
+        .where("user_id", "=", params.userId)
+        .where("state", "=", "active")
+        .limit(1)
+        .forUpdate()
+        .executeTakeFirst();
+      if (hsk?.version !== params.hskVersion) {
+        throw new IntegrityError("Inactive HSK version");
+      }
+    }
+
+    const { sessionId } = await trx
+      .insertInto("upload_session")
+      .values({
+        user_id: params.userId,
+        total_chunks: params.totalChunks,
+        expires_at: params.expiresAt,
+        parent_id: params.parentId !== "root" ? params.parentId : null,
+        master_encryption_key_version: params.mekVersion,
+        encrypted_data_encryption_key: params.encDek,
+        data_encryption_key_version: params.dekVersion,
+        hmac_secret_key_version: params.hskVersion,
+        content_type: params.contentType,
+        encrypted_name: params.encName,
+        encrypted_created_at: params.encCreatedAt,
+        encrypted_last_modified_at: params.encLastModifiedAt,
+      })
+      .returning("id as sessionId")
+      .executeTakeFirstOrThrow();
+    return { id: sessionId };
+  });
+};
+
+export const getUploadSession = async (sessionId: string, userId: number) => {
+  const session = await db
+    .selectFrom("upload_session")
+    .selectAll()
+    .where("id", "=", sessionId)
+    .where("user_id", "=", userId)
+    .where("expires_at", ">", new Date())
+    .limit(1)
+    .executeTakeFirst();
+  return session
+    ? ({
+        id: session.id,
+        userId: session.user_id,
+        totalChunks: session.total_chunks,
+        uploadedChunks: session.uploaded_chunks,
+        expiresAt: session.expires_at,
+        parentId: session.parent_id ?? "root",
+        mekVersion: session.master_encryption_key_version,
+        encDek: session.encrypted_data_encryption_key,
+        dekVersion: session.data_encryption_key_version,
+        hskVersion: session.hmac_secret_key_version,
+        contentType: session.content_type,
+        encName: session.encrypted_name,
+        encCreatedAt: session.encrypted_created_at,
+        encLastModifiedAt: session.encrypted_last_modified_at,
+      } satisfies UploadSession)
+    : null;
+};
+
+export const markChunkAsUploaded = async (sessionId: string, chunkIndex: number) => {
+  await db
+    .updateTable("upload_session")
+    .set({ uploaded_chunks: sql`array_append(uploaded_chunks, ${chunkIndex})` })
+    .where("id", "=", sessionId)
+    .execute();
+};
+
+export const deleteUploadSession = async (trx: typeof db, sessionId: string) => {
+  await trx.deleteFrom("upload_session").where("id", "=", sessionId).execute();
+};
+
+export const cleanupExpiredUploadSessions = async () => {
+  const sessions = await db
+    .deleteFrom("upload_session")
+    .where("expires_at", "<", new Date())
+    .returning("id")
+    .execute();
+  return sessions.map(({ id }) => id);
+};
diff --git a/src/lib/server/loadenv.ts b/src/lib/server/loadenv.ts
index 3a805d8..f8fd68f 100644
--- a/src/lib/server/loadenv.ts
+++ b/src/lib/server/loadenv.ts
@@ -26,4 +26,5 @@ export default {
   },
   libraryPath: env.LIBRARY_PATH || "library",
   thumbnailsPath: env.THUMBNAILS_PATH || "thumbnails",
+  uploadsPath: env.UPLOADS_PATH || "uploads",
 };
diff --git a/src/lib/server/modules/filesystem.ts b/src/lib/server/modules/filesystem.ts
index 65cb9ec..b87fd65 100644
--- a/src/lib/server/modules/filesystem.ts
+++ b/src/lib/server/modules/filesystem.ts
@@ -1,4 +1,7 @@
 import { unlink } from "fs/promises";
+import env from "$lib/server/loadenv";
+
+export const getChunkDirectoryPath = (sessionId: string) => `${env.uploadsPath}/${sessionId}`;

 export const safeUnlink = async (path: string | null | undefined) => {
   if (path) {
diff --git a/src/lib/server/schemas/file.ts b/src/lib/server/schemas/file.ts
index 811e590..8ba14e7 100644
--- a/src/lib/server/schemas/file.ts
+++ b/src/lib/server/schemas/file.ts
@@ -1,36 +1,7 @@
-import mime from "mime";
 import { z } from "zod";
-import { directoryIdSchema } from "./directory";

 export const fileThumbnailUploadRequest = z.object({
   dekVersion: z.iso.datetime(),
   contentIv: z.base64().nonempty(),
 });
 export type FileThumbnailUploadRequest = z.input;
-
-export const fileUploadRequest = z.object({
-  parent: directoryIdSchema,
-  mekVersion: z.int().positive(),
-  dek: z.base64().nonempty(),
-  dekVersion: z.iso.datetime(),
-  hskVersion: z.int().positive(),
-  contentHmac: z.base64().nonempty(),
-  contentType: z
-    .string()
-    .trim()
-    .nonempty()
-    .refine((value) => mime.getExtension(value) !== null), // MIME type
-  contentIv: z.base64().nonempty(),
-  name: z.base64().nonempty(),
-  nameIv: z.base64().nonempty(),
-  createdAt: z.base64().nonempty().optional(),
-  createdAtIv: z.base64().nonempty().optional(),
-  lastModifiedAt: z.base64().nonempty(),
-  lastModifiedAtIv: z.base64().nonempty(),
-});
-export type FileUploadRequest = z.input;
-
-export const fileUploadResponse = z.object({
-  file: z.int().positive(),
-});
-export type FileUploadResponse = z.output;
diff --git a/src/lib/server/services/file.ts b/src/lib/server/services/file.ts
index e45b16e..9df6430 100644
--- a/src/lib/server/services/file.ts
+++ b/src/lib/server/services/file.ts
@@ -6,17 +6,20 @@ import { dirname } from "path";
 import { Readable } from "stream";
 import { pipeline } from "stream/promises";
 import { v4 as uuidv4 } from "uuid";
-import { FileRepo, MediaRepo, IntegrityError } from "$lib/server/db";
+import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants";
+import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
 import env from "$lib/server/loadenv";
-import { safeUnlink } from "$lib/server/modules/filesystem";
+import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
+
+const uploadLocks = new Set<string>();

 const createEncContentStream = async (
   path: string,
-  iv: Buffer,
+  iv?: Buffer,
   range?: { start?: number; end?: number },
 ) => {
   const { size: fileSize } = await stat(path);
-  const ivSize = iv.byteLength;
+  const ivSize = iv?.byteLength ?? 0;
   const totalSize = fileSize + ivSize;

   const start = range?.start ?? 0;
@@ -30,7 +33,7 @@
     Readable.from(
       (async function* () {
         if (start < ivSize) {
-          yield iv.subarray(start, Math.min(end + 1, ivSize));
+          yield iv!.subarray(start, Math.min(end + 1, ivSize));
         }
         if (end >= ivSize) {
           yield* createReadStream(path, {
@@ -55,7 +58,11 @@ export const getFileStream = async (
     error(404, "Invalid file id");
   }

-  return createEncContentStream(file.path, Buffer.from(file.encContentIv, "base64"), range);
+  return createEncContentStream(
+    file.path,
+    file.encContentIv ? Buffer.from(file.encContentIv, "base64") : undefined,
+    range,
+  );
 };

 export const getFileThumbnailStream = async (
@@ -110,56 +117,70 @@ export const uploadFileThumbnail = async (
   }
 };

-export const uploadFile = async (
-  params: Omit,
-  encContentStream: Readable,
-  encContentHash: Promise,
+export const uploadChunk = async (
+  userId: number,
+  sessionId: string,
+  chunkIndex: number,
+  encChunkStream: Readable,
+  encChunkHash: string,
 ) => {
-  const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
-  const oneMinuteLater = new Date(Date.now() + 60 * 1000);
-  if (params.dekVersion <= oneDayAgo || params.dekVersion >= oneMinuteLater) {
-    error(400, "Invalid DEK version");
+  const lockKey = `${sessionId}/${chunkIndex}`;
+  if (uploadLocks.has(lockKey)) {
+    error(409, "Chunk already uploaded"); // TODO: Message
+  } else {
+    uploadLocks.add(lockKey);
   }

-  const path = `${env.libraryPath}/${params.userId}/${uuidv4()}`;
-  await mkdir(dirname(path), { recursive: true });
+  const filePath = `${getChunkDirectoryPath(sessionId)}/${chunkIndex}`;

   try {
-    const hashStream = createHash("sha256");
-    const [, hash] = await Promise.all([
-      pipeline(
-        encContentStream,
-        async function* (source) {
-          for await (const chunk of source) {
-            hashStream.update(chunk);
-            yield chunk;
-          }
-        },
-        createWriteStream(path, { flags: "wx", mode: 0o600 }),
-      ),
-      encContentHash,
-    ]);
-    if (hashStream.digest("base64") !== hash) {
-      throw new Error("Invalid checksum");
+    const session = await UploadRepo.getUploadSession(sessionId, userId);
+    if (!session) {
+      error(404, "Invalid upload id");
+    } else if (chunkIndex >= session.totalChunks) {
+      error(400, "Invalid chunk index");
+    } else if (session.uploadedChunks.includes(chunkIndex)) {
+      error(409, "Chunk already uploaded");
     }

-    const { id: fileId } = await FileRepo.registerFile({
-      ...params,
-      path,
-      encContentHash: hash,
-    });
-    return { fileId };
-  } catch (e) {
-    await safeUnlink(path);
+    const isLastChunk = chunkIndex === session.totalChunks - 1;

-    if (e instanceof IntegrityError && e.message === "Inactive MEK version") {
-      error(400, "Invalid MEK version");
+    let writtenBytes = 0;
+    const hashStream = createHash("sha256");
+    const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
+
+    for await (const chunk of encChunkStream) {
+      writtenBytes += chunk.length;
+      hashStream.update(chunk);
+      writeStream.write(chunk);
+    }
+
+    await new Promise<void>((resolve, reject) => {
+      writeStream.end((e: any) => (e ? reject(e) : resolve()));
+    });
+
+    if (hashStream.digest("base64") !== encChunkHash) {
+      throw new Error("Invalid checksum");
     } else if (
+      (!isLastChunk && writtenBytes !== CHUNK_SIZE + ENCRYPTION_OVERHEAD) ||
+      (isLastChunk &&
+        (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > CHUNK_SIZE + ENCRYPTION_OVERHEAD))
+    ) {
+      throw new Error("Invalid chunk size");
+    }
+
+    await UploadRepo.markChunkAsUploaded(sessionId, chunkIndex);
+  } catch (e) {
+    await safeUnlink(filePath);
+
+    if (
       e instanceof Error &&
-      (e.message === "Invalid request body" || e.message === "Invalid checksum")
+      (e.message === "Invalid checksum" || e.message === "Invalid chunk size")
     ) {
       error(400, "Invalid request body");
     }
     throw e;
+  } finally {
+    uploadLocks.delete(lockKey);
   }
 };
diff --git a/src/lib/services/file.ts b/src/lib/services/file.ts
index a0e769b..5f95f42 100644
--- a/src/lib/services/file.ts
+++ b/src/lib/services/file.ts
@@ -1,4 +1,5 @@
 import { getAllFileInfos } from "$lib/indexedDB/filesystem";
+import { encodeToBase64 } from "$lib/modules/crypto";
 import {
   getFileCache,
   storeFileCache,
@@ -9,11 +10,15 @@ import {
 import type { FileThumbnailUploadRequest } from "$lib/server/schemas";
 import { trpc } from "$trpc/client";

-export const requestFileDownload = async (fileId: number, dataKey: CryptoKey) => {
+export const requestFileDownload = async (
+  fileId: number,
+  dataKey: CryptoKey,
+  isLegacy: boolean,
+) => {
   const cache = await getFileCache(fileId);
   if (cache) return cache;

-  const fileBuffer = await downloadFile(fileId, dataKey);
+  const fileBuffer = await downloadFile(fileId, dataKey, isLegacy);
   storeFileCache(fileId, fileBuffer); // Intended
   return fileBuffer;
 };
@@ -21,14 +26,14 @@ export const requestFileDownload = async (fileId: number, dataKey: CryptoKey) =>
 export const requestFileThumbnailUpload = async (
   fileId: number,
   dataKeyVersion: Date,
-  thumbnailEncrypted: { ciphertext: ArrayBuffer; iv: string },
+  thumbnailEncrypted: { ciphertext: ArrayBuffer; iv: ArrayBuffer },
 ) => {
   const form = new FormData();
   form.set(
     "metadata",
     JSON.stringify({
       dekVersion: dataKeyVersion.toISOString(),
-      contentIv: thumbnailEncrypted.iv,
+      contentIv: encodeToBase64(thumbnailEncrypted.iv),
     } satisfies FileThumbnailUploadRequest),
   );
   form.set("content", new Blob([thumbnailEncrypted.ciphertext]));
diff --git a/src/routes/(fullscreen)/file/[id]/+page.svelte b/src/routes/(fullscreen)/file/[id]/+page.svelte
index f325c5e..4aa6b42 100644
--- a/src/routes/(fullscreen)/file/[id]/+page.svelte
+++ b/src/routes/(fullscreen)/file/[id]/+page.svelte
@@ -95,7 +95,7 @@
     untrack(() => {
       if (!downloadState && !isDownloadRequested) {
         isDownloadRequested = true;
-        requestFileDownload(data.id, info!.dataKey!.key).then(async (buffer) => {
+        requestFileDownload(data.id, info!.dataKey!.key, info!.isLegacy!).then(async (buffer) => {
           const blob = await updateViewer(buffer, contentType);
           if (!viewerType) {
             FileSaver.saveAs(blob, info!.name);
diff --git a/src/routes/(fullscreen)/settings/thumbnail/service.ts b/src/routes/(fullscreen)/settings/thumbnail/service.ts
index 75c64b8..314cf5a 100644
--- a/src/routes/(fullscreen)/settings/thumbnail/service.ts
+++ b/src/routes/(fullscreen)/settings/thumbnail/service.ts
@@ -50,7 +50,7 @@ const requestThumbnailUpload = limitFunction(
   async (
     fileId: number,
     dataKeyVersion: Date,
-    thumbnail: { plaintext: ArrayBuffer; ciphertext: ArrayBuffer; iv: string },
+    thumbnail: { plaintext: ArrayBuffer; ciphertext: ArrayBuffer; iv: ArrayBuffer },
   ) => {
     statuses.set(fileId, "uploading");

@@ -77,7 +77,7 @@ export const requestThumbnailGeneration = async (fileInfo: FileInfo) => {
   await scheduler.schedule(
     async () => {
       statuses.set(fileInfo.id, "generation-pending");
-      file = await requestFileDownload(fileInfo.id, fileInfo.dataKey?.key!);
+      file = await requestFileDownload(fileInfo.id, fileInfo.dataKey?.key!, fileInfo.isLegacy!);
       return file.byteLength;
     },
     async () => {
diff --git a/src/routes/api/file/upload/+server.ts b/src/routes/api/file/upload/+server.ts
deleted file mode 100644
index f9cbd53..0000000
--- a/src/routes/api/file/upload/+server.ts
+++ /dev/null
@@ -1,108 +0,0 @@
-import Busboy from "@fastify/busboy";
-import { error, json } from "@sveltejs/kit";
-import { Readable, Writable } from "stream";
-import { authorize } from "$lib/server/modules/auth";
-import {
-  fileUploadRequest,
-  fileUploadResponse,
-  type FileUploadResponse,
-} from "$lib/server/schemas";
-import { uploadFile } from "$lib/server/services/file";
-import type { RequestHandler } from "./$types";
-
-type FileMetadata = Parameters[0];
-
-const parseFileMetadata = (userId: number, json: string) => {
-  const zodRes = fileUploadRequest.safeParse(JSON.parse(json));
-  if (!zodRes.success) error(400, "Invalid request body");
-  const {
-    parent,
-    mekVersion,
-    dek,
-    dekVersion,
-    hskVersion,
-    contentHmac,
-    contentType,
-    contentIv,
-    name,
-    nameIv,
-    createdAt,
-    createdAtIv,
-    lastModifiedAt,
-    lastModifiedAtIv,
-  } = zodRes.data;
-  if ((createdAt && !createdAtIv) || (!createdAt && createdAtIv))
-    error(400, "Invalid request body");
-
-  return {
-    userId,
-    parentId: parent,
-    mekVersion,
-    encDek: dek,
-    dekVersion: new Date(dekVersion),
-    hskVersion,
-    contentHmac,
-    contentType,
-    encContentIv: contentIv,
-    encName: { ciphertext: name, iv: nameIv },
-    encCreatedAt: createdAt && createdAtIv ? { ciphertext: createdAt, iv: createdAtIv } : null,
-    encLastModifiedAt: { ciphertext: lastModifiedAt, iv: lastModifiedAtIv },
-  } satisfies FileMetadata;
-};
-
-export const POST: RequestHandler = async ({ locals, request }) => {
-  const { userId } = await authorize(locals, "activeClient");
-
-  const contentType = request.headers.get("Content-Type");
-  if (!contentType?.startsWith("multipart/form-data") || !request.body) {
-    error(400, "Invalid request body");
-  }
-
-  return new Promise((resolve, reject) => {
-    const bb = Busboy({ headers: { "content-type": contentType } });
-    const handler =
-      (f: (...args: T) => Promise) =>
-      (...args: T) => {
-        f(...args).catch(reject);
-      };
-
-    let metadata: FileMetadata | null = null;
-    let content: Readable | null = null;
-    const checksum = new Promise((resolveChecksum, rejectChecksum) => {
-      bb.on(
-        "field",
-        handler(async (fieldname, val) => {
-          if (fieldname === "metadata") {
-            // Ignore subsequent metadata fields
-            if (!metadata) {
-              metadata = parseFileMetadata(userId, val);
-            }
-          } else if (fieldname === "checksum") {
-            // Ignore subsequent checksum fields
-            resolveChecksum(val);
-          } else {
-            error(400, "Invalid request body");
-          }
-        }),
-      );
-      bb.on(
-        "file",
-        handler(async (fieldname, file) => {
-          if (fieldname !== "content") error(400, "Invalid request body");
-          if (!metadata || content) error(400, "Invalid request body");
-          content = file;
-
-          const { fileId } = await uploadFile(metadata, content, checksum);
-          resolve(json(fileUploadResponse.parse({ file: fileId } satisfies FileUploadResponse)));
-        }),
-      );
-      bb.on("finish", () => rejectChecksum(new Error("Invalid request body")));
-      bb.on("error", (e) => {
-        content?.emit("error", e) ?? reject(e);
-        rejectChecksum(e);
-      });
-    });
-
-    request.body!.pipeTo(Writable.toWeb(bb)).catch(() => {}); // busboy will handle the error
-  });
-};
diff --git a/src/routes/api/file/upload/[id]/chunks/[index]/+server.ts b/src/routes/api/file/upload/[id]/chunks/[index]/+server.ts
new file mode 100644
index 0000000..c44e425
--- /dev/null
+++ b/src/routes/api/file/upload/[id]/chunks/[index]/+server.ts
@@ -0,0 +1,43 @@
+import { error, text } from "@sveltejs/kit";
+import { Readable } from "stream";
+import { z } from "zod";
+import { authorize } from "$lib/server/modules/auth";
+import { uploadChunk } from "$lib/server/services/file";
+import type { RequestHandler } from "./$types";
+
+export const POST: RequestHandler = async ({ locals, params, request }) => {
+  const { userId } = await authorize(locals, "activeClient");
+
+  const zodRes = z
+    .object({
+      id: z.uuidv4(),
+      index: z.coerce.number().int().nonnegative(),
+    })
+    .safeParse(params);
+  if (!zodRes.success) error(400, "Invalid path parameters");
+  const { id: uploadId, index: chunkIndex } = zodRes.data;
+
+  // Parse Content-Digest header (RFC 9530)
+  // Expected format: sha-256=:base64hash:
+  const contentDigest = request.headers.get("Content-Digest");
+  if (!contentDigest) error(400, "Missing Content-Digest header");
+
+  const digestMatch = contentDigest.match(/^sha-256=:([A-Za-z0-9+/=]+):$/);
+  if (!digestMatch || !digestMatch[1])
+    error(400, "Invalid Content-Digest format, must be sha-256=:base64:");
+  const encChunkHash = digestMatch[1];
+
+  const contentType = request.headers.get("Content-Type");
+  if (contentType !== "application/octet-stream" || !request.body) {
+    error(400, "Invalid request body");
+  }
+
+  // Convert web ReadableStream to Node Readable
+  const nodeReadable = Readable.fromWeb(
+    request.body as unknown as Parameters<typeof Readable.fromWeb>[0],
+  );
+
+  await uploadChunk(userId, uploadId, chunkIndex, nodeReadable, encChunkHash);
+
+  return text("Chunk uploaded", { headers: { "Content-Type": "text/plain" } });
+};
diff --git a/src/trpc/routers/file.ts b/src/trpc/routers/file.ts
index a56a91f..eaf42ca 100644
--- a/src/trpc/routers/file.ts
+++ b/src/trpc/routers/file.ts
@@ -1,9 +1,20 @@
 import { TRPCError } from "@trpc/server";
+import { createHash } from "crypto";
+import { createReadStream, createWriteStream } from "fs";
+import { mkdir, rm } from "fs/promises";
+import mime from "mime";
+import { dirname } from "path";
+import { v4 as uuidv4 } from "uuid";
 import { z } from "zod";
-import { FileRepo, MediaRepo, IntegrityError } from "$lib/server/db";
-import { safeUnlink } from "$lib/server/modules/filesystem";
+import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
+import db from "$lib/server/db/kysely";
+import env from "$lib/server/loadenv";
+import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
+import { directoryIdSchema } from "$lib/server/schemas";
 import { router, roleProcedure } from "../init.server";

+const uploadLocks = new Set<string>();
+
 const fileRouter = router({
   get: roleProcedure["activeClient"]
     .input(
@@ -19,6 +30,7 @@
       const categories = await FileRepo.getAllFileCategories(input.id);

       return {
+        isLegacy: !!file.encContentIv,
         parent: file.parentId,
         mekVersion: file.mekVersion,
         dek: file.encDek,
@@ -52,6 +64,7 @@
       const files = await FileRepo.getFilesWithCategories(ctx.session.userId, input.ids);
       return files.map((file) => ({
         id: file.id,
+        isLegacy: !!file.encContentIv,
         parent: file.parentId,
         mekVersion: file.mekVersion,
         dek: file.encDek,
@@ -158,6 +171,137 @@
       return { updatedAt: thumbnail.updatedAt };
     }),
+
+  startUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        chunks: z.int().positive(),
+        parent: directoryIdSchema,
+        mekVersion: z.int().positive(),
+        dek: z.base64().nonempty(),
+        dekVersion: z.date(),
+        hskVersion: z.int().positive().optional(),
+        contentType: z
+          .string()
+          .trim()
+          .nonempty()
+          .refine((value) => mime.getExtension(value) !== null),
+        name: z.base64().nonempty(),
+        nameIv: z.base64().nonempty(),
+        createdAt: z.base64().nonempty().optional(),
+        createdAtIv: z.base64().nonempty().optional(),
+        lastModifiedAt: z.base64().nonempty(),
+        lastModifiedAtIv: z.base64().nonempty(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      const oneMinuteAgo = new Date(Date.now() - 60 * 1000);
+      const oneMinuteLater = new Date(Date.now() + 60 * 1000);
+      if (input.dekVersion <= oneMinuteAgo || input.dekVersion >= oneMinuteLater) {
+        throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" });
+      }
+
+      try {
+        const { id: sessionId } = await UploadRepo.createUploadSession({
+          userId: ctx.session.userId,
+          totalChunks: input.chunks,
+          expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
+          parentId: input.parent,
+          mekVersion: input.mekVersion,
+          encDek: input.dek,
+          dekVersion: input.dekVersion,
+          hskVersion: input.hskVersion ?? null,
+          contentType: input.contentType,
+          encName: { ciphertext: input.name, iv: input.nameIv },
+          encCreatedAt:
+            input.createdAt && input.createdAtIv
+              ? { ciphertext: input.createdAt, iv: input.createdAtIv }
+              : null,
+          encLastModifiedAt: { ciphertext: input.lastModifiedAt, iv: input.lastModifiedAtIv },
+        });
+        await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
+        return { uploadId: sessionId };
+      } catch (e) {
+        if (e instanceof IntegrityError) {
+          if (e.message === "Inactive MEK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid MEK version" });
+          } else if (e.message === "Inactive HSK version") {
+            throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid HSK version" });
+          }
+        }
+        throw e;
+      }
+    }),
+
+  completeUpload: roleProcedure["activeClient"]
+    .input(
+      z.object({
+        uploadId: z.uuidv4(),
+        contentHmac: z.base64().nonempty().optional(),
+      }),
+    )
+    .mutation(async ({ ctx, input }) => {
+      const { uploadId } = input;
+      if (uploadLocks.has(uploadId)) {
+        throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" }); // TODO: Message
+      } else {
+        uploadLocks.add(uploadId);
+      }
+
+      const filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`;
+      await mkdir(dirname(filePath), { recursive: true });
+
+      try {
+        const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
+        if (!session) {
+          throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" });
+        } else if (
+          (session.hskVersion && !input.contentHmac) ||
+          (!session.hskVersion && input.contentHmac)
+        ) {
+          throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content hmac" }); // TODO: message
+        } else if (session.uploadedChunks.length < session.totalChunks) {
+          throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" }); // TODO: Message
+        }
+
+        const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
+        const hashStream = createHash("sha256");
+        const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
+
+        for (let i = 0; i < session.totalChunks; i++) {
+          for await (const chunk of createReadStream(`${chunkDirectoryPath}/${i}`)) {
+            hashStream.update(chunk);
+            writeStream.write(chunk);
+          }
+        }
+
+        await new Promise<void>((resolve, reject) => {
+          writeStream.end((e: any) => (e ? reject(e) : resolve()));
+        });
+
+        const hash = hashStream.digest("base64");
+        const fileId = await db.transaction().execute(async (trx) => {
+          const { id: fileId } = await FileRepo.registerFile(trx, {
+            ...session,
+            userId: ctx.session.userId,
+            path: filePath,
+            contentHmac: input.contentHmac ?? null,
+            encContentHash: hash,
+            encContentIv: null,
+          });
+          await UploadRepo.deleteUploadSession(trx, uploadId);
+          return fileId;
+        });
+
+        await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
+        return { file: fileId };
+      } catch (e) {
+        await safeUnlink(filePath);
+        throw e;
+      } finally {
+        uploadLocks.delete(uploadId);
+      }
+    }),
 });

 export default fileRouter;