DB에 청크 업로드 경로를 저장하도록 변경

This commit is contained in:
static
2026-01-11 15:16:03 +09:00
parent 2801eed556
commit 83369f83e3
9 changed files with 143 additions and 118 deletions

View File

@@ -1,7 +1,7 @@
import { TRPCError } from "@trpc/server";
import { createHash } from "crypto";
import { createReadStream, createWriteStream } from "fs";
import { mkdir, rename, rm } from "fs/promises";
import { mkdir, rename } from "fs/promises";
import mime from "mime";
import { dirname } from "path";
import { v4 as uuidv4 } from "uuid";
@@ -10,10 +10,17 @@ import { DirectoryIdSchema } from "$lib/schemas";
import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
import db from "$lib/server/db/kysely";
import env from "$lib/server/loadenv";
import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
import { safeRecursiveRm, safeUnlink } from "$lib/server/modules/filesystem";
import { router, roleProcedure } from "../init.server";
const uploadLocks = new Set<string>();
const sessionLocks = new Set<string>();
const generateSessionId = async () => {
const id = uuidv4();
const path = `${env.uploadsPath}/${id}`;
await mkdir(path, { recursive: true });
return { id, path };
};
const uploadRouter = router({
startFileUpload: roleProcedure["activeClient"]
@@ -45,9 +52,13 @@ const uploadRouter = router({
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid DEK version" });
}
const { id, path } = await generateSessionId();
try {
const { id: sessionId } = await UploadRepo.createFileUploadSession({
await UploadRepo.createFileUploadSession({
id,
userId: ctx.session.userId,
path,
totalChunks: input.chunks,
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
parentId: input.parent,
@@ -63,9 +74,10 @@ const uploadRouter = router({
: null,
encLastModifiedAt: { ciphertext: input.lastModifiedAt, iv: input.lastModifiedAtIv },
});
await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
return { uploadId: sessionId };
return { uploadId: id };
} catch (e) {
await safeRecursiveRm(path);
if (e instanceof IntegrityError) {
if (e.message === "Inactive MEK version") {
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid MEK version" });
@@ -85,16 +97,22 @@ const uploadRouter = router({
}),
)
.mutation(async ({ ctx, input }) => {
const { id, path } = await generateSessionId();
try {
const { id: sessionId } = await UploadRepo.createThumbnailUploadSession({
await UploadRepo.createThumbnailUploadSession({
id,
userId: ctx.session.userId,
path,
totalChunks: 1, // Up to 4 MiB
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
fileId: input.file,
dekVersion: input.dekVersion,
});
await mkdir(getChunkDirectoryPath(sessionId), { recursive: true });
return { uploadId: sessionId };
return { uploadId: id };
} catch (e) {
await safeRecursiveRm(path);
if (e instanceof IntegrityError) {
if (e.message === "File not found") {
throw new TRPCError({ code: "NOT_FOUND", message: "File not found" });
@@ -115,14 +133,13 @@ const uploadRouter = router({
)
.mutation(async ({ ctx, input }) => {
const { uploadId } = input;
if (uploadLocks.has(uploadId)) {
throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" });
if (sessionLocks.has(uploadId)) {
throw new TRPCError({ code: "CONFLICT", message: "Completion already in progress" });
} else {
uploadLocks.add(uploadId);
sessionLocks.add(uploadId);
}
const filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`;
await mkdir(dirname(filePath), { recursive: true });
let filePath = "";
try {
const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
@@ -132,17 +149,19 @@ const uploadRouter = router({
(session.hskVersion && !input.contentHmac) ||
(!session.hskVersion && input.contentHmac)
) {
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content hmac" });
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid content HMAC" });
} else if (session.uploadedChunks.length < session.totalChunks) {
throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" });
throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" });
}
const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
filePath = `${env.libraryPath}/${ctx.session.userId}/${uuidv4()}`;
await mkdir(dirname(filePath), { recursive: true });
const hashStream = createHash("sha256");
const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
for (let i = 0; i < session.totalChunks; i++) {
for await (const chunk of createReadStream(`${chunkDirectoryPath}/${i}`)) {
for await (const chunk of createReadStream(`${session.path}/${i}`)) {
hashStream.update(chunk);
writeStream.write(chunk);
}
@@ -166,13 +185,13 @@ const uploadRouter = router({
return fileId;
});
await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
await safeRecursiveRm(session.path);
return { file: fileId };
} catch (e) {
await safeUnlink(filePath);
throw e;
} finally {
uploadLocks.delete(uploadId);
sessionLocks.delete(uploadId);
}
}),
@@ -184,44 +203,39 @@ const uploadRouter = router({
)
.mutation(async ({ ctx, input }) => {
const { uploadId } = input;
if (uploadLocks.has(uploadId)) {
throw new TRPCError({ code: "CONFLICT", message: "Upload already in progress" });
if (sessionLocks.has(uploadId)) {
throw new TRPCError({ code: "CONFLICT", message: "Completion already in progress" });
} else {
uploadLocks.add(uploadId);
sessionLocks.add(uploadId);
}
const thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uuidv4()}`;
await mkdir(dirname(thumbnailPath), { recursive: true });
let thumbnailPath = "";
try {
const session = await UploadRepo.getUploadSession(uploadId, ctx.session.userId);
if (!session || session.type !== "thumbnail") {
throw new TRPCError({ code: "NOT_FOUND", message: "Invalid upload id" });
} else if (session.uploadedChunks.length < session.totalChunks) {
throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not complete" });
throw new TRPCError({ code: "BAD_REQUEST", message: "Upload not completed" });
}
const chunkDirectoryPath = getChunkDirectoryPath(uploadId);
const chunkPath = `${chunkDirectoryPath}/0`;
thumbnailPath = `${env.thumbnailsPath}/${ctx.session.userId}/${uploadId}`;
await mkdir(dirname(thumbnailPath), { recursive: true });
await rename(`${session.path}/0`, thumbnailPath);
// Move chunk file to thumbnail path (IV is prepended to the content)
await rename(chunkPath, thumbnailPath);
// Update thumbnail in database (null IV since it's prepended to the file)
const oldPath = await MediaRepo.updateFileThumbnail(
ctx.session.userId,
session.fileId,
session.dekVersion,
thumbnailPath,
null,
);
safeUnlink(oldPath); // Intended
await db.transaction().execute(async (trx) => {
const oldThumbnailPath = await db.transaction().execute(async (trx) => {
const oldPath = await MediaRepo.updateFileThumbnail(
trx,
ctx.session.userId,
session.fileId,
session.dekVersion,
thumbnailPath,
null,
);
await UploadRepo.deleteUploadSession(trx, uploadId);
return oldPath;
});
await rm(chunkDirectoryPath, { recursive: true }).catch((e) => console.error(e));
await Promise.all([safeUnlink(oldThumbnailPath), safeRecursiveRm(session.path)]);
} catch (e) {
await safeUnlink(thumbnailPath);
if (e instanceof IntegrityError) {
@@ -233,7 +247,7 @@ const uploadRouter = router({
}
throw e;
} finally {
uploadLocks.delete(uploadId);
sessionLocks.delete(uploadId);
}
}),
});