Mirror of https://github.com/kmc7468/arkvault.git
Improve chunk upload performance and measure network speed more accurately
@@ -3,10 +3,10 @@ import { limitFunction } from "p-limit";
 import { CHUNK_SIZE } from "$lib/constants";
 import { encodeToBase64, generateDataKey, wrapDataKey, encryptString } from "$lib/modules/crypto";
 import { signMessageHmac } from "$lib/modules/crypto";
-import { Scheduler } from "$lib/modules/scheduler";
 import { generateThumbnail } from "$lib/modules/thumbnail";
 import { uploadBlob } from "$lib/modules/upload";
 import type { MasterKey, HmacSecret } from "$lib/stores";
+import { Scheduler } from "$lib/utils";
 import { trpc } from "$trpc/client";

 export interface FileUploadState {
@@ -2,55 +2,99 @@ import axios from "axios";
 import pLimit from "p-limit";
 import { ENCRYPTION_OVERHEAD, CHUNK_SIZE } from "$lib/constants";
 import { encryptChunk, digestMessage, encodeToBase64 } from "$lib/modules/crypto";
+import { BoundedQueue } from "$lib/utils";

 interface UploadStats {
   progress: number;
   rate: number;
 }

-const createSpeedMeter = (timeWindow = 1500) => {
+interface EncryptedChunk {
+  index: number;
+  data: ArrayBuffer;
+  hash: string;
+}
+
+const createSpeedMeter = (timeWindow = 3000, minInterval = 200, warmupPeriod = 500) => {
   const samples: { t: number; b: number }[] = [];
   let lastSpeed = 0;
+  let startTime: number | null = null;

   return (bytesNow?: number) => {
-    if (!bytesNow) return lastSpeed;
+    if (bytesNow === undefined) return lastSpeed;

     const now = performance.now();

+    // Initialize start time on first call
+    if (startTime === null) {
+      startTime = now;
+    }
+
+    // Check if enough time has passed since the last sample
+    const lastSample = samples[samples.length - 1];
+    if (lastSample && now - lastSample.t < minInterval) {
+      return lastSpeed;
+    }
+
     samples.push({ t: now, b: bytesNow });

+    // Remove old samples outside the time window
     const cutoff = now - timeWindow;
     while (samples.length > 2 && samples[0]!.t < cutoff) samples.shift();

+    // Need at least 2 samples to calculate speed
+    if (samples.length < 2) {
+      return lastSpeed;
+    }
+
     const first = samples[0]!;
     const dt = now - first.t;
     const db = bytesNow - first.b;

-    lastSpeed = dt > 0 ? (db / dt) * 1000 : 0;
+    if (dt >= minInterval) {
+      const instantSpeed = (db / dt) * 1000;
+      // Apply EMA for smoother speed transitions
+      const alpha = 0.3;
+      const rawSpeed =
+        lastSpeed === 0 ? instantSpeed : alpha * instantSpeed + (1 - alpha) * lastSpeed;
+
+      // Apply warmup ramp to prevent initial overestimation
+      const elapsed = now - startTime;
+      const warmupWeight = Math.min(1, elapsed / warmupPeriod);
+      lastSpeed = rawSpeed * warmupWeight;
+    }
+
     return lastSpeed;
   };
 };

-const uploadChunk = async (
-  uploadId: string,
-  chunkIndex: number,
+const encryptChunkData = async (
   chunk: Blob,
   dataKey: CryptoKey,
+): Promise<{ data: ArrayBuffer; hash: string }> => {
+  const encrypted = await encryptChunk(await chunk.arrayBuffer(), dataKey);
+  const hash = encodeToBase64(await digestMessage(encrypted));
+  return { data: encrypted, hash };
+};
+
+const uploadEncryptedChunk = async (
+  uploadId: string,
+  chunkIndex: number,
+  encrypted: ArrayBuffer,
+  hash: string,
   onChunkProgress: (chunkIndex: number, loaded: number) => void,
 ) => {
-  const chunkEncrypted = await encryptChunk(await chunk.arrayBuffer(), dataKey);
-  const chunkEncryptedHash = encodeToBase64(await digestMessage(chunkEncrypted));
-
-  await axios.post(`/api/upload/${uploadId}/chunks/${chunkIndex}`, chunkEncrypted, {
+  await axios.post(`/api/upload/${uploadId}/chunks/${chunkIndex + 1}`, encrypted, {
     headers: {
       "Content-Type": "application/octet-stream",
-      "Content-Digest": `sha-256=:${chunkEncryptedHash}:`,
+      "Content-Digest": `sha-256=:${hash}:`,
     },
     onUploadProgress(e) {
       onChunkProgress(chunkIndex, e.loaded ?? 0);
     },
   });

-  onChunkProgress(chunkIndex, chunkEncrypted.byteLength);
+  onChunkProgress(chunkIndex, encrypted.byteLength);
 };

 export const uploadBlob = async (
@@ -60,12 +104,14 @@ export const uploadBlob = async (
   options?: { concurrency?: number; onProgress?: (s: UploadStats) => void },
 ) => {
   const onProgress = options?.onProgress;
+  const networkConcurrency = options?.concurrency ?? 4;
+  const maxQueueSize = 8;

   const totalChunks = Math.ceil(blob.size / CHUNK_SIZE);
   const totalBytes = blob.size + totalChunks * ENCRYPTION_OVERHEAD;

   const uploadedByChunk = new Array<number>(totalChunks).fill(0);
-  const speedMeter = createSpeedMeter(1500);
+  const speedMeter = createSpeedMeter(3000, 200);

   const emit = () => {
     if (!onProgress) return;
@@ -82,21 +128,56 @@ export const uploadBlob = async (
     emit();
   };

-  const limit = pLimit(options?.concurrency ?? 4);
+  const queue = new BoundedQueue<EncryptedChunk>(maxQueueSize);
+  let encryptionError: Error | null = null;

-  await Promise.all(
-    Array.from({ length: totalChunks }, (_, i) =>
-      limit(() =>
-        uploadChunk(
-          uploadId,
-          i + 1,
-          blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE),
-          dataKey,
-          onChunkProgress,
-        ),
-      ),
-    ),
-  );
+  // Producer: encrypt chunks and push to queue
+  const encryptionProducer = async () => {
+    try {
+      for (let i = 0; i < totalChunks; i++) {
+        const chunk = blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
+        const { data, hash } = await encryptChunkData(chunk, dataKey);
+        await queue.push({ index: i, data, hash });
+      }
+    } catch (e) {
+      encryptionError = e instanceof Error ? e : new Error(String(e));
+    } finally {
+      queue.close();
+    }
+  };
+
+  // Consumer: upload chunks from queue with concurrency limit
+  const uploadConsumer = async () => {
+    const limit = pLimit(networkConcurrency);
+    const activeTasks = new Set<Promise<void>>();
+
+    while (true) {
+      const item = await queue.pop();
+      if (item === null) break;
+      if (encryptionError) throw encryptionError;
+
+      const task = limit(async () => {
+        try {
+          await uploadEncryptedChunk(uploadId, item.index, item.data, item.hash, onChunkProgress);
+        } finally {
+          // @ts-ignore
+          item.data = null;
+        }
+      });
+
+      activeTasks.add(task);
+      task.finally(() => activeTasks.delete(task));
+
+      if (activeTasks.size >= networkConcurrency) {
+        await Promise.race(activeTasks);
+      }
+    }
+
+    await Promise.all(activeTasks);
+  };
+
+  // Run producer and consumer concurrently
+  await Promise.all([encryptionProducer(), uploadConsumer()]);

   onProgress?.({ progress: 1, rate: speedMeter() });
 };
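Note: the sketch below is an editor illustration, not part of the commit. It traces the smoothing that the reworked createSpeedMeter applies: each instantaneous rate is folded into an exponential moving average with alpha = 0.3, then scaled by a warmup ramp so readings taken before warmupPeriod (500 ms) has elapsed are damped instead of reported at full value. The sample numbers are made up.

// Standalone TypeScript trace of the EMA + warmup math above (illustrative values).
const alpha = 0.3; // weight of the newest instantaneous rate
const warmupPeriod = 500; // ms over which the reported speed ramps up to 100%

let lastSpeed = 0; // bytes per second, as reported to callers

// Pairs of (ms elapsed since the first sample, instantaneous bytes/s over the window)
const observations: [number, number][] = [
  [250, 4_000_000], // early burst: socket buffering makes the first reading look too fast
  [500, 2_500_000],
  [750, 2_200_000],
  [1000, 2_100_000],
];

for (const [elapsed, instantSpeed] of observations) {
  // EMA: the first sample passes through, later samples are blended with history
  const rawSpeed = lastSpeed === 0 ? instantSpeed : alpha * instantSpeed + (1 - alpha) * lastSpeed;
  // Warmup ramp: damp readings taken before warmupPeriod has elapsed
  const warmupWeight = Math.min(1, elapsed / warmupPeriod);
  lastSpeed = rawSpeed * warmupWeight;
  console.log(`t=${elapsed}ms -> ${(lastSpeed / 1e6).toFixed(2)} MB/s`);
}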
src/lib/utils/concurrency/BoundedQueue.ts (new file, 44 lines)
@@ -0,0 +1,44 @@
+export class BoundedQueue<T> {
+  private isClosed = false;
+  private reservedCount = 0;
+  private items: T[] = [];
+
+  private waitersNotFull: (() => void)[] = [];
+  private waitersNotEmpty: (() => void)[] = [];
+
+  constructor(private readonly maxSize: number) {}
+
+  async push(item: T) {
+    if (this.isClosed) {
+      throw new Error("Queue closed");
+    }
+
+    while (this.reservedCount >= this.maxSize) {
+      await new Promise<void>((resolve) => this.waitersNotFull.push(resolve));
+      if (this.isClosed) throw new Error("Queue closed");
+    }
+
+    this.reservedCount++;
+    this.items.push(item);
+    this.waitersNotEmpty.shift()?.();
+  }
+
+  async pop() {
+    while (this.items.length === 0) {
+      if (this.isClosed) return null;
+      await new Promise<void>((resolve) => this.waitersNotEmpty.push(resolve));
+    }
+
+    const item = this.items.shift()!;
+    this.reservedCount--;
+    this.waitersNotFull.shift()?.();
+
+    return item;
+  }
+
+  close() {
+    this.isClosed = true;
+    while (this.waitersNotEmpty.length > 0) this.waitersNotEmpty.shift()!();
+    while (this.waitersNotFull.length > 0) this.waitersNotFull.shift()!();
+  }
+}
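Note: a minimal usage sketch of BoundedQueue (editor illustration, not part of the commit). push() suspends once maxSize items are buffered and pop() suspends while the queue is empty, so a fast producer is throttled to the consumer's pace; after close(), a drained pop() resolves to null, which is the shutdown signal uploadBlob's consumer loop checks for.

// Runnable ESM sketch (top-level await) demonstrating the backpressure behavior.
import { BoundedQueue } from "$lib/utils";

const queue = new BoundedQueue<number>(2); // at most 2 items buffered at once

const producer = async () => {
  for (let i = 0; i < 5; i++) {
    await queue.push(i); // suspends here whenever the queue is full
    console.log(`pushed ${i}`);
  }
  queue.close(); // wakes the consumer so a drained pop() can return null
};

const consumer = async () => {
  while (true) {
    const item = await queue.pop(); // suspends while the queue is empty
    if (item === null) break; // queue is closed and fully drained
    await new Promise((resolve) => setTimeout(resolve, 100)); // simulate slow work
    console.log(`consumed ${item}`);
  }
};

await Promise.all([producer(), consumer()]);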
src/lib/utils/concurrency/index.ts (new file, 3 lines)
@@ -0,0 +1,3 @@
+export * from "./BoundedQueue";
+export * from "./HybridPromise";
+export * from "./Scheduler";
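Note: together with the re-export added to src/lib/utils/index.ts in the next hunk, this barrel lets call sites pull the concurrency helpers from a single path, matching the rewritten imports elsewhere in this commit:

// Illustrative import path after this change (mirrors the service-layer hunks in this commit).
import { BoundedQueue, Scheduler } from "$lib/utils";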
@@ -1,4 +1,4 @@
+export * from "./concurrency";
 export * from "./format";
 export * from "./gotoStateful";
-export * from "./HybridPromise";
 export * from "./sort";
@@ -2,9 +2,9 @@ import { limitFunction } from "p-limit";
 import { SvelteMap } from "svelte/reactivity";
 import { CHUNK_SIZE } from "$lib/constants";
 import type { FileInfo } from "$lib/modules/filesystem";
-import { Scheduler } from "$lib/modules/scheduler";
 import { uploadBlob } from "$lib/modules/upload";
 import { requestFileDownload } from "$lib/services/file";
+import { Scheduler } from "$lib/utils";
 import { trpc } from "$trpc/client";

 export type MigrationStatus =
@@ -2,9 +2,9 @@ import { limitFunction } from "p-limit";
 import { SvelteMap } from "svelte/reactivity";
 import { storeFileThumbnailCache } from "$lib/modules/file";
 import type { FileInfo } from "$lib/modules/filesystem";
-import { Scheduler } from "$lib/modules/scheduler";
 import { generateThumbnail } from "$lib/modules/thumbnail";
 import { requestFileDownload, requestFileThumbnailUpload } from "$lib/services/file";
+import { Scheduler } from "$lib/utils";
 import { trpc } from "$trpc/client";

 export type GenerationStatus =
   | "queued"