实践一个大文件上传

发表于 2023-05-07 04:56:17
更新于 2024-04-18 13:33:37
Javascriptgo

提前看最终效果

功能要点

大文件面临两个问题,首先就是用户体验如果上传一半时,有网络问题或其他原因中断,需要从头开始上传,体验不好;一般服务的网关或者程序都会限制一次请求的大小,超过大小的文件无法上传。 所以可以通过文件分片解决上传大小问题;利用信息摘要算法避免重传。

前端:

  • 通过 input type="file" 接收用户选择的文件。
  • 点击上传时,将完整文件首先做一次信息摘要算法,我使用的是 SHA-256。信息摘要的意思是无论输入的消息有多长,计算出来的消息摘要的长度总是固定的。这样我们在上传前,可以先做一次校验,判断之前是否已经上传过了,上传过了的文件可以打到“秒传”的效果,但是这次我没有做这个,其实做起来也容易,我这次计算这个值的目的是以这个值作为存放分片的目录名,方便后续操作分片。
  • 利用 Blob 支持的 slice 方法,可以将用户的文件分片,具体分割的大小可以根据网络和文件大小综合决定,由于只是一个演示,我使用了一个固定值, 使用一个固定值去切分,这样才能保持 hash 的命中,才能有效判断切片是否上传过,如果不是一个固定的值很容易导致不能命中。
  • 每一个分片可以额外计算一次信息摘要,每次上传分片前,也可以事先和服务端确认该分片之前是否上传过。每个分片传行地上传效率太低,全部一次上传可能对于服务器压力过大,我们可以使用异步任务池的概念控制并发数,这也是一个常见的考点,这次暂且使用第三方库 tiny-async-pool 快速把这个示例完成。

服务端:

  • 实现静态文件服务,需要将用户上传的文件,以服务形式提供出来,方便验证上传的文件。
  • 实现验证分片是否上传的接口,在后端每个分片以 tmp/[文件hash]/[分片序号]-[分片hash] 的形式存在,我们可以比较容易得到分片是否存在。
  • 实现上传分片接口,这个接口只需要按照规则,写文件即可。
  • 实现合并接口,由于前端分片后是并发上传,我们无法保证切片到达的顺序,所以需要等全部分片上传完成后,客户端通知后端进行合并。合并后可以由后端以同样的摘要算法计算一次,可以判断最后合并的文件是不是上传的同一个文件。

代码

go
package main

import (
	"crypto/sha256"
	"fmt"
	"github.com/gin-contrib/cors"
	"github.com/gin-gonic/gin"
	"net/http"
	"os"
	"sort"
	"strconv"
	"strings"
)

func main() {
	router := gin.Default()

	router.Use(cors.Default())

	router.Static("/tmp", "./tmp")

	router.POST("/verify", func(c *gin.Context) {
		index := c.PostForm("index")
		fileHash := c.PostForm("fileHash")
		sliceHash := c.PostForm("sliceHash")

		exist := isFileExist(fmt.Sprintf("tmp/%s/%s-%s", fileHash, index, sliceHash))

		c.JSON(http.StatusOK, gin.H{
			"exist": exist,
		})
	})

	router.POST("/upload", func(c *gin.Context) {
		index := c.PostForm("index")
		fileHash := c.PostForm("fileHash")
		sliceHash := c.PostForm("sliceHash")
		file, err := c.FormFile("file")

		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": err.Error(),
			})
			return
		}

		fullPath := fmt.Sprintf("tmp/%s/%s-%s", fileHash, index, sliceHash)

		err = c.SaveUploadedFile(file, fullPath)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": err.Error(),
			})
			return
		}

		c.JSON(http.StatusOK, gin.H{})
	})

	router.POST("/merge", func(c *gin.Context) {
		fileHash := c.PostForm("fileHash")
		dirname := fmt.Sprintf("tmp/%s", fileHash)
		dir, err := os.ReadDir(dirname)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": err.Error(),
			})
			return
		}

		sort.SliceStable(dir, func(i, j int) bool {
			x := strings.Split(dir[i].Name(), "-")
			y := strings.Split(dir[j].Name(), "-")

			nx, _ := strconv.Atoi(x[0])
			ny, _ := strconv.Atoi(y[0])
			return nx < ny
		})

		all := []byte{}
		for _, file := range dir {
			data, err := os.ReadFile(fmt.Sprintf("tmp/%s/%s", fileHash, file.Name()))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{
					"message": err.Error(),
				})
				return
			}
			all = append(all, data...)
		}
		target := fmt.Sprintf("tmp/%s.mp4", fileHash)
		err = os.WriteFile(target, all, 0666)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": err.Error(),
			})
			return
		}

		h := sha256.New()
		h.Write(all)

		bs := h.Sum(nil)

		hash := fmt.Sprintf("%x", bs)
		if hash == fileHash {
			err := os.RemoveAll(dirname)
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{
					"message": "目录删除失败",
				})
				return
			}
			c.JSON(http.StatusOK, gin.H{
				"message": "大成功",
				"url":     "http://127.0.0.1:8080/" + target,
			})
			return
		} else {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": "hash 不一致",
			})
			return
		}
	})

	err := router.Run(":8080")
	if err != nil {
		println(err)
		return
	}
}

func isFileExist(fileName string) bool {
	_, err := os.Stat(fileName)
	return err == nil
}
vue
<script setup lang="ts">
import asyncPool from 'tiny-async-pool'
import { hash } from '@/utils'
import { reactive, ref } from 'vue'

const CHUNK_SIZE = 1024 * 512

type Task = {
  index: number
  blob: Blob
  name: string
  fileHash: string
  sliceHash: string
  isComplete: boolean
}

const inputRef = ref<HTMLInputElement | null>(null)
const tasks = reactive<Task[]>([])
const url = ref('')

async function uploadSingleFile() {
  let uploadedSize = 0
  const file = inputRef?.value?.files?.[0]
  if (!file) {
    return
  }

  const fullHash = await hash(await file.arrayBuffer())

  let index = 0
  while (uploadedSize < file.size) {
    index++
    const fileChunk = file.slice(uploadedSize, uploadedSize + CHUNK_SIZE)
    const sliceHash = await hash(await fileChunk.arrayBuffer())

    tasks.push({
      index,
      blob: fileChunk,
      fileHash: fullHash,
      sliceHash: sliceHash,
      name: file.name,
      isComplete: false
    })
    uploadedSize += fileChunk.size
  }

  for await (const result of asyncPool(5, tasks, doTask)) {
    console.log(result)
  }

  const formData = new FormData()
  formData.append('fileHash', fullHash)
  const response = await fetch('http://127.0.0.1:8080/merge', {
    method: 'POST',
    body: formData
  })
  const json = await response.json()
  url.value = json.url
}

function doTask(task: Task) {
  return new Promise(async (resolve, reject) => {
    try {
      const formData = new FormData()
      formData.append('index', task.index.toString())
      formData.append('name', task.name)
      formData.append('fileHash', task.fileHash)
      formData.append('sliceHash', task.sliceHash)

      const verifyResponse = await fetch('http://127.0.0.1:8080/verify', {
        method: 'POST',
        body: formData
      })
      let message = await verifyResponse.json()
      if (message.exist) {
        // 已上传
        task.isComplete = true
        resolve('已上传,跳过这个 blob')
        return
      }

      // 未上传
      formData.append('file', task.blob)
      const response = await fetch('http://127.0.0.1:8080/upload', {
        method: 'POST',
        body: formData
      })
      message = await response.json()
      task.isComplete = true
      resolve(message)
    } catch (error) {
      reject(error)
    }
  })
}
</script>

<template>
  <main class="flex flex-col items-center gap-4">
    <h1 class="text-3xl font-bold pt-4">大文件上传</h1>
    <input
      ref="inputRef"
      type="file"
      class="file-input file-input-bordered file-input-warning w-full max-w-xs"
    />
    <button class="btn gap-2" @click="uploadSingleFile">上传</button>
    <a :href="url">{{ url }}</a>
    <div class="flex w-64 gap-2 flex-wrap">
      <div
        v-for="item in tasks"
        :key="item"
        class="flex-shrink-0 w-4 h-4 bg-gray-200"
        :class="{ 'bg-green-500': item.isComplete }"
      ></div>
    </div>
  </main>
</template>
ts
export async function hash(buffer: ArrayBuffer) {
  const hash = await crypto.subtle.digest('SHA-256', buffer)
  return Array.from(new Uint8Array(hash))
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('')
}

总结

这次实践,只是对分片上传做了简单的实现,比较重点的只有一个 Blob 的 slice 方法。

后续可以再去了解一下常见的信息摘要算法和去实现一下自己的异步池。

还留有一些疑问:

  • 不同文件的 hash 是否会计算出来同一个值,如果会那该怎么解决?
  • HTTP 支持请求范围,允许服务器只发送 HTTP 消息的一部分到客户端。有没有相反的方式,可以每次发送一部分文件分片到服务器?