diff --git a/src/lib/modules/file/upload.svelte.ts b/src/lib/modules/file/upload.svelte.ts index 6deac1f..7ac15ce 100644 --- a/src/lib/modules/file/upload.svelte.ts +++ b/src/lib/modules/file/upload.svelte.ts @@ -3,10 +3,10 @@ import { limitFunction } from "p-limit"; import { CHUNK_SIZE } from "$lib/constants"; import { encodeToBase64, generateDataKey, wrapDataKey, encryptString } from "$lib/modules/crypto"; import { signMessageHmac } from "$lib/modules/crypto"; -import { Scheduler } from "$lib/modules/scheduler"; import { generateThumbnail } from "$lib/modules/thumbnail"; import { uploadBlob } from "$lib/modules/upload"; import type { MasterKey, HmacSecret } from "$lib/stores"; +import { Scheduler } from "$lib/utils"; import { trpc } from "$trpc/client"; export interface FileUploadState { diff --git a/src/lib/modules/upload.ts b/src/lib/modules/upload.ts index a540e22..cab51b7 100644 --- a/src/lib/modules/upload.ts +++ b/src/lib/modules/upload.ts @@ -2,55 +2,99 @@ import axios from "axios"; import pLimit from "p-limit"; import { ENCRYPTION_OVERHEAD, CHUNK_SIZE } from "$lib/constants"; import { encryptChunk, digestMessage, encodeToBase64 } from "$lib/modules/crypto"; +import { BoundedQueue } from "$lib/utils"; interface UploadStats { progress: number; rate: number; } -const createSpeedMeter = (timeWindow = 1500) => { +interface EncryptedChunk { + index: number; + data: ArrayBuffer; + hash: string; +} + +const createSpeedMeter = (timeWindow = 3000, minInterval = 200, warmupPeriod = 500) => { const samples: { t: number; b: number }[] = []; let lastSpeed = 0; + let startTime: number | null = null; return (bytesNow?: number) => { - if (!bytesNow) return lastSpeed; + if (bytesNow === undefined) return lastSpeed; const now = performance.now(); + + // Initialize start time on first call + if (startTime === null) { + startTime = now; + } + + // Check if enough time has passed since the last sample + const lastSample = samples[samples.length - 1]; + if (lastSample && now - 
lastSample.t < minInterval) { + return lastSpeed; + } + samples.push({ t: now, b: bytesNow }); + // Remove old samples outside the time window const cutoff = now - timeWindow; while (samples.length > 2 && samples[0]!.t < cutoff) samples.shift(); + // Need at least 2 samples to calculate speed + if (samples.length < 2) { + return lastSpeed; + } + const first = samples[0]!; const dt = now - first.t; const db = bytesNow - first.b; - lastSpeed = dt > 0 ? (db / dt) * 1000 : 0; + if (dt >= minInterval) { + const instantSpeed = (db / dt) * 1000; + // Apply EMA for smoother speed transitions + const alpha = 0.3; + const rawSpeed = + lastSpeed === 0 ? instantSpeed : alpha * instantSpeed + (1 - alpha) * lastSpeed; + + // Apply warmup ramp to prevent initial overestimation + const elapsed = now - startTime; + const warmupWeight = Math.min(1, elapsed / warmupPeriod); + lastSpeed = rawSpeed * warmupWeight; + } + return lastSpeed; }; }; -const uploadChunk = async ( - uploadId: string, - chunkIndex: number, +const encryptChunkData = async ( chunk: Blob, dataKey: CryptoKey, +): Promise<{ data: ArrayBuffer; hash: string }> => { + const encrypted = await encryptChunk(await chunk.arrayBuffer(), dataKey); + const hash = encodeToBase64(await digestMessage(encrypted)); + return { data: encrypted, hash }; +}; + +const uploadEncryptedChunk = async ( + uploadId: string, + chunkIndex: number, + encrypted: ArrayBuffer, + hash: string, onChunkProgress: (chunkIndex: number, loaded: number) => void, ) => { - const chunkEncrypted = await encryptChunk(await chunk.arrayBuffer(), dataKey); - const chunkEncryptedHash = encodeToBase64(await digestMessage(chunkEncrypted)); - - await axios.post(`/api/upload/${uploadId}/chunks/${chunkIndex}`, chunkEncrypted, { + await axios.post(`/api/upload/${uploadId}/chunks/${chunkIndex + 1}`, encrypted, { headers: { "Content-Type": "application/octet-stream", - "Content-Digest": `sha-256=:${chunkEncryptedHash}:`, + "Content-Digest": `sha-256=:${hash}:`, }, 
onUploadProgress(e) { onChunkProgress(chunkIndex, e.loaded ?? 0); }, }); - onChunkProgress(chunkIndex, chunkEncrypted.byteLength); + onChunkProgress(chunkIndex, encrypted.byteLength); }; export const uploadBlob = async ( @@ -60,12 +104,14 @@ export const uploadBlob = async ( options?: { concurrency?: number; onProgress?: (s: UploadStats) => void }, ) => { const onProgress = options?.onProgress; + const networkConcurrency = options?.concurrency ?? 4; + const maxQueueSize = 8; const totalChunks = Math.ceil(blob.size / CHUNK_SIZE); const totalBytes = blob.size + totalChunks * ENCRYPTION_OVERHEAD; const uploadedByChunk = new Array(totalChunks).fill(0); - const speedMeter = createSpeedMeter(1500); + const speedMeter = createSpeedMeter(3000, 200); const emit = () => { if (!onProgress) return; @@ -82,21 +128,56 @@ export const uploadBlob = async ( emit(); }; - const limit = pLimit(options?.concurrency ?? 4); + const queue = new BoundedQueue<EncryptedChunk>(maxQueueSize); + let encryptionError: Error | null = null; - await Promise.all( - Array.from({ length: totalChunks }, (_, i) => - limit(() => - uploadChunk( - uploadId, - i + 1, - blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE), - dataKey, - onChunkProgress, - ), - ), - ), - ); + // Producer: encrypt chunks and push to queue + const encryptionProducer = async () => { + try { + for (let i = 0; i < totalChunks; i++) { + const chunk = blob.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE); + const { data, hash } = await encryptChunkData(chunk, dataKey); + await queue.push({ index: i, data, hash }); + } + } catch (e) { + encryptionError = e instanceof Error ? 
e : new Error(String(e)); + } finally { + queue.close(); + } + }; + + // Consumer: upload chunks from queue with concurrency limit + const uploadConsumer = async () => { + const limit = pLimit(networkConcurrency); + const activeTasks = new Set<Promise<void>>(); + + while (true) { + const item = await queue.pop(); + if (item === null) break; + if (encryptionError) throw encryptionError; + + const task = limit(async () => { + try { + await uploadEncryptedChunk(uploadId, item.index, item.data, item.hash, onChunkProgress); + } finally { + // @ts-ignore + item.data = null; + } + }); + + activeTasks.add(task); + task.finally(() => activeTasks.delete(task)); + + if (activeTasks.size >= networkConcurrency) { + await Promise.race(activeTasks); + } + } + + await Promise.all(activeTasks); + }; + + // Run producer and consumer concurrently + await Promise.all([encryptionProducer(), uploadConsumer()]); onProgress?.({ progress: 1, rate: speedMeter() }); }; diff --git a/src/lib/utils/concurrency/BoundedQueue.ts b/src/lib/utils/concurrency/BoundedQueue.ts new file mode 100644 index 0000000..5970914 --- /dev/null +++ b/src/lib/utils/concurrency/BoundedQueue.ts @@ -0,0 +1,44 @@ +export class BoundedQueue<T> { + private isClosed = false; + private reservedCount = 0; + private items: T[] = []; + + private waitersNotFull: (() => void)[] = []; + private waitersNotEmpty: (() => void)[] = []; + + constructor(private readonly maxSize: number) {} + + async push(item: T) { + if (this.isClosed) { + throw new Error("Queue closed"); + } + + while (this.reservedCount >= this.maxSize) { + await new Promise<void>((resolve) => this.waitersNotFull.push(resolve)); + if (this.isClosed) throw new Error("Queue closed"); + } + + this.reservedCount++; + this.items.push(item); + this.waitersNotEmpty.shift()?.(); + } + + async pop() { + while (this.items.length === 0) { + if (this.isClosed) return null; + await new Promise<void>((resolve) => this.waitersNotEmpty.push(resolve)); + } + + const item = this.items.shift()!; + 
this.reservedCount--; + this.waitersNotFull.shift()?.(); + + return item; + } + + close() { + this.isClosed = true; + while (this.waitersNotEmpty.length > 0) this.waitersNotEmpty.shift()!(); + while (this.waitersNotFull.length > 0) this.waitersNotFull.shift()!(); + } +} diff --git a/src/lib/utils/HybridPromise.ts b/src/lib/utils/concurrency/HybridPromise.ts similarity index 100% rename from src/lib/utils/HybridPromise.ts rename to src/lib/utils/concurrency/HybridPromise.ts diff --git a/src/lib/modules/scheduler.ts b/src/lib/utils/concurrency/Scheduler.ts similarity index 100% rename from src/lib/modules/scheduler.ts rename to src/lib/utils/concurrency/Scheduler.ts diff --git a/src/lib/utils/concurrency/index.ts b/src/lib/utils/concurrency/index.ts new file mode 100644 index 0000000..59fe81d --- /dev/null +++ b/src/lib/utils/concurrency/index.ts @@ -0,0 +1,3 @@ +export * from "./BoundedQueue"; +export * from "./HybridPromise"; +export * from "./Scheduler"; diff --git a/src/lib/utils/index.ts b/src/lib/utils/index.ts index 5d5b9d4..4c576d5 100644 --- a/src/lib/utils/index.ts +++ b/src/lib/utils/index.ts @@ -1,4 +1,4 @@ +export * from "./concurrency"; export * from "./format"; export * from "./gotoStateful"; -export * from "./HybridPromise"; export * from "./sort"; diff --git a/src/routes/(fullscreen)/settings/migration/service.svelte.ts b/src/routes/(fullscreen)/settings/migration/service.svelte.ts index dfb0edd..1bdf869 100644 --- a/src/routes/(fullscreen)/settings/migration/service.svelte.ts +++ b/src/routes/(fullscreen)/settings/migration/service.svelte.ts @@ -2,9 +2,9 @@ import { limitFunction } from "p-limit"; import { SvelteMap } from "svelte/reactivity"; import { CHUNK_SIZE } from "$lib/constants"; import type { FileInfo } from "$lib/modules/filesystem"; -import { Scheduler } from "$lib/modules/scheduler"; import { uploadBlob } from "$lib/modules/upload"; import { requestFileDownload } from "$lib/services/file"; +import { Scheduler } from "$lib/utils"; import 
{ trpc } from "$trpc/client"; export type MigrationStatus = diff --git a/src/routes/(fullscreen)/settings/thumbnail/service.ts b/src/routes/(fullscreen)/settings/thumbnail/service.ts index fdf0303..5c4c61d 100644 --- a/src/routes/(fullscreen)/settings/thumbnail/service.ts +++ b/src/routes/(fullscreen)/settings/thumbnail/service.ts @@ -2,9 +2,9 @@ import { limitFunction } from "p-limit"; import { SvelteMap } from "svelte/reactivity"; import { storeFileThumbnailCache } from "$lib/modules/file"; import type { FileInfo } from "$lib/modules/filesystem"; -import { Scheduler } from "$lib/modules/scheduler"; import { generateThumbnail } from "$lib/modules/thumbnail"; import { requestFileDownload, requestFileThumbnailUpload } from "$lib/services/file"; +import { Scheduler } from "$lib/utils"; export type GenerationStatus = | "queued"