服务端需要处理客户端上传的视频压缩其大小,这里采用Golang + ffmpeg binding实现。采用Gin为web 框架实现webapi给客户端调用。采用ffmpg-go做go binging,ffmpeg放入docker镜像内,ffmpeg编译不在描述,见本博客其他文档。
go.mod 文件
module VideoProcess
go 1.18
require (
github.com/gin-gonic/gin v1.7.7
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.1.1
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/rabbitmq/amqp091-go v1.3.4
github.com/u2takey/ffmpeg-go v0.4.1
)
require (
github.com/gin-contrib/sessions v0.0.5
github.com/vcraescu/go-paginator/v2 v2.0.0
)
require (
github.com/aws/aws-sdk-go v1.38.20 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/golang/protobuf v1.3.3 // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/gorilla/sessions v1.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
golang.org/x/net v0.0.0-20211029224645-99673261e6eb // indirect
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gorm.io/gorm v1.20.12 // indirect
)
ffmpeg 命令行:
ffmpeg -i 5.mp4 -c:v libx264 -c:a copy -preset slow -crf 16 -bufsize 1835k -profile:v high -movflags +faststart -color_primaries 1 -color_trc 1 -colorspace 1 -y -vf scale=720:-2:flags=lanczos -maxrate 1500k -minrate 1000k -x264opts keyint=60:minc-keyint=60:scenecut=0:force-cfr=1 s_5.mp4
这批参数将原始视频转我换为x264格式,crf模式并缩放到720p,最大码率控制在1500k,fix gop,关于这些的具体另文描述。
对于部分x265 10bit的视频是无法转换为x264视频的。因此如下命令
fmpeg -i 5.mp4 -c:v libx265 -c:a copy -preset fast -crf 28 -bufsize 1835k -profile:v main10 -movflags +faststart -color_primaries 1 -color_trc 1 -colorspace 1 -y -vf scale=720:-1:flags=lanczos -maxrate 1500k -minrate 1000k -x265-params keyint=60:min-keyint=60:scenecut=0 s_5.mp4
整个程序思路:客户端将需要转换的视频发JSON格式的数据到消息队列(RabbitMQ),golang消费信息后进入自己的队列,控制最多并发的ffmpeg进程数量,采用go channel。
视频处理流程
package Utlity
import (
Model "VideoProcess/Model"
"encoding/json"
"fmt"
"github.com/u2takey/ffmpeg-go"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"time"
)
type VideProcessCallback func(isSuccess bool, uniqueCode string, destFiles []string, errorMessage string)
var lock = &sync.Mutex{}
var videoUtilInstance *VideoProcessUtility
type VideoProcessUtilInstance struct {
}
func (videoProcessUtilInstance *VideoProcessUtilInstance) GetInstance() *VideoProcessUtility {
if videoUtilInstance == nil {
lock.Lock()
defer lock.Unlock()
if videoUtilInstance == nil {
fmt.Println("Creating single instance now.")
videoUtilInstance = &VideoProcessUtility{}
} else {
fmt.Println("Single instance already created.")
}
} else {
fmt.Println("Single instance already created.")
}
return videoUtilInstance
}
type VideoProcessUtility struct {
tasks TaskQueue
wg sync.WaitGroup
callBack VideProcessCallback
configuration Model.Config
taskChannel *chan Task
}
func (util *VideoProcessUtility) Init(callback VideProcessCallback) {
var configInstance Model.ConfigSingletonInstance
util.configuration = *configInstance.GetInstance()
if util.taskChannel == nil && util.tasks == nil {
util.tasks = make(TaskQueue, 0)
var taskChannel = make(chan Task, util.configuration.MaxFFmpegInstanceCount)
util.taskChannel = &taskChannel
}
configInstance.GetInstance().MaxffmpegCountChangeHandler = util.changeInstanceCount
util.callBack = callback
}
func (util *VideoProcessUtility) changeInstanceCount(newMaxCount int) {
go func() {
for {
if util.tasks.IsEmpty() {
lock.Lock()
taskChannel := *util.taskChannel
if len(taskChannel) == 0 {
taskChannel = make(chan Task, newMaxCount)
lock.Unlock()
break
}
lock.Unlock()
}
time.Sleep(30 * time.Second)
}
}()
}
func (util *VideoProcessUtility) Prepare(src string, dest string, uniqueCode string, keyNO string) {
var task Task
task.SrcFile = src
task.DestFile = dest
task.UniqueCode = uniqueCode
task.KeyNO = keyNO
lock.Lock()
defer lock.Unlock()
util.tasks.Push(task)
}
func (util *VideoProcessUtility) Start() {
go func() {
for {
if !util.tasks.IsEmpty() {
singleTask := util.tasks.Pop()
*util.taskChannel <- singleTask
go util.processVideo(singleTask.SrcFile, singleTask.DestFile, singleTask.UniqueCode)
} else {
time.Sleep(time.Second * time.Duration(5))
}
}
}()
}
// 尝试获取原始视频信息
func (util *VideoProcessUtility) getVideoStreamInfo(src string) (string, string) {
//args := " " + src
_, err := os.Stat(src)
if err != nil {
panic("File Not exist")
}
cmd := exec.Command("ffprobe", "-show_streams", "-print_format", "json", "-v", "quiet", src)
out, err := cmd.CombinedOutput()
var profile string
var codecName string
if err == nil {
var videoStreamInfo Model.VideoStreamInfo
json.Unmarshal(out, &videoStreamInfo)
for _, streamInfo := range videoStreamInfo.Streams {
if streamInfo.CodecType == "video" {
profile = streamInfo.Profile
if streamInfo.CodecName == "hevc" {
codecName = "libx265"
} else if streamInfo.CodecName == "h264" {
codecName = "libx264"
}
break
}
}
if codecName == "libx264" {
if strings.Contains(profile, "10") {
return codecName, "high10"
} else if strings.Contains(profile, "422") {
return codecName, "high422"
} else if strings.Contains(profile, "444") {
return codecName, "high444"
} else {
return codecName, "high"
}
} else {
newProfile := strings.ReplaceAll(profile, " ", "")
newProfile = strings.ToLower(newProfile)
return codecName, newProfile
}
} else {
panic("get stream failed")
}
}
// 视频处理
func (util *VideoProcessUtility) processVideo(src string, dest string, uniqueCode string) {
var threads = int(runtime.NumCPU() / 2)
var command *ffmpeg_go.Stream
codec, profile := util.getVideoStreamInfo(src)
if codec == "libx264" {
command = ffmpeg_go.Input(src).
Output(dest,
ffmpeg_go.KwArgs{
"c:v": "libx264",
"preset": util.configuration.FFmpegPreset,
"crf": util.configuration.VideoCrf,
"profile:v": profile,
"level": "4.0",
"maxrate": util.configuration.VideoMaxRate,
"bufsize": "1835k",
"c:a": "copy",
"movflags": "frag_keyframe+empty_moov+delay_moov+default_base_moof+faststart",
"color_primaries": 1,
"color_trc": 1,
"colorspace": 1,
"vf": "scale=720:-2:flags=lanczos",
"x264opts": "keyint=60:min-keyint=60:scenecut=0:force-cfr=1",
"threads": threads,
}).OverWriteOutput()
} else {
command = ffmpeg_go.Input(src).
Output(dest,
ffmpeg_go.KwArgs{
"c:v": "libx265",
"preset": util.configuration.FFmpegPreset,
"profile:v": profile,
"maxrate": util.configuration.VideoMaxRate,
"bufsize": "1835k",
"c:a": "copy",
"threads": threads,
}).OverWriteOutput()
}
commandComplied := command.Compile()
commandLine := commandComplied.String()
var commandListInstance Model.FFmpegTasksInstance
commandListInstance.GetInstance().AddCommand(uniqueCode, commandLine)
err := command.Run()
isSuccess := true
errorMessage := ""
if err != nil {
isSuccess = false
errMessageBytes, _ := commandComplied.CombinedOutput()
errorMessage = string(errMessageBytes)
}
destFiles := make([]string, 0)
destFiles = append(destFiles, dest)
taskChannel := *util.taskChannel
taskFinished := <-taskChannel
fmt.Println(taskFinished)
util.callBack(isSuccess, uniqueCode, destFiles, errorMessage)
}
本文为Lokie.Wang原创文章,转载无需和我联系,但请注明来自lokie博客http://lokie.wang