diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8e76abf1..002b2ba5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1040,30 +1040,35 @@ packages: engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [glibc] '@napi-rs/canvas-linux-arm64-musl@0.1.74': resolution: {integrity: sha512-tfFqLHGtSEabBigOnPUfZviSTGmW2xHv5tYZYPBWmgGiTkoNJ7lEWFUxHjwvV5HXGqLs8ok/O7g1enSpxO6lmQ==} engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [musl] '@napi-rs/canvas-linux-riscv64-gnu@0.1.74': resolution: {integrity: sha512-j6H9dHTMtr1y3tu/zGm1ythYIL9vTl4EEv9f6CMx0n3Zn2M+OruUUwh9ylCj4afzSNEK9T8cr6zMnmTPzkpBvQ==} engines: {node: '>= 10'} cpu: [riscv64] os: [linux] + libc: [glibc] '@napi-rs/canvas-linux-x64-gnu@0.1.74': resolution: {integrity: sha512-73DIV4E7Y9CpIJuUXVl9H6+MEQXyRy4VJQoUGA1tOlcKQiStxqhq6UErL4decI28NxjyQXBhtYZKj5q8AJEuOg==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [glibc] '@napi-rs/canvas-linux-x64-musl@0.1.74': resolution: {integrity: sha512-FgDMEFdGIJT3I2xejflRJ82/ZgDphyirS43RgtoLaIXI6zihLiZcQ7rczpqeWgAwlJNjR0He2EustsKe1SkUOg==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [musl] '@napi-rs/canvas-win32-x64-msvc@0.1.74': resolution: {integrity: sha512-x6bhwlhn0wU7dfiP46mt5Bi6PowSUH4CJ4PTzGj58LRQ1HVasEIJgoMx7MLC48F738eJpzbfg3WR/D8+e9CeTA==} @@ -1117,56 +1122,67 @@ packages: resolution: {integrity: sha512-RkdOTu2jK7brlu+ZwjMIZfdV2sSYHK2qR08FUWcIoqJC2eywHbXr0L8T/pONFwkGukQqERDheaGTeedG+rra6Q==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-musleabihf@4.45.1': resolution: {integrity: sha512-3kJ8pgfBt6CIIr1o+HQA7OZ9mp/zDk3ctekGl9qn/pRBgrRgfwiffaUmqioUGN9hv0OHv2gxmvdKOkARCtRb8Q==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-gnu@4.45.1': resolution: {integrity: sha512-k3dOKCfIVixWjG7OXTCOmDfJj3vbdhN0QYEqB+OuGArOChek22hn7Uy5A/gTDNAcCy5v2YcXRJ/Qcnm4/ma1xw==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-musl@4.45.1': resolution: {integrity: sha512-PmI1vxQetnM58ZmDFl9/Uk2lpBBby6B6rF4muJc65uZbxCs0EA7hhKCk2PKlmZKuyVSHAyIw3+/SiuMLxKxWog==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loongarch64-gnu@4.45.1': resolution: {integrity: sha512-9UmI0VzGmNJ28ibHW2GpE2nF0PBQqsyiS4kcJ5vK+wuwGnV5RlqdczVocDSUfGX/Na7/XINRVoUgJyFIgipoRg==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-powerpc64le-gnu@4.45.1': resolution: {integrity: sha512-7nR2KY8oEOUTD3pBAxIBBbZr0U7U+R9HDTPNy+5nVVHDXI4ikYniH1oxQz9VoB5PbBU1CZuDGHkLJkd3zLMWsg==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.45.1': resolution: {integrity: sha512-nlcl3jgUultKROfZijKjRQLUu9Ma0PeNv/VFHkZiKbXTBQXhpytS8CIj5/NfBeECZtY2FJQubm6ltIxm/ftxpw==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-musl@4.45.1': resolution: {integrity: sha512-HJV65KLS51rW0VY6rvZkiieiBnurSzpzore1bMKAhunQiECPuxsROvyeaot/tcK3A3aGnI+qTHqisrpSgQrpgA==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-s390x-gnu@4.45.1': resolution: {integrity: sha512-NITBOCv3Qqc6hhwFt7jLV78VEO/il4YcBzoMGGNxznLgRQf43VQDae0aAzKiBeEPIxnDrACiMgbqjuihx08OOw==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.45.1': resolution: {integrity: sha512-+E/lYl6qu1zqgPEnTrs4WysQtvc/Sh4fC2nByfFExqgYrqkKWp1tWIbe+ELhixnenSpBbLXNi6vbEEJ8M7fiHw==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-musl@4.45.1': resolution: {integrity: sha512-a6WIAp89p3kpNoYStITT9RbTbTnqarU7D8N8F2CV+4Cl9fwCOZraLVuVFvlpsW0SbIiYtEnhCZBPLoNdRkjQFw==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-win32-arm64-msvc@4.45.1': resolution: {integrity: sha512-T5Bi/NS3fQiJeYdGvRpTAP5P02kqSOpqiopwhj0uaXB6nzs5JVi2XMJb18JUSKhCOX8+UE1UKQufyD6Or48dJg==} @@ -1287,24 +1303,28 @@ packages: engines: {node: '>=10'} cpu: [arm64] os: [linux] + libc: [glibc] '@swc/core-linux-arm64-musl@1.12.14': resolution: {integrity: sha512-ZkOOIpSMXuPAjfOXEIAEQcrPOgLi6CaXvA5W+GYnpIpFG21Nd0qb0WbwFRv4K8BRtl993Q21v0gPpOaFHU+wdA==} engines: {node: '>=10'} cpu: [arm64] os: [linux] + libc: [musl] '@swc/core-linux-x64-gnu@1.12.14': resolution: {integrity: sha512-71EPPccwJiJUxd2aMwNlTfom2mqWEWYGdbeTju01tzSHsEuD7E6ePlgC3P3ngBqB3urj41qKs87z7zPOswT5Iw==} engines: {node: '>=10'} cpu: [x64] os: [linux] + libc: [glibc] '@swc/core-linux-x64-musl@1.12.14': resolution: {integrity: sha512-nImF1hZJqKTcl0WWjHqlelOhvuB9rU9kHIw/CmISBUZXogjLIvGyop1TtJNz0ULcz2Oxr3Q2YpwfrzsgvgbGkA==} engines: {node: '>=10'} cpu: [x64] os: [linux] + libc: [musl] '@swc/core-win32-arm64-msvc@1.12.14': resolution: {integrity: sha512-sABFQFxSuStFoxvEWZUHWYldtB1B4A9eDNFd4Ty50q7cemxp7uoscFoaCqfXSGNBwwBwpS5EiPB6YN4y6hqmLQ==} diff --git a/src/lang/en/home.json b/src/lang/en/home.json index caa04855..f8574bac 100644 --- a/src/lang/en/home.json +++ b/src/lang/en/home.json @@ -136,10 +136,12 @@ "pending": "Pending", "uploading": "Uploading", "backending": "Uploading in the backend", + "tasked": "Successfully added to the task", "success": "Success", "error": "Error", "back": "Back to Upload", - "clear_done": "Clear Done" + "clear_done": "Clear Done", + "slice_upload": "Slice upload" }, "local_settings": { "aria2_rpc_url": "Aria2 RPC URL", diff --git a/src/pages/home/uploads/Upload.tsx b/src/pages/home/uploads/Upload.tsx index 26381aeb..39f0cb05 100644 --- a/src/pages/home/uploads/Upload.tsx +++ b/src/pages/home/uploads/Upload.tsx @@ -86,7 +86,7 @@ const Upload = () => { }) const allDone = () => { return uploadFiles.uploads.every(({ status }) => - ["success", "error"].includes(status), + ["success", "error", "tasked"].includes(status), ) } let fileInput: HTMLInputElement @@ -124,7 +124,7 @@ const Upload = () => { rapid(), ) if (!err) { - setUpload(path, "status", "success") + setUpload(path, "status", asTask() ? "tasked" : "success") setUpload(path, "progress", 100) } else { setUpload(path, "status", "error") @@ -148,7 +148,8 @@ const Upload = () => { onClick={() => { setUploadFiles("uploads", (_uploads) => _uploads.filter( - ({ status }) => !["success", "error"].includes(status), + ({ status }) => + !["success", "error", "tasked"].includes(status), ), ) console.log(uploadFiles.uploads) @@ -304,6 +305,7 @@ const Upload = () => { > {t("home.upload.add_as_task")} + { @@ -312,14 +314,16 @@ const Upload = () => { > {t("home.conflict_policy.overwrite_existing")} - { - setRapid(!rapid()) - }} - > - {t("home.upload.try_rapid")} - + + { + setRapid(!rapid()) + }} + > + {t("home.upload.try_rapid")} + + diff --git a/src/pages/home/uploads/slice_upload.ts b/src/pages/home/uploads/slice_upload.ts new file mode 100644 index 00000000..92113d2e --- /dev/null +++ b/src/pages/home/uploads/slice_upload.ts @@ -0,0 +1,688 @@ +import { password } from "~/store" +import { EmptyResp } from "~/types" +import { r, pathDir } from "~/utils" +import { SetUpload, Upload } from "./types" +import { + calculateHash, + calculateSliceHash, + fsUploadInfo, + fsPreup, + FsSliceupComplete, + HashType, +} from "./util" +import createMutex from "~/utils/mutex" + +const RETRY_CONFIG = { + maxRetries: 15, + retryDelay: 1000, + maxDelay: 30000, + backoffMultiplier: 2, + nativeSliceRetries: 8, +} + +enum UploadErrorType { + NETWORK_ERROR = "network_error", + SERVER_ERROR = "server_error", + FILE_ERROR = "file_error", + CANCEL_ERROR = "cancel_error", + TIMEOUT_ERROR = "timeout_error", + HASH_ERROR = "hash_error", + MEMORY_ERROR = "memory_error", +} + +class UploadError extends Error { + public type: UploadErrorType + public statusCode?: number + public retryable: boolean + public userMessage: string + + constructor( + type: UploadErrorType, + message: string, + userMessage: string, + statusCode?: number, + retryable: boolean = true, + ) { + super(message) + this.type = type + this.statusCode = statusCode + this.retryable = retryable + this.userMessage = userMessage + this.name = "UploadError" + } + + static fromAxiosError(error: any, chunkIndex?: number): UploadError { + const chunkMsg = + chunkIndex !== undefined ? `分片 ${chunkIndex + 1}` : "文件" + + if (error.code === "ECONNABORTED" || error.message?.includes("timeout")) { + return new UploadError( + UploadErrorType.TIMEOUT_ERROR, + `Upload timeout: ${error.message}`, + `${chunkMsg}上传超时,请检查网络连接`, + error.response?.status, + true, + ) + } + + if (!error.response) { + return new UploadError( + UploadErrorType.NETWORK_ERROR, + `Network error: ${error.message}`, + `网络连接失败,请检查网络状态`, + undefined, + true, + ) + } + + const status = error.response.status + const data = error.response.data + + if (status >= 500) { + return new UploadError( + UploadErrorType.SERVER_ERROR, + `Server error ${status}: ${data?.message || error.message}`, + `服务器暂时不可用 (${status}),正在重试...`, + status, + true, + ) + } else if (status === 413) { + return new UploadError( + UploadErrorType.FILE_ERROR, + `File too large: ${data?.message || error.message}`, + `${chunkMsg}过大,请选择较小的文件`, + status, + false, + ) + } else if (status === 401 || status === 403) { + return new UploadError( + UploadErrorType.SERVER_ERROR, + `Authorization failed: ${data?.message || error.message}`, + `认证失败,请重新登录`, + status, + false, + ) + } else { + return new UploadError( + UploadErrorType.SERVER_ERROR, + `HTTP ${status}: ${data?.message || error.message}`, + `上传失败 (${status}),${data?.message || "未知错误"}`, + status, + status >= 400 && status < 500 ? false : true, + ) + } + } + + static fromGenericError(error: any, context: string = ""): UploadError { + if (error instanceof UploadError) { + return error + } + + const message = error.message || String(error) + if (message.includes("memory") || message.includes("Memory")) { + return new UploadError( + UploadErrorType.MEMORY_ERROR, + `Memory error in ${context}: ${message}`, + `内存不足,请关闭其他程序或选择较小的文件`, + undefined, + false, + ) + } + + return new UploadError( + UploadErrorType.FILE_ERROR, + `${context} error: ${message}`, + `文件处理出错: ${message}`, + undefined, + false, + ) + } +} + +interface UploadProgress { + uploadedBytes: number + totalBytes: number + percentage: number + speed: number // bytes per second + remainingTime: number // seconds + activeChunks: number + completedChunks: number + totalChunks: number + lastError?: UploadError + stage: + | "preparing" + | "hashing" + | "uploading" + | "completing" + | "completed" + | "error" +} + +const progressMutex = createMutex() + +const retryWithBackoff = async ( + fn: () => Promise, + maxRetries: number = RETRY_CONFIG.maxRetries, + delay: number = RETRY_CONFIG.retryDelay, + context: string = "operation", +): Promise => { + let lastError: Error + + for (let i = 0; i <= maxRetries; i++) { + try { + return await fn() + } catch (error) { + lastError = error as Error + + if (i === maxRetries) { + throw lastError + } + + // Calculate delay time with exponential backoff + const waitTime = Math.min( + delay * Math.pow(RETRY_CONFIG.backoffMultiplier, i), + RETRY_CONFIG.maxDelay, + ) + + console.log( + `${context} failed, retrying in ${waitTime / 1000} seconds (${i + 1}/${maxRetries}):`, + (error as any) instanceof UploadError + ? (error as UploadError).userMessage + : (error as Error).message, + ) + + await new Promise((resolve) => setTimeout(resolve, waitTime)) + } + } + throw lastError! +} + +// Upload state management +interface UploadState { + isPaused: boolean + isCancelled: boolean + totalBytes: number + uploadedBytes: number + completedChunks: number + totalChunks: number + activeChunks: number + speed: number + lastError?: UploadError + onProgress?: (progress: UploadProgress) => void +} + +export const SliceUpload: Upload = async ( + uploadPath: string, + file: File, + setUpload: SetUpload, + asTask = false, + overwrite = false, +): Promise => { + let hashtype: string = HashType.Md5 + let slicehash: string[] = [] + let sliceupstatus: Uint8Array + let ht: string[] = [] + + let taskInfo: { + taskId: string + hash: any + sliceSize: number + sliceCnt: number + } | null = null + + // 初始化上传状态 + const state: UploadState = { + isPaused: false, + isCancelled: false, + totalBytes: file.size, + uploadedBytes: 0, + completedChunks: 0, + totalChunks: 0, + activeChunks: 0, + speed: 0, + } + + uploadQueue.addUpload(uploadPath, state) + + let speedInterval: any + const cleanup = () => { + if (speedInterval) { + clearInterval(speedInterval) + } + uploadQueue.removeUpload(uploadPath) + } + + const dir = pathDir(uploadPath) + + const resp = await fsUploadInfo(dir) + if (resp.code != 200) { + cleanup() + return new Error(`Upload info failed: ${resp.code} - ${resp.message}`) + } + + // hash计算 + if (resp.data.hash_md5_need) { + ht.push(HashType.Md5) + hashtype = HashType.Md5 + } + if (resp.data.hash_sha1_need) { + ht.push(HashType.Sha1) + hashtype = HashType.Sha1 + } + if (resp.data.hash_md5_256kb_need) { + ht.push(HashType.Md5256kb) + } + const hash = await calculateHash(file, ht) + const resp1 = await fsPreup( + dir, + file.name, + file.size, + hash, + overwrite, + asTask, + ) + if (resp1.code != 200) { + cleanup() + return new Error(`Preup failed: ${resp1.code} - ${resp1.message}`) + } + + state.totalChunks = resp1.data.slice_cnt + + taskInfo = { + taskId: resp1.data.task_id, + hash, + sliceSize: resp1.data.slice_size, + sliceCnt: resp1.data.slice_cnt, + } + + if (resp1.data.reuse) { + setUpload("progress", "100") + setUpload("status", "success") + setUpload("speed", "0") + cleanup() + return + } + if (resp.data.slice_hash_need) { + slicehash = await calculateSliceHash(file, resp1.data.slice_size, hashtype) + } + sliceupstatus = base64ToUint8Array(resp1.data.slice_upload_status) + + let lastTimestamp = Date.now() + let lastUploadedBytes = 0 + let completeFlag = false + + for (let i = 0; i < resp1.data.slice_cnt; i++) { + if (isSliceUploaded(sliceupstatus, i)) { + state.uploadedBytes += Math.min( + resp1.data.slice_size, + state.totalBytes - i * resp1.data.slice_size, + ) + } + } + + const uploadChunk = async ( + chunk: Blob, + idx: number, + slice_hash: string, + task_id: string, + ) => { + if (state.isCancelled) { + throw new UploadError( + UploadErrorType.CANCEL_ERROR, + "Upload cancelled by user", + "上传已取消", + undefined, + false, + ) + } + + while (state.isPaused && !state.isCancelled) { + await new Promise((resolve) => setTimeout(resolve, 100)) + } + let oldLoaded = 0 + + return retryWithBackoff( + async () => { + try { + const slice = chunk.slice(0, chunk.size) + const resp: EmptyResp = await r.put("/fs/slice_upload", slice, { + headers: { + "File-Path": encodeURIComponent(dir), + "X-Task-ID": task_id, + "X-Slice-Num": idx.toString(), + "X-Slice-Hash": slice_hash, + Password: password(), + }, + onUploadProgress: async (progressEvent: any) => { + if (!progressEvent.lengthComputable || state.isCancelled) { + return + } + const release = await progressMutex.acquire() + try { + const sliceuploaded = progressEvent.loaded - oldLoaded + state.uploadedBytes += sliceuploaded + oldLoaded = progressEvent.loaded + + state.completedChunks = Math.floor( + state.uploadedBytes / (state.totalBytes / state.totalChunks), + ) + + const progress = Math.min( + 100, + ((state.uploadedBytes / state.totalBytes) * 100) | 0, + ) + setUpload("progress", progress) + } finally { + progressMutex.release() + } + }, + }) + + if (resp.code != 200) { + throw new UploadError( + UploadErrorType.SERVER_ERROR, + `Slice upload failed: ${resp.code} - ${resp.message}`, + `分片 ${idx + 1} 上传失败: ${resp.message || "服务器错误"}`, + resp.code, + resp.code >= 500, + ) + } + return resp + } catch (err: any) { + // Convert to structured error + const uploadError = + err instanceof UploadError + ? err + : UploadError.fromAxiosError(err, idx) + + // Record last error + state.lastError = uploadError + + console.error( + `Slice ${idx + 1} upload failed:`, + uploadError.userMessage, + ) + throw uploadError + } + }, + RETRY_CONFIG.maxRetries, + RETRY_CONFIG.retryDelay, + `slice_${idx + 1}_upload`, + ) + } + + speedInterval = setInterval(() => { + if (completeFlag || state.isCancelled) { + clearInterval(speedInterval) + return + } + + const intervalLoaded = state.uploadedBytes - lastUploadedBytes + if (intervalLoaded < 1000) { + return + } + const speed = intervalLoaded / ((Date.now() - lastTimestamp) / 1000) + const complete = Math.min( + 100, + ((state.uploadedBytes / state.totalBytes) * 100) | 0, + ) + setUpload("speed", speed) + setUpload("progress", complete) + lastTimestamp = Date.now() + lastUploadedBytes = state.uploadedBytes + }, 1000) + + lastTimestamp = Date.now() + + if (!isSliceUploaded(sliceupstatus, 0)) { + const chunk = file.slice(0, resp1.data.slice_size) + try { + await uploadChunk( + chunk, + 0, + slicehash.length == 0 ? "" : slicehash.join(","), + resp1.data.task_id, + ) + } catch (err) { + completeFlag = true + setUpload("status", "error") + setUpload("speed", 0) + return err as Error + } + } else { + state.uploadedBytes += Math.min(resp1.data.slice_size, state.totalBytes) + } + + const concurrentLimit = 3 // 固定3个并发 + console.log( + `File size: ${(file.size / 1024 / 1024).toFixed(2)}MB, using ${concurrentLimit} concurrent uploads`, + ) + + const pendingSlices: number[] = [] + for (let i = 1; i < resp1.data.slice_cnt; i++) { + if (!isSliceUploaded(sliceupstatus, i)) { + pendingSlices.push(i) + } + } + + const errors: Error[] = [] + let currentIndex = 0 + + const processNextSlice = async (): Promise => { + while (currentIndex < pendingSlices.length) { + const sliceIndex = pendingSlices[currentIndex++] + + try { + const chunk = file.slice( + sliceIndex * resp1.data.slice_size, + (sliceIndex + 1) * resp1.data.slice_size, + ) + await uploadChunk( + chunk, + sliceIndex, + slicehash.length == 0 ? "" : slicehash[sliceIndex], + resp1.data.task_id, + ) + } catch (err) { + errors.push(err as Error) + } + } + } + + const tasks: Promise[] = [] + for (let i = 0; i < Math.min(concurrentLimit, pendingSlices.length); i++) { + tasks.push(processNextSlice()) + } + + await Promise.all(tasks) + + if (errors.length > 0) { + setUpload( + "progress", + Math.min(100, ((state.uploadedBytes / state.totalBytes) * 100) | 0), + ) + setUpload("status", "error") + cleanup() + + const serverErrors = errors.filter( + (e) => + e instanceof UploadError && e.type === UploadErrorType.SERVER_ERROR, + ) + const networkErrors = errors.filter( + (e) => + e instanceof UploadError && e.type === UploadErrorType.NETWORK_ERROR, + ) + + if (serverErrors.length > 0) { + return serverErrors[0] + } else if (networkErrors.length > 0) { + return networkErrors[0] + } else { + return errors[0] + } + } else { + if (!asTask) { + setUpload("status", "backending") + } + + try { + const resp = await retryWithBackoff( + () => FsSliceupComplete(dir, resp1.data.task_id), + RETRY_CONFIG.maxRetries, + RETRY_CONFIG.retryDelay, + "upload_complete", + ) + + completeFlag = true + cleanup() + + if (resp.code != 200) { + return new UploadError( + UploadErrorType.SERVER_ERROR, + `Upload complete failed: ${resp.code} - ${resp.message}`, + `上传完成确认失败: ${resp.message}`, + resp.code, + resp.code >= 500, + ) + } else if (resp.data.complete == 0) { + return new UploadError( + UploadErrorType.SERVER_ERROR, + "slice missing, please reupload", + "文件分片缺失,请重新上传", + undefined, + true, + ) + } + + return + } catch (error) { + cleanup() + return error instanceof UploadError + ? error + : UploadError.fromGenericError(error, "upload_complete") + } + } +} + +const base64ToUint8Array = (base64: string): Uint8Array => { + const binary = atob(base64) + const len = binary.length + const bytes = new Uint8Array(len) + for (let i = 0; i < len; i++) { + bytes[i] = binary.charCodeAt(i) + } + return bytes +} + +const isSliceUploaded = (status: Uint8Array, idx: number): boolean => { + // const bytes = base64ToUint8Array(statusBase64) + const byteIdx = Math.floor(idx / 8) + const bitIdx = idx % 8 + if (byteIdx >= status.length) return false + return (status[byteIdx] & (1 << bitIdx)) !== 0 +} + +// 上传队列管理 +class UploadQueue { + private static instance: UploadQueue + private uploads: Map = new Map() + + static getInstance(): UploadQueue { + if (!UploadQueue.instance) { + UploadQueue.instance = new UploadQueue() + } + return UploadQueue.instance + } + + addUpload(uploadPath: string, state: UploadState): void { + this.uploads.set(uploadPath, state) + } + + pauseUpload(uploadPath: string): void { + const state = this.uploads.get(uploadPath) + if (state) { + state.isPaused = true + } + } + + resumeUpload(uploadPath: string): void { + const state = this.uploads.get(uploadPath) + if (state) { + state.isPaused = false + } + } + + cancelUpload(uploadPath: string): void { + const state = this.uploads.get(uploadPath) + if (state) { + state.isCancelled = true + } + } + + removeUpload(uploadPath: string): void { + this.uploads.delete(uploadPath) + } + + getUploadState(uploadPath: string): UploadState | undefined { + return this.uploads.get(uploadPath) + } + + getAllUploads(): Array<{ path: string; state: UploadState }> { + return Array.from(this.uploads.entries()).map(([path, state]) => ({ + path, + state, + })) + } +} + +export const uploadQueue = UploadQueue.getInstance() + +export const pauseUpload = (uploadPath: string) => + uploadQueue.pauseUpload(uploadPath) +export const resumeUpload = (uploadPath: string) => + uploadQueue.resumeUpload(uploadPath) +export const cancelUpload = (uploadPath: string) => + uploadQueue.cancelUpload(uploadPath) + +export { UploadError, UploadErrorType } +export type { UploadProgress } + +export const getUploadDetails = ( + uploadPath: string, +): { + state?: UploadState + progress?: UploadProgress + errorMessage?: string +} => { + const state = uploadQueue.getUploadState(uploadPath) + if (!state) return {} + + const progress: UploadProgress = { + uploadedBytes: state.uploadedBytes, + totalBytes: state.totalBytes, + percentage: Math.min( + 100, + ((state.uploadedBytes / state.totalBytes) * 100) | 0, + ), + speed: state.speed, + remainingTime: + state.speed > 0 + ? (state.totalBytes - state.uploadedBytes) / state.speed + : 0, + activeChunks: state.activeChunks, + completedChunks: state.completedChunks, + totalChunks: state.totalChunks, + lastError: state.lastError, + stage: state.isCancelled + ? "error" + : state.uploadedBytes >= state.totalBytes + ? "completed" + : "uploading", + } + + return { + state, + progress, + errorMessage: state.lastError?.userMessage, + } +} diff --git a/src/pages/home/uploads/stream.ts b/src/pages/home/uploads/stream.ts index 72980cc1..9369b0ad 100644 --- a/src/pages/home/uploads/stream.ts +++ b/src/pages/home/uploads/stream.ts @@ -2,7 +2,7 @@ import { password } from "~/store" import { EmptyResp } from "~/types" import { r } from "~/utils" import { SetUpload, Upload } from "./types" -import { calculateHash } from "./util" +import { calculateHash, HashType } from "./util" export const StreamUpload: Upload = async ( uploadPath: string, file: File, @@ -22,10 +22,14 @@ export const StreamUpload: Upload = async ( Overwrite: overwrite.toString(), } if (rapid) { - const { md5, sha1, sha256 } = await calculateHash(file) - headers["X-File-Md5"] = md5 - headers["X-File-Sha1"] = sha1 - headers["X-File-Sha256"] = sha256 + const hash = await calculateHash(file, [ + HashType.Md5, + HashType.Sha1, + HashType.Sha256, + ]) + headers["X-File-Md5"] = hash.md5 + headers["X-File-Sha1"] = hash.sha1 + headers["X-File-Sha256"] = hash.sha256 } const resp: EmptyResp = await r.put("/fs/put", file, { headers: headers, diff --git a/src/pages/home/uploads/types.ts b/src/pages/home/uploads/types.ts index ade472f7..fa8534fc 100644 --- a/src/pages/home/uploads/types.ts +++ b/src/pages/home/uploads/types.ts @@ -12,6 +12,7 @@ export const StatusBadge = { pending: "neutral", uploading: "info", backending: "info", + tasked: "info", success: "success", error: "danger", } as const @@ -24,3 +25,10 @@ export type Upload = ( overwrite: boolean, rapid: boolean, ) => Promise + +export type HashInfo = { + md5: string + md5_256kb: string + sha1: string + sha256: string +} diff --git a/src/pages/home/uploads/uploads.ts b/src/pages/home/uploads/uploads.ts index 86b34053..47ad9634 100644 --- a/src/pages/home/uploads/uploads.ts +++ b/src/pages/home/uploads/uploads.ts @@ -2,6 +2,7 @@ import { objStore } from "~/store" import { FormUpload } from "./form" import { StreamUpload } from "./stream" import { Upload } from "./types" +import { SliceUpload } from "./slice_upload" type Uploader = { upload: Upload @@ -20,6 +21,11 @@ const AllUploads: Uploader[] = [ upload: FormUpload, provider: /.*/, }, + { + name: "Slice", + upload: SliceUpload, + provider: /.*/, + }, ] export const getUploads = (): Pick[] => { diff --git a/src/pages/home/uploads/util.ts b/src/pages/home/uploads/util.ts index ee7fa88c..2d9dd56e 100644 --- a/src/pages/home/uploads/util.ts +++ b/src/pages/home/uploads/util.ts @@ -1,5 +1,7 @@ -import { UploadFileProps } from "./types" +import { HashInfo, UploadFileProps } from "./types" +import { FsUpinfoResp, FsPreupResp, FsSliceupCompleteResp } from "~/types" import { createMD5, createSHA1, createSHA256 } from "hash-wasm" +import { r } from "~/utils" export const traverseFileTree = async (entry: FileSystemEntry) => { let res: File[] = [] @@ -50,6 +52,57 @@ export const traverseFileTree = async (entry: FileSystemEntry) => { return res } +export const fsUploadInfo = (path: string = "/"): Promise => { + return r.get("/fs/upload/info", { + headers: { + "File-Path": encodeURIComponent(path), + }, + }) +} + +export const fsPreup = async ( + path: string, + name: string, + size: number, + hash: HashInfo, + overwrite: boolean, + as_task: boolean, +): Promise => { + return r.post( + "/fs/preup", + { + path, + name, + size, + hash, + overwrite, + as_task, + }, + { + headers: { + "File-Path": encodeURIComponent(path), + }, + }, + ) +} + +export const FsSliceupComplete = async ( + path: string, + task_id: string, +): Promise => { + return r.post( + "/fs/slice_upload_complete", + { + task_id, + }, + { + headers: { + "File-Path": encodeURIComponent(path), + }, + }, + ) +} + export const File2Upload = (file: File): UploadFileProps => { return { name: file.name, @@ -61,24 +114,107 @@ export const File2Upload = (file: File): UploadFileProps => { } } -export const calculateHash = async (file: File) => { - const md5Digest = await createMD5() - const sha1Digest = await createSHA1() - const sha256Digest = await createSHA256() +export enum HashType { + Md5 = "md5", + Md5256kb = "md5_256kb", + Sha1 = "sha1", + Sha256 = "sha256", +} + +export const calculateHash = async ( + file: File, + hashType: string[] = [HashType.Md5], +) => { + let md5Digest: any, md5256kbDigest: any, sha1Digest: any, sha256Digest: any + let hash: HashInfo = { + md5: "", + md5_256kb: "", + sha1: "", + sha256: "", + } + // 初始化需要的 hash 实例 + for (const ht of hashType) { + if (ht === HashType.Md5) { + md5Digest = await createMD5() + } else if (ht === HashType.Md5256kb) { + md5256kbDigest = await createMD5() + } else if (ht === HashType.Sha1) { + sha1Digest = await createSHA1() + } else if (ht === HashType.Sha256) { + sha256Digest = await createSHA256() + } + } + const reader = file.stream().getReader() - const read = async () => { + let readBytes = 0 + const KB256 = 256 * 1024 + let md5256kbDone = false + + while (true) { const { done, value } = await reader.read() - if (done) { - return + if (done) break + + if (md5Digest) md5Digest.update(value) + if (sha1Digest) sha1Digest.update(value) + if (sha256Digest) sha256Digest.update(value) + + // 计算前256KB的md5 + if (md5256kbDigest && !md5256kbDone) { + let chunk = value + if (readBytes + chunk.length > KB256) { + // 只取剩余需要的部分 + chunk = chunk.slice(0, KB256 - readBytes) + md5256kbDone = true + } + md5256kbDigest.update(chunk) + readBytes += chunk.length + if (readBytes >= KB256) { + md5256kbDone = true + } } - md5Digest.update(value) - sha1Digest.update(value) - sha256Digest.update(value) - await read() } - await read() - const md5 = md5Digest.digest("hex") - const sha1 = sha1Digest.digest("hex") - const sha256 = sha256Digest.digest("hex") - return { md5, sha1, sha256 } + + if (md5Digest) hash.md5 = await md5Digest.digest("hex") + if (md5256kbDigest) hash.md5_256kb = await md5256kbDigest.digest("hex") + if (sha1Digest) hash.sha1 = await sha1Digest.digest("hex") + if (sha256Digest) hash.sha256 = await sha256Digest.digest("hex") + + return hash +} + +export const calculateSliceHash = async ( + file: File, + sliceSize: number, + hashType: string, +) => { + const sliceCount = Math.ceil(file.size / sliceSize) + const results: string[] = [] + + for (let i = 0; i < sliceCount; i++) { + const start = i * sliceSize + const end = Math.min(file.size, start + sliceSize) + const blob = file.slice(start, end) + const arrayBuffer = await blob.arrayBuffer() + let hash: string = "" + + if (hashType === HashType.Md5) { + const md5 = await createMD5() + md5.update(new Uint8Array(arrayBuffer)) + hash = await md5.digest("hex") + } else if (hashType === HashType.Sha1) { + const sha1 = await createSHA1() + sha1.update(new Uint8Array(arrayBuffer)) + hash = await sha1.digest("hex") + } else if (hashType === HashType.Sha256) { + const sha256 = await createSHA256() + sha256.update(new Uint8Array(arrayBuffer)) + hash = await sha256.digest("hex") + } else { + throw new Error("Unsupported hash type: " + hashType) + } + + results.push(hash) + } + + return results // 每个分片的hash组成的数组 } diff --git a/src/types/resp.ts b/src/types/resp.ts index 8934099c..b96739a4 100644 --- a/src/types/resp.ts +++ b/src/types/resp.ts @@ -41,6 +41,26 @@ export type FsGetResp = Resp< } > +export type FsPreupResp = Resp<{ + task_id: string + slice_size: number + slice_cnt: number + slice_upload_status: string + reuse: boolean +}> +export type FsUpinfoResp = Resp<{ + slice_hash_need: boolean //是否需要分片哈希 + hash_md5_need: boolean //是否需要md5 + hash_md5_256kb_need: boolean //是否需要前256KB的md5 + hash_sha1_need: boolean //是否需要sha1 +}> + +export type FsSliceupCompleteResp = Resp<{ + task_id: string + slice_upload_status: string + complete: number +}> + export type EmptyResp = Resp<{}> export type PResp = Promise> diff --git a/vite.config.ts b/vite.config.ts index 08fe5c88..971f0294 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -72,6 +72,7 @@ export default defineConfig({ // target: "es2015", //next // polyfillDynamicImport: false, rollupOptions: { + external: ["p-limit"], output: { assetFileNames: (assetInfo) => assetInfo.names?.some((name) => name.endsWith("pdf.worker.min.mjs"))