解决跨区网络稳定性较差,尤其往海外推送
代码托管:gitlab
CI:tekton
pipline/task: Serverless容器(spot实例按秒计费)
任务管理:redis
镜像分发工具:crane
func Sync(src, dest string) error {
// 创建使用 docker 配置的 keychain
kc, err := NewDockerConfigKeychain()
if err != nil {
return err
}
// 使用 docker 配置进行认证
err = crane.Copy(src, dest,
crane.WithNoClobber(false),
crane.WithAuthFromKeychain(kc))
return err
}
type TaskProcessor struct {
store *store.RedisStore
workerPool chan struct{} // 用于限制并发数的信号量
wg sync.WaitGroup
}
// NewTaskProcessor 创建一个新的任务处理器
// maxWorkers 指定最大并发任务数
func NewTaskProcessor(store *store.RedisStore, maxWorkers int) *TaskProcessor {
return &TaskProcessor{
store: store,
workerPool: make(chan struct{}, maxWorkers),
}
}
// ProcessTask 处理单个任务
func (p *TaskProcessor) ProcessTask(task *types.ImageTask) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
// 获取worker槽位
p.workerPool <- struct{}{}
defer func() {
// 释放worker槽位
<-p.workerPool
}()
ctx := context.Background()
// 更新状态为运行中
task.Status = types.TaskStatusRunning
if err := p.store.SaveTask(ctx, task); err != nil {
log.Printf("Failed to update task status: %v", err)
return
}
// 执行同步
err := imageDist.Sync(task.Src, task.Dest)
// 更新最终状态
if err != nil {
task.Status = types.TaskStatusFailed
task.Error = err.Error()
} else {
task.Status = types.TaskStatusComplete
}
if err := p.store.SaveTask(ctx, task); err != nil {
log.Printf("Failed to update final task status: %v", err)
}
}()
}
// Wait 等待所有任务完成
func (p *TaskProcessor) Wait() {
p.wg.Wait()
}
// Shutdown 优雅关闭任务处理器
func (p *TaskProcessor) Shutdown(ctx context.Context) {
done := make(chan struct{})
go func() {
p.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
log.Println("Shutdown timeout, some tasks may still be running")
case <-done:
log.Println("All tasks completed successfully")
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。