需求:
本地平台:Windows 10 专业版 21H2 (19044.1826)、开发语言:go1.18.3 windows/amd64
监控目录下文件变动使用 github.com/fsnotify/fsnotify 上传测试服务器使用 Minio 进行测试
根据仓库中的示例代码也可以实现
package main
import (
"log"
"github.com/fsnotify/fsnotify"
)
func main() {
// Create new watcher.
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
// Start listening for events.
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
log.Println("event:", event)
if event.Op == fsnotify.Write {
log.Println("modified file:", event.Name)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
// Add a path.
err = watcher.Add("/tmp")
if err != nil {
log.Fatal(err)
}
// Block main goroutine forever.
<-make(chan struct{})
}
监控文件生成时有两种监测结果(winodws):
Create
事件,两次Write
事件Create
事件,一次Write
事件经过测试Windows上是这样,Linux就比较一致,不论怎么创建文件,都是一次Create事件+一次Write事件
这个示例只能解决监控当前目录下的内容,子目录下的内容无法监控
解决:在监控到创建了目录以后,把新创建的目录加入到监控目录中去
func StartWatch(dir string) {
watch, _ := fsnotify.NewWatcher()
w := Watch{
watch: watch,
}
w.watchEx(dir)
log.Println("开始监控目录: ", dir, "...")
select {}
}
func (w *Watch) watchEx(dir string) {
//通过Walk来遍历目录下的所有子目录
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
//这里判断是否为目录,只需监控目录即可 || 目录下的文件也在监控范围内,不需要我们一个一个加
if info.IsDir() {
path, err := filepath.Abs(path)
if err != nil {
return err
}
err = w.watch.Add(path)
if err != nil {
return err
}
}
return nil
})
if err != nil {
log.Println("监控失败 : ", err.Error())
return
}
go w.watchExec()
}
func (w *Watch) watchExec() {
for {
select {
case ev := <-w.watch.Events:
{
if ev.Op&fsnotify.Create == fsnotify.Create {
fmt.Println("创建文件 : ", ev.Name)
//获取新创建文件的信息,如果是目录,则加入监控中
file, err := os.Stat(ev.Name)
if err == nil && file.IsDir() {
w.watch.Add(ev.Name)
fmt.Println("添加监控 : ", ev.Name)
}
}
if ev.Op&fsnotify.Write == fsnotify.Write {
fmt.Println("写入文件 : ", ev.Name)
}
if ev.Op&fsnotify.Remove == fsnotify.Remove {
fmt.Println("删除文件 : ", ev.Name)
//如果删除文件是目录,则移除监控
fi, err := os.Stat(ev.Name)
if err == nil && fi.IsDir() {
w.watch.Remove(ev.Name)
fmt.Println("删除监控 : ", ev.Name)
}
}
if ev.Op&fsnotify.Rename == fsnotify.Rename {
//如果重命名文件是目录,则移除监控 ,注意这里无法使用os.Stat来判断是否是目录了
//因为重命名后,go已经无法找到原文件来获取信息了,所以简单粗爆直接remove
fmt.Println("重命名文件 : ", ev.Name)
w.watch.Remove(ev.Name)
}
if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
fmt.Println("修改权限 : ", ev.Name)
}
}
case err := <-w.watch.Errors:
{
log.Errorln("监控目录出错: ", err)
return
}
}
}
}
type Watch struct {
watch *fsnotify.Watcher
}
Minio
镜像进行搭建
Minio
新版和旧版还是有出入的,搭建以及后续维护和旧版差别比较大,而且网上各类教程主要针对旧版,方便后续排错,我是直接安装旧版
docker pull minio/minio:RELEASE.2021-06-17T00-10-46Z
docker run -d -p 9000:9000 --restart=always --name minioDemo\
-e "MINIO_ACCESS_KEY=admin" \
-e "MINIO_SECRET_KEY=admin123." \
-v D:/docker/minio/data:/data \
-v D:/docker/minio/config:/root/.minio \
minio/minio:RELEASE.2021-06-17T00-10-46Z server /data
Client
创建上有点坑,我也记录写下来过 详情查看过了段时间再看,发现官方的文档整理了,好多东西都没了,主要的代码Github也没有了 另一个官方文档
如下代码片段仅供参考(仅实现了上传对象及Tag)
func uploadHandler(ctx context.Context, path, keyName, tags string) error {
endpoint := global.GLO_CONF.S3EndPoint
accessKey := global.GLO_CONF.S3AccessKey
secretKey := global.GLO_CONF.S3SecretKey
bucket := global.GLO_CONF.S3Bucket
client, err := util.GetClient(ctx, endpoint, accessKey, secretKey)
if err != nil {
return errors.New("创建S3连接请求失败, " + err.Error())
}
if err = util.UPutObject(ctx, client, path, bucket, keyName); err != nil {
return err
}
if err = util.UPutTag(ctx, client, bucket, keyName, tags); err != nil {
return err
}
return nil
}
func GetClient(ctx context.Context, endpoint, accessKey, secretKey string) (*s3.Client, error) {
cfg, err := config.LoadDefaultConfig(
ctx,
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
config.WithEndpointResolverWithOptions(
aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{URL: endpoint}, nil
})),
config.WithRegion("us-east-1"),
)
if err != nil {
return nil, err
}
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = true
o.EndpointOptions.DisableHTTPS = true
})
return client, nil
}
type S3PutObjectAPI interface {
PutObject(ctx context.Context,
params *s3.PutObjectInput,
optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}
func PutFile(c context.Context, api S3PutObjectAPI, input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
return api.PutObject(c, input)
}
type S3PutTaggingAPI interface {
PutObjectTagging(ctx context.Context,
params *s3.PutObjectTaggingInput,
optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
}
func putTag(c context.Context, api S3PutTaggingAPI, input *s3.PutObjectTaggingInput) (*s3.PutObjectTaggingOutput, error) {
return api.PutObjectTagging(c, input)
}
// UPutObject 上传对象
func UPutObject(ctx context.Context, client *s3.Client, path, bucket, key string) error {
file, err := os.Open(path)
if err != nil {
return errors.New("上传对象时打开文件失败, " + err.Error())
}
defer file.Close()
input := &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: file,
}
_, err = PutFile(ctx, client, input)
if err != nil {
return errors.New("上传对象时发生错误, " + err.Error())
}
return nil
}
// UPutTag 上传标签
func UPutTag(ctx context.Context, client *s3.Client, bucket, key string, tags map[string]string) error {
input := &s3.PutObjectTaggingInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}
index := 0
for k, v := range tags {
input.Tagging.TagSet[index] = types.Tag{Key: aws.String(k), Value: aws.String(v)}
index++
}
_, err := putTag(ctx, client, input)
if err != nil {
return errors.New("上传标签时发生错误, " + err.Error())
}
return nil
}
在实际实现过程中发现大文件在生成是比较耗时,而且创建文件的方式多种多样,一方面可以从监控的角度去解决,另一方面(我的方式),通过数据库的方式,这样刚好解决了上传超时或失败后的重传问题,多个协程之间通过 channel
来解决通信问题,最后代码主体结构变成了如下:
//init
...
//开启文件监控
go util.InitWatch(listeningPath)
//每隔INTERVAL 自动检测上传给对象存储服务器
go func() {
for {
timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
<-timer.C
core.UploadServe(global.GLO_DB)
}
}()
//每隔INTERVAL 转发完成信息
go func() {
for {
timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
<-timer.C
core.ScadaMesServe(global.GLO_DB)
}
}()
//删除数据库废弃数据, 超时时间: 30天
go func() {
for {
// 检查频率: 24小时
timer := time.NewTimer(time.Hour * time.Duration(24))
<-timer.C
core.CleanServe(global.GLO_DB)
}
}()
//阻塞等待文件变化并保存至数据库
for {
select {
case path := <-global.GLO_CH_FILE:
go core.Add2Cache(path, kmap)
case reqByte := <-global.GLO_CH_REQ:
go core.UpdateCache(reqByte)
}
}
当然也有别的方式,只是我没想到
本文作者:ZGGSONG
本文链接:https://www.zggsong.cn/archives/listen_file_upload_s3_with_golang.html
版权声明:本站所有未注明转载的文章均为原创,并采用CC BY-NV-SA 4.0授权协议,转载请注明来源