前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何实现类似于百度网盘大文件的断点续传

如何实现类似于百度网盘大文件的断点续传

作者头像
Careteen
发布2022-02-14 16:48:10
2K0
发布2022-02-14 16:48:10
举报
文章被收录于专栏:源码揭秘

目录

背景

工作中如果有负责开放平台,那么往往会有上传文件的诉求。一般10M内大小的图片,我们能通过一个上传接口即可,但如果文件大小超过100M或者1G甚至更大,通过一个接口在人机交互上显然不友好,期望为用户提供进度条,实时告知上传进度;而且用户可以选择暂停,比如断网或上传了错误文件,用户也能随时恢复上传;若用户重复上传相同文件时,系统能提示秒传成功。也就是实现类似于百度网盘的上传功能。

  • 小文件整体上传效果图
  • 大文件分片上传效果图

下面将从零搭建前端和服务端,实现小文件上传再循序渐进到上传大文件。

技术栈主要是前端:React、AntD、Typescript;服务端:TS-Node、Express...

文章首发于@lan-react/upload,转载请注明来源。客户端代码存放服务端代码存放

实现小文件整体上传

搭建前端环境

通过create-react-app --template typescript创建项目

引入antdyarn add antd然后yarn start运行项目

编写上传的组件

代码语言:javascript
复制
import React, { ChangeEvent, useState, useEffect } from 'react';
import { Row, Col, Input } from 'antd';
interface UploadProps {
  width?: number;
}
interface CurrentFile {
  file: File;
  dataUrl?: string;
  type?: string;
}
const isImage = (type: string = ''): boolean => type.includes('image');
const Upload: React.FC<UploadProps> = (props) => {
  const {
    width = 300,
  } = props;
  const [currentFile, setCurrentFile] = useState<CurrentFile>();
  const onFileChange = (event: ChangeEvent<HTMLInputElement>) => {
    const file: File = event.target.files![0];
    if (file) {
      const reader = new FileReader();
      reader.addEventListener('load', () => {
        setCurrentFile({
          file: file,
          dataUrl: reader.result as string,
          type: file.type,
        })
      });
      reader.readAsDataURL(file);
    }
  }
  return (
    <div>
      <Input type="file" style={{ width: width }} onChange={onFileChange} />
      { isImage(currentFile?.type) ? <img src={currentFile?.dataUrl} style={{ width: 100 }} alt={currentFile?.file.name} /> : null }
    </div>
  )
}
export default Upload;

小文件上传使用FormData

代码语言:javascript
复制
return (
  <div>
    <Input type="file" style={{ width: width }} onChange={onFileChange} />
+    <Button type="primary" onClick={() => onFileUpload(UploadType.WHOLE)}>小文件整体上传</Button>
  </div>
)

编写一个按钮然后处理上传

代码语言:javascript
复制
// 上传类型
enum UploadType {
  WHOLE,
  PART,
}
// 大小检测
const checkSize = (size: number = 0, maxSize: number = 2 * 1024 * 1024 * 1024): boolean => {
  if (size > maxSize) {
    message.error(`文件大小不能超过2G`)
    return false
  }
  return true
}
const onFileUpload = (type: UploadType = UploadType.WHOLE) => {
  if (!currentFile?.file) {
    message.error('请选择文件~')
    return
  }
  if (!checkSize(currentFile?.file?.size)) return
  switch (type) {
    case UploadType.WHOLE:
      wholeUpload();
      break;
  }
}
// 整体上传
const wholeUpload = async () => {
  const formData = new FormData()
  formData.append('file', currentFile?.file as File)
  formData.append('name', currentFile?.file.name as string)
  const res = await request({
    url: '/wholeUpload',
    method: 'POST',
    data: formData,
  })
  message.success('上传成功');
}

然后简单封装下request

代码语言:javascript
复制
export interface Config {
  baseUrl?: string;
  url?: string;
  method?: string;
  headers?: any;
  data?: any;
}
export const request = (conf: Config): Promise<any> => {
  const config: Config = {
    method: 'GET',
    baseUrl: 'http://localhost:8000',
    headers: {},
    data: {},
    ...conf
  }
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest();
    xhr.open(config.method as string, `${config.baseUrl}${config.url}`);
    for (const key in config.headers) {
      if (Object.prototype.hasOwnProperty.call(config.headers, key)) {
        const value = config.headers[key];
        xhr.setRequestHeader(key, value);
      }
    }
    xhr.responseType = 'json';
    xhr.onreadystatechange = () => {
      if (xhr.readyState === 4) {
        if (xhr.status === 200) {
          resolve(xhr.response);
        } else {
          reject(xhr.response);
        }
      }
    }
    xhr.send(config.data);
  })
}
搭建服务端环境

使用nodemonts-node

代码语言:javascript
复制
"scripts": {
  "dev": "cross-env PORT=8000 nodemon --exec ts-node --files ./src/www.ts"
},

借助http模块起服务编写./src/www.ts

代码语言:javascript
复制
import app from './app'
import http from 'http'
const port = process.env.PORT || 8000
const server = http.createServer(app)
const onError = (error: any) => {
  console.error(error)
}
const onListening = () => {
  console.log(`Listening on port ${port}`)
}
server.listen(port)
server.on('error', onError)
server.on('listening', onListening)

然后编写app.ts

代码语言:javascript
复制
import express, { Request, Response, NextFunction } from 'express'
import path from 'path'
import fs from 'fs-extra'
import logger from 'morgan'
import cors from "cors"
import multiparty from 'multiparty'
import createError from 'http-errors'
import { INTERNAL_SERVER_ERROR } from 'http-status-codes'
const app = express()
const PUBLIC_DIR = path.resolve(__dirname, 'public')
app.use(logger('dev'))
app.use(express.json())
app.use(express.urlencoded({ extended: true }))
app.use(cors())
app.use(express.static(PUBLIC_DIR))
app.post('/upload', async (req: Request, res: Response, next: NextFunction) => {
  const form = new multiparty.Form()
  form.parse(req, async (err: any, fields, files) => {
    if (err) return next(err)
    const name = fields.name[0]
    const file = files.file[0]
    await fs.move(file.path, path.resolve(PUBLIC_DIR, name), { overwrite: true })
    res.json({
      success: true
    })
  })
})
app.use((_req: Request, _res: Response, next: NextFunction) => {
  next(createError(404))
})
app.use((error: any, _req: Request, res: Response, _next: NextFunction) => {
  res.status(error.status || INTERNAL_SERVER_ERROR)
  res.json({
    succuess: false,
    error,
  })
})
export default app

将文件存放到本地,即服务端项目根目录的public目录。

然后启动服务npm run dev

再如下操作,实现上传小文件。

接下来将实现大文件的分片上传

如何实现大文件分片上传

大文件分片上传的思路

  • 客户端将大文件进行分割。(这里粒度为10M,分割后文件按fileName-${index}进行排序以便服务端合并)
  • 为了实现秒传功能,需要对文件内容计算出hash值。(计算hash比较耗时,借助worker实现,并提供进度条)
  • 客户端对分割后的小文件依次调用接口上传。
  • 服务端提供上传接口。(将所有小文件存放到临时目录)
  • 客户端上传所有分片文件后,调用请求合并的接口。
  • 服务端提供合并接口。(按上述已排序的文件名进行合并,合并成大文件后存放本地)
  • 客户端提供暂停/恢复功能。(暂停即调用xhr.abort(),恢复即重新上传)
  • 特别的:在上传之前客户端会调用校验接口。(得知文件是否已上传?文件上传了哪一部分?)
  • 服务端提供校验接口。(根据文件名在临时目录下读取分片,如果有则将分片信息返回客户端)
  • 客户端根据返回内容进行处理。(有成品文件即为秒传、有分片文件则只上传剩余部分)
  • 上传完毕

上述大致实现思路,下面将介绍实现细节和注意点。

客户端实现分片
代码语言:javascript
复制
const onFileUpload = (type: UploadType = UploadType.WHOLE) => {
  // ...
  switch (type) {
    case UploadType.WHOLE:
      wholeUpload();
      break;
+    case UploadType.PART:
+      partUpload()
+      break;
  }
}
代码语言:javascript
复制
interface Part {
  chunk: Blob;
  size: number;
  fileName?: string;
  chunkName?: string;
  loaded?: number;
  percent?: number;
  xhr?: XMLHttpRequest;
}
const partUpload = async () => {
  setUploadStatus(UploadStatus.UPLOADING);
  // 1. 对文件进行分片
  // 2. 根据分片计算文件hash
  // 3. 分片上传
  const partList = createChunks(currentFile?.file as File);
  const fileHash = await generateHash(partList);
  console.log(fileHash, 'fileHash');
  const lastDotIdx = currentFile?.file.name.lastIndexOf('.');
  const extName = currentFile?.file.name.slice(lastDotIdx);
  const fileName = `${fileHash}${extName}`;
  partList.forEach((part: Part, index: number) => {
    part.fileName = fileName;
    part.chunkName = `${fileName}-${index}`;
    part.loaded = 0;
    part.percent = 0;
  })
  setFileName(fileName);
  setPartList(partList);
  await uploadParts(partList, fileName)
}

再依次实现createChunksgenerateHash两个核心方法

代码语言:javascript
复制
const createChunks = (file: File, size: number = DEAFULT_SIZE): Part[] => {
  let current: number = 0;
  const partList: Part[] = [];
  while (current < file.size) {
    const chunk: Blob = file.slice(current, current + size);
    partList.push({
      chunk,
      size: chunk.size,
    })
    current += size
  }
  return partList;
}
客户端计算hash
代码语言:javascript
复制
const generateHash = (partList: Part[]): Promise<any> => {
  return new Promise((resolve, reject) => {
    const worker = new Worker('/generateHash.js');
    worker.postMessage({ partList });
    worker.onmessage = (event) => {
      const { percent, hash } = event.data;
      setHashPercent(percent);
      if (hash) {
        resolve(hash);
      }
    }
    worker.onerror = error => {
      reject(error);
    }
  })
}

主要借助Workerpublic下新建generateHash.js文件

代码语言:javascript
复制
self.importScripts('https://cdn.bootcss.com/spark-md5/3.0.0/spark-md5.js');
self.onmessage = async (event) => {
  const { partList } = event.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let percent = 0;
  const perSize = 100 / partList.length;
  const buffers = await Promise.all(partList.map(({ chunk }) => new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(chunk);
    reader.onload = (e) => {
      percent += perSize;
      self.postMessage({ percent: Number(percent.toFixed(2)) });
      resolve(e.target.result);
    }
  })));
  buffers.forEach(buffer => spark.append(buffer));
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
}

将每一个分片的计算进度postMessagegenerateHash,然后实时同步进度setHashPercent

客户端上传分片
代码语言:javascript
复制
const uploadParts = async (partList: Part[], fileName: string) => {
  const res = await request({
    url: `/verify/${fileName}`,
  })
  if (res.code === 200) {
    if (!res.data.needUpload) {
      message.success('秒传成功');
      setPartList(partList.map((part: Part) => ({
        ...part,
        loaded: DEAFULT_SIZE,
        percent: 100,
      })))
      reset()
      return
    }
    try {
      const { uploadedList } = res.data
      const requestList = createRequestList(partList, uploadedList, fileName);
      const partsRes = await Promise.all(requestList);
      if (partsRes.every(item => item.code === 200)) {
        const mergeRes = await request({
          url: `/merge/${fileName}`,
        })
        if (mergeRes.code === 200) {
          message.success('上传成功');
          reset()
        } else {
          message.error('上传失败,请稍后重试~');
        }
      } else {
        message.error('上传失败,请稍后重试~');
      }
    } catch (error) {
      message.error('上传失败或暂停');
      console.error(error);
    }
  }
}
  • 在上传之前客户端会调用校验接口。(得知文件是否已上传?文件上传了哪一部分?)
  • 客户端对分割后的小文件依次调用接口上传。
  • 客户端上传所有分片文件后,调用请求合并的接口。
服务端实现校验接口
代码语言:javascript
复制
export const PUBLIC_DIR = path.resolve(__dirname, 'public')
export const TEMP_DIR = path.resolve(__dirname, 'temp')
const DEAFULT_SIZE = 1024 * 1024 * 10;
// ...
app.get('/verify/:fileName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName } = req.params
  const filePath = path.resolve(PUBLIC_DIR, fileName)
  const existFile = await fs.pathExists(filePath)
  if (existFile) {
    res.json({
      code: 200,
      msg: 'success',
      data: {
        needUpload: false,
      },
    })
    return
  }
  const folderPath = path.resolve(TEMP_DIR, fileName)
  const existFolder = await fs.pathExists(folderPath)
  let uploadedList: any[] = []
  if (existFolder) {
    uploadedList = await fs.readdir(folderPath)
    uploadedList = await Promise.all(uploadedList.map(async (fileName: string) => {
      const stat = await fs.stat(path.resolve(folderPath, fileName))
      return {
        fileName,
        size: stat.size,
      }
    }))
  }
  res.json({
    code: 200,
    msg: 'success',
    data: {
      needUpload: true,
      uploadedList,
    }
  })
})
服务端实现分片上传接口
代码语言:javascript
复制
app.post('/partUpload/:fileName/:start/:chunkName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName, chunkName, start } = req.params
  const folderPath = path.resolve(TEMP_DIR, fileName)
  const existFolder = await fs.pathExists(folderPath)
  if (!existFolder) {
    await fs.mkdirs(folderPath)
  }
  const filePath = path.resolve(folderPath, chunkName)
  const ws = fs.createWriteStream(filePath, {
    start: Number(start),
    flags: 'a',
  })
  req.on('end', () => {
    ws.close()
    res.json({
      code: 200,
      msg: 'success',
      data: true,
    })
  })
  req.on('error', () => {
    ws.close()
  })
  req.on('close', () => {
    ws.close()
  })
  req.pipe(ws)
})
  • fileName 根据文件名创建临时目录存放分片文件
  • chunkName 分片名,命名格式为fileName-${index}
  • start 记录分片上传了多少
服务端实现合并接口
代码语言:javascript
复制
app.get('/merge/:fileName', async (req: Request, res: Response, _next: NextFunction) => {
  const { fileName } = req.params
  try {
    await mergeChunks(fileName)
    res.json({
      code: 200,
      msg: 'success',
      data: true,
    })
  } catch (error) {
    res.json({
      code: 1,
      msg: 'error',
      data: false,
    })
  }
})

与客户端相对应,合并规则与分割规则相反。

代码语言:javascript
复制
const getIndex = (str: string) => {
  const matched = str.match(/-(\d{1,})$/)
  return matched ? Number(matched[1]) : 0
}

const pipeStream = (filePath: string, ws: WriteStream) => new Promise((resolve, _reject) => {
  const rs = fs.createReadStream(filePath)
  rs.on('end', async () => {
    await fs.unlink(filePath)
    resolve()
  })
  rs.pipe(ws)
})

export const mergeChunks = async (fileName: string, size: number = DEAFULT_SIZE) => {
  const filePath = path.resolve(PUBLIC_DIR, fileName)
  const folderPath = path.resolve(TEMP_DIR, fileName)
  const folderFiles = await fs.readdir(folderPath)
  folderFiles.sort((a, b) => getIndex(a) - getIndex(b))
  await Promise.all(folderFiles.map((chunk: string, index: number) => pipeStream(
    path.resolve(folderPath, chunk),
    fs.createWriteStream(filePath, {
      start: index * size
    })
  )))
  await fs.rmdir(folderPath)
}

使用流而非直接写文件的方式

客户端实现暂停/恢复功能
代码语言:javascript
复制
<Row>
  <Col span={24}>
    {
      uploadStatus === UploadStatus.INIT && <Button type="primary" onClick={() => onFileUpload(UploadType.PART)}>大文件分片上传</Button>
    }
    {
      uploadStatus === UploadStatus.UPLOADING && <Button type="primary" onClick={() => onFilePause()}>暂停</Button>
    }
    {
      uploadStatus === UploadStatus.PAUSE && <Button type="primary" onClick={() => onFileResume()}>恢复</Button>
    }
  </Col>
</Row>

实现暂停即调用xhr.abort方法,需要先在request.ts中为其提供可挂载

代码语言:javascript
复制
// request.ts
if (config.setXhr) {
  config.setXhr(xhr);
}
// upload.tsx
const onFilePause = () => {
  partList.forEach((part: Part) => part.xhr && part.xhr.abort())
  setUploadStatus(UploadStatus.PAUSE)
}

在此之前还需完善createRequestList方法

代码语言:javascript
复制
const createRequestList = (partList: Part[], uploadedList: Uploaded[], fileName: string): Promise<any>[] => {
  return partList.filter((part: Part) => {
    const uploadedFile = uploadedList.find(item => item.fileName === part.chunkName);
    if (!uploadedFile) { // 此chunk还没上传过
      part.loaded = 0;
      part.percent = 0;
      return true;
    }
    if (uploadedFile.size < part.chunk.size) { // 此chunk上传了一部分
      part.loaded = uploadedFile.size;
      part.percent = Number((part.loaded / part.chunk.size * 100).toFixed(2));
      return true;
    }
    // 上传过了
    return false;
  }).map((part: Part) => request({
    url: `/partUpload/${fileName}/${part.loaded}/${part.chunkName}`,
    method: 'POST',
    headers: {
      'Content-Type': 'application/octet-stream',
    },
    setXhr: (xhr: XMLHttpRequest) => { // +挂载
      part.xhr = xhr;
    },
    onProgress: (event: ProgressEvent) => { // +进度条
      part.percent = Number(((part.loaded! + event.loaded) / part.chunk.size * 100).toFixed(2));
      console.log('part percent: ', part.percent)
      setPartList([...partList])
    },
    data: part.chunk.slice(part.loaded),
  }))
}

再实现恢复功能,即重新请求一次

代码语言:javascript
复制
const onFileResume = async () => {
  await uploadParts(partList, fileName)
  setUploadStatus(UploadStatus.UPLOADING)
}
客户端实现进度条功能

如上面的方法createRequestList代码,在onProgress中实时获取到上传进度

代码语言:javascript
复制
const columns = [
  {
    title: '分片名',
    dataIndex: 'chunkName',
    key: 'chunkName',
    width: '20%',
  },
  {
    title: '进度条',
    dataIndex: 'percent',
    key: 'percent',
    width: '80%',
    render: (value: number) => {
      return <Progress percent={value} />
    }
  }
]

const totalPercent = partList.length > 0 
  ? partList.reduce((memo: number, curr: Part) => memo + curr.percent!, 0) / partList.length
  : 0
{ uploadStatus !== UploadStatus.INIT ? (
  <>
    <Row>
      <Col span={4}>
        Hash进度条
      </Col>
      <Col>
        <Progress percent={hashPercent} />
      </Col>
    </Row>
    <Row>
      <Col span={4}>
        Total进度条
      </Col>
      <Col>
        <Progress percent={totalPercent} />
      </Col>
    </Row>
    <Table 
      columns={columns}
      dataSource={partList}
      rowKey={row => row.chunkName as string}
    />
  </>
) : null }
客户端实现文件秒传

在上面uploadParts方法中已经实现

代码语言:javascript
复制
const uploadParts = async (partList: Part[], fileName: string) => {
  const res = await request({
    url: `/verify/${fileName}`,
  })
  if (res.code === 200) {
    if (!res.data.needUpload) {
      message.success('秒传成功');
      setPartList(partList.map((part: Part) => ({
        ...part,
        loaded: DEAFULT_SIZE,
        percent: 100,
      })))
      reset()
      return
    }
    // ...
  }
}
bingo

效果图如下

总结

  • 小文件上传使用FormData,大文件上传设置'Content-Type': 'application/octet-stream'FormData可携带参数,octet-stream参数可设置在url中。
代码语言:javascript
复制
formData.append('file', currentFile?.file as File)
formData.append('name', currentFile?.file.name as string)

request({ url: `/partUpload/${fileName}/${part.loaded}/${part.chunkName}` })
  • 由于File继承自Blob,客户端可使用Blob.slice对大文件进行分割;服务端对分片文件存储,提供合并接口按切割顺序进行合并(使用createWriteStream/createReadStream)。
  • 为了实现秒传功能,需要对文件进行唯一标识,服务端校验为已上传文件直接返回成功和访问地址。
    • 使用Worker创建后台任务计算大文件唯一标识,避免页面卡死。
    • 使用Spark-md5计算文件唯一标识MD5
  • 提供进度条功能
    • 计算MD5时可借助Worker.postMessage按分片粒度通知前端计算进度
    • 上传分片可借助xhr.upload.onprogress实时通知前端上传进度
    • 前端借助Antd-Progress/Table展示进度条
  • 提供暂停/恢复功能
    • 暂停借助xhr.abort()终止请求
    • 重新上传获取上传情况,再只上传未上传部分。在服务端读取上传分片情况,客户端上传时再次借助Blob.slice(part.loaded)。服务端存储时按fs.createWriteStream(filePath, { start: Number(start), flags: 'a' })进行追加文件。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 背景
  • 实现小文件整体上传
    • 搭建前端环境
      • 搭建服务端环境
      • 如何实现大文件分片上传
        • 客户端实现分片
          • 客户端计算hash
            • 客户端上传分片
              • 服务端实现校验接口
                • 服务端实现分片上传接口
                  • 服务端实现合并接口
                    • 客户端实现暂停/恢复功能
                      • 客户端实现进度条功能
                        • 客户端实现文件秒传
                          • bingo
                          • 总结
                          相关产品与服务
                          文件存储
                          文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档