Change the file upload method to chunked uploads
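This commit replaces the single-shot uploadFile service with a per-chunk uploadChunk entry point. For orientation, a minimal sketch of the session model the new code programs against; the field names come from the diff below, while the surrounding lifecycle is an assumption, since only the per-chunk step is implemented in this commit:

// Hedged sketch: shape inferred from the UploadRepo calls in the diff.
interface UploadSession {
  id: string;
  userId: number;
  totalChunks: number; // uploaded chunkIndex values must stay below this
  uploadedChunks: number[]; // indices already written to the chunk directory
}

// Assumed lifecycle (only step 2 appears in this diff):
// 1. the client opens a session declaring totalChunks
// 2. the client uploads each encrypted chunk via uploadChunk
// 3. once uploadedChunks covers 0..totalChunks-1, the chunks are assembled
//    into the final file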
@@ -6,17 +6,20 @@ import { dirname } from "path";
 import { Readable } from "stream";
 import { pipeline } from "stream/promises";
 import { v4 as uuidv4 } from "uuid";
-import { FileRepo, MediaRepo, IntegrityError } from "$lib/server/db";
+import { CHUNK_SIZE, ENCRYPTION_OVERHEAD } from "$lib/constants";
+import { FileRepo, MediaRepo, UploadRepo, IntegrityError } from "$lib/server/db";
 import env from "$lib/server/loadenv";
-import { safeUnlink } from "$lib/server/modules/filesystem";
+import { getChunkDirectoryPath, safeUnlink } from "$lib/server/modules/filesystem";
+
+const uploadLocks = new Set<string>();
 
 const createEncContentStream = async (
   path: string,
-  iv: Buffer,
+  iv?: Buffer,
   range?: { start?: number; end?: number },
 ) => {
   const { size: fileSize } = await stat(path);
-  const ivSize = iv.byteLength;
+  const ivSize = iv?.byteLength ?? 0;
   const totalSize = fileSize + ivSize;
 
   const start = range?.start ?? 0;
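The chunk-size validation added in the last hunk depends on CHUNK_SIZE and ENCRYPTION_OVERHEAD, imported above from $lib/constants; their actual values are not shown in this commit. As an illustration only, assuming each chunk is sealed independently with an AEAD cipher such as AES-GCM (16-byte auth tag), the constants might look like:

// Hypothetical values, not taken from this diff.
export const CHUNK_SIZE = 4 * 1024 * 1024; // assumed plaintext bytes per full chunk
export const ENCRYPTION_OVERHEAD = 16; // assumed per-chunk cipher overhead, e.g. a GCM tag

Note that the new module-level uploadLocks Set guards each sessionId/chunkIndex pair against concurrent uploads within a single process; being in-memory, it cannot coordinate across multiple server instances.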
@@ -30,7 +33,7 @@ const createEncContentStream = async (
     Readable.from(
       (async function* () {
         if (start < ivSize) {
-          yield iv.subarray(start, Math.min(end + 1, ivSize));
+          yield iv!.subarray(start, Math.min(end + 1, ivSize));
         }
         if (end >= ivSize) {
           yield* createReadStream(path, {
@@ -55,7 +58,11 @@ export const getFileStream = async (
     error(404, "Invalid file id");
   }
 
-  return createEncContentStream(file.path, Buffer.from(file.encContentIv, "base64"), range);
+  return createEncContentStream(
+    file.path,
+    file.encContentIv ? Buffer.from(file.encContentIv, "base64") : undefined,
+    range,
+  );
 };
 
 export const getFileThumbnailStream = async (
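createEncContentStream now treats the IV as optional: the logical stream is the IV (when present) followed by the encrypted file on disk, and byte ranges are resolved against that concatenation, which is why getFileStream can pass undefined when a file has no encContentIv. A worked example, assuming a 16-byte IV; the createReadStream offset arithmetic is truncated in the hunk above, so the second half is inferred:

import { randomBytes } from "crypto";

// Hypothetical call; createEncContentStream is module-private in this file.
const iv = randomBytes(16);
const stream = await createEncContentStream("/library/1/file.enc", iv, { start: 8, end: 31 });
// Yields iv.subarray(8, 16) first (logical bytes 8..15), then presumably
// file bytes 0..15 from disk (logical bytes 16..31): 24 bytes in total,
// matching end - start + 1.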
@@ -110,56 +117,70 @@ export const uploadFileThumbnail = async (
   }
 };
 
-export const uploadFile = async (
-  params: Omit<FileRepo.NewFile, "path" | "encContentHash">,
-  encContentStream: Readable,
-  encContentHash: Promise<string>,
+export const uploadChunk = async (
+  userId: number,
+  sessionId: string,
+  chunkIndex: number,
+  encChunkStream: Readable,
+  encChunkHash: string,
 ) => {
-  const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
-  const oneMinuteLater = new Date(Date.now() + 60 * 1000);
-  if (params.dekVersion <= oneDayAgo || params.dekVersion >= oneMinuteLater) {
-    error(400, "Invalid DEK version");
+  const lockKey = `${sessionId}/${chunkIndex}`;
+  if (uploadLocks.has(lockKey)) {
+    error(409, "Chunk already uploaded"); // TODO: Message
+  } else {
+    uploadLocks.add(lockKey);
   }
 
-  const path = `${env.libraryPath}/${params.userId}/${uuidv4()}`;
-  await mkdir(dirname(path), { recursive: true });
+  const filePath = `${getChunkDirectoryPath(sessionId)}/${chunkIndex}`;
 
   try {
-    const hashStream = createHash("sha256");
-    const [, hash] = await Promise.all([
-      pipeline(
-        encContentStream,
-        async function* (source) {
-          for await (const chunk of source) {
-            hashStream.update(chunk);
-            yield chunk;
-          }
-        },
-        createWriteStream(path, { flags: "wx", mode: 0o600 }),
-      ),
-      encContentHash,
-    ]);
-    if (hashStream.digest("base64") !== hash) {
-      throw new Error("Invalid checksum");
+    const session = await UploadRepo.getUploadSession(sessionId, userId);
+    if (!session) {
+      error(404, "Invalid upload id");
+    } else if (chunkIndex >= session.totalChunks) {
+      error(400, "Invalid chunk index");
+    } else if (session.uploadedChunks.includes(chunkIndex)) {
+      error(409, "Chunk already uploaded");
     }
 
-    const { id: fileId } = await FileRepo.registerFile({
-      ...params,
-      path,
-      encContentHash: hash,
-    });
-    return { fileId };
-  } catch (e) {
-    await safeUnlink(path);
-    if (e instanceof IntegrityError && e.message === "Inactive MEK version") {
-      error(400, "Invalid MEK version");
-    }
+    const isLastChunk = chunkIndex === session.totalChunks - 1;
+
+    let writtenBytes = 0;
+    const hashStream = createHash("sha256");
+    const writeStream = createWriteStream(filePath, { flags: "wx", mode: 0o600 });
+
+    for await (const chunk of encChunkStream) {
+      writtenBytes += chunk.length;
+      hashStream.update(chunk);
+      writeStream.write(chunk);
+    }
+
+    await new Promise<void>((resolve, reject) => {
+      writeStream.end((e: any) => (e ? reject(e) : resolve()));
+    });
+
+    if (hashStream.digest("base64") !== encChunkHash) {
+      throw new Error("Invalid checksum");
+    } else if (
+      (!isLastChunk && writtenBytes !== CHUNK_SIZE + ENCRYPTION_OVERHEAD) ||
+      (isLastChunk &&
+        (writtenBytes <= ENCRYPTION_OVERHEAD || writtenBytes > CHUNK_SIZE + ENCRYPTION_OVERHEAD))
+    ) {
+      throw new Error("Invalid chunk size");
+    }
+
+    await UploadRepo.markChunkAsUploaded(sessionId, chunkIndex);
+  } catch (e) {
+    await safeUnlink(filePath);
+
     if (
       e instanceof Error &&
-      (e.message === "Invalid request body" || e.message === "Invalid checksum")
+      (e.message === "Invalid checksum" || e.message === "Invalid chunk size")
    ) {
       error(400, "Invalid request body");
     }
     throw e;
-  }
+  } finally {
+    uploadLocks.delete(lockKey);
+  }
 };
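A hypothetical client-side counterpart, to show how uploadChunk is meant to be driven; the route shape, header name, and helper below are assumptions, and only the rules enforced server-side (a base64 SHA-256 digest over the ciphertext, exactly one upload per chunk index) come from the diff. Because a failed chunk is unlinked and its lock released in the finally block, a client can safely retry a chunk that fails validation.

// Browser-side sketch using Web Crypto; function names and endpoint are assumed.
async function sha256Base64(chunk: Blob): Promise<string> {
  const digest = await crypto.subtle.digest("SHA-256", await chunk.arrayBuffer());
  return btoa(String.fromCharCode(...new Uint8Array(digest)));
}

async function uploadAllChunks(sessionId: string, encChunks: Blob[]) {
  for (let i = 0; i < encChunks.length; i++) {
    // One request per chunk; the server answers 409 for duplicates and
    // 400 for hash or size mismatches.
    await fetch(`/api/uploads/${sessionId}/chunks/${i}`, {
      method: "POST",
      headers: { "X-Chunk-Hash": await sha256Base64(encChunks[i]) },
      body: encChunks[i],
    });
  }
}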