Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >FTP 文件传输服务

FTP 文件传输服务

作者头像
JadePeng
发布于 2021-12-10 06:05:16
发布于 2021-12-10 06:05:16
2.3K00
代码可运行
举报
运行总次数:0
代码可运行

昨晚心血来潮,尝试用python写了一个ftp文件传输服务,可以接收指令,从远程ftp服务器同步指定目录数据,最后没用上,开源出来。

https://github.com/jadepeng/ftp_transfer_service.git

运行原理

  • 'task_server' 是一个web服务器,可以接收传入任务,接收到任务后,将task写入mysql
  • 启动任务后,'task_server'会扫描ftp文件列表,写入redis队列
  • transfer_client 是传输执行程序,可以多点部署,该程序会读取redis队列,进行文件下载

使用

配置

修改 .env 文件, 配置mysql和redis地址

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
REDIS_SERVER=""
REDIS_PORT=6380
REDIS_PASSWORD=""
MYSQL_HOST=""
MYSQL_PORT=3306
MYSQL_PASSWORD=""
MYSQL_USER=""
MYSQL_DB=""

启动服务

server 端

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
python3 task_server.py

传输端,可以部署多个

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
python3 transfer_client.py

接收任务

POST /task/

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{
  "taskId": "9",
  "serverPath": "/weblog",
  "storagePath": "/data",
  "host": "ftpServer",
  "port": 21,
  "user": "user",
  "password": "password"
}

启动传输

GET /task/{taskId}/start

查看进度

GET /task/{taskId}/progress

实现简介

第一次用fastapi来写web服务,这里记录下有意思的地方。

配置

可以通过配置类实现app的配置参数,pydantic还可以加载env文件更新配置

setting.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from pydantic import BaseSettings


class APISettings(BaseSettings):
    mysql_host: str = "127.0.0.1"
    mysql_port: int = 3306
    mysql_password: str
    mysql_user: str
    mysql_db: str
    redis_server: str = "127.0.0.1"
    redis_port: int = 6380
    redis_password: str

    max_wait_time_count: int = 10

    class Config:
        env_file = ".env"
        env_file_encoding = 'utf-8'

redis 队列

通过list实现队列,rpush,blpop

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import redis

class RedisQueue(object):

    def __init__(self, name, namespace='queue', **redis_kwargs):
        self.__db= redis.Redis(**redis_kwargs)
        self.key = '%s:%s' %(namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # 返回队列里面list内元素的数量

    def put(self, item):
        self.__db.rpush(self.key, item)  # 添加新元素到队列最右方

    def get_wait(self, timeout=None):
        item = self.__db.blpop(self.key, timeout=timeout)
        return item

    def get_nowait(self):
        item = self.__db.lpop(self.key)
        return item

redis BloomFilter

BloomFilter 可以用来去重

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import mmh3
import redis


class BloomFilter(object):
    def __init__(self, bf_key, bit_size=2000000, hash_count=4, start_seed=41, **redis_kwargs):
        self.bit_size = bit_size
        self.hash_count = hash_count
        self.start_seed = start_seed
        self.client = redis.Redis(**redis_kwargs)
        self.bf_key = bf_key

    def add(self, data):
        bit_points = self._get_hash_points(data)
        for index in bit_points:
            self.client.setbit(self.bf_key, index, 1)

    def madd(self, m_data):
        if isinstance(m_data, list):
            for data in m_data:
                self.add(data)
        else:
            self.add(m_data)

    def exists(self, data):
        bit_points = self._get_hash_points(data)
        result = [
            self.client.getbit(self.bf_key, index) for index in bit_points
        ]
        return all(result)

    def mexists(self, m_data):
        result = {}
        if isinstance(m_data, list):
            for data in m_data:
                result[data] = self.exists(data)
        else:
            result[m_data] = self.exists[m_data]
        return result

    def _get_hash_points(self, data):
        return [
            mmh3.hash(data, index) % self.bit_size
            for index in range(self.start_seed, self.start_seed +
                               self.hash_count)
        ]

python的orm框架sqlalchemy

sqlalchemy 需要先定义ORM

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class TransferTask(Base):
    __tablename__ = 'transfer_task'

    taskId = Column(String(255), primary_key=True, index=True)
    serverPath = Column(String(255), nullable=False)
    storagePath = Column(String(255), nullable=False)
    host = Column(String(255), nullable=False)
    port = Column(Integer, nullable=False)
    user = Column(String(255), nullable=False)
    password = Column(String(255), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferFailedFile(Base):
    __tablename__ = 'transfer_failed_file'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    taskId = Column(String(255), index=True)
    filePath = Column(String(1024), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferProgress(Base):
    __tablename__ = 'transfer_task_progress'

    taskId = Column(String(255), primary_key=True, index=True)
    total = Column(Integer, nullable=False)
    status = Column(Integer, nullable=False)
    finished = Column(Integer, nullable=False)
    failed = Column(Integer, nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

if __name__ == '__main__':
    settings = APISettings()
    db = Database(settings.mysql_host, settings.mysql_port, settings.mysql_user, settings.mysql_password,
                  settings.mysql_db)
    Base.metadata.create_all(db.engine)

使用了sqlalchemy CRUD就比较方便了, 可以通过query,filter来查询和过滤

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 def get_or_create_progress(self, task: TransferTask):
        db = self.database.get_session()
        dbitem = db.query(TransferProgress).filter(TransferProgress.taskId == task.taskId).first()
        if not dbitem:
            dbitem = TransferProgress()
            dbitem.taskId = task.taskId
            dbitem.total = 0
            dbitem.status = TaskStatus.SCANNING.value
            dbitem.finished = 0
            dbitem.failed = 0
            db.add(dbitem)
            db.commit()
        return dbitem

这里需要注意的是,session需要close,不然session过多会报错,可以封装一个get_session,利用yield来自动释放

见database.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    def get_db(self):
        db = self.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_session(self):
        return next(self.get_db())

python ftp操作

python有个ftplib,可以用来操作ftp,这里简单封装一个client类, 实现listfiles和下载文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import ftplib
import os
from datetime import datetime
import ntpath


class FtpClient:

    def __init__(self, host: str, port: int, user: str, password: str):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.connect()

    def connect(self):
        self.ftp = ftplib.FTP()
        self.ftp.connect(host=self.host, port=self.port)
        self.ftp.login(self.user, self.password)
        self.ftp.encoding = "utf-8"

    def list_files(self, dir):
        self.ftp.cwd(dir)
        for file_data in self.ftp.mlsd():
            file_name, meta = file_data
            file_type = meta.get("type")
            if file_type == "file":
                try:
                    self.ftp.voidcmd("TYPE I")
                    file_size = self.ftp.size(file_name)
                    yield f"{dir}/{file_name}", file_size
                except Exception as e:
                    print(e)
            else:
                yield from self.list_files(dir + "/" + file_name)

    def download_file(self, file_name:str, local_file_name:str):
        try:
            self.ftp.retrbinary('RETR %s' % file_name, open(local_file_name, 'wb').write)
        except ftplib.error_perm:
            print('ERROR: cannot read file "%s"' % file_name)
            os.unlink(local_file_name)

下载程序

作为redis mq的消费者,要考虑的是下载失败了如何处理,异常退出如何处理?进度如何更新?

针对异常退出,这里用一个简单的方案,获取mq消息后,先将item写入到本地文件,这样如果client程序异常退出,下次进来还能继续 针对下载失败,这里失败后先重新放入队列,retryCount+1,如果超过最大重试次数,则写到错误记录。 进度更新,则依靠update+1执行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def transfer_task_item(ftp, local_path, queque, task, task_item):
    try:
        local_file_name = local_path + task_item['fileName']
        print("transfer %s to %s" % (task_item['fileName'], local_file_name))

        # 文件已存在
        if os.path.exists(local_file_name):
            # 比较大小
            size = os.path.getsize(local_file_name)
            if size == task_item["fileSize"]:
                db_service.update_finished(task.taskId, 1)
                return

        dir = os.path.abspath(os.path.dirname(local_file_name))
        os.makedirs(dir, exist_ok=True)
        ftp.download_file(task_item['fileName'], local_file_name)
        # 更新进度
        db_service.update_finished(task.taskId, 1)
    except Exception as e:
        print(e)
        if task_item['retryCount'] < 3:
            task_item['retryCount'] = task_item['retryCount'] + 1
            queque.put(json.dumps(task_item))
        else:
            print(task_item['fileName'] + " transfer failed with max_retry_count")
            db_service.add_failed_file(task.taskId, task_item['fileName'])
    finally:
        remove_lock()
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
利用apache ftpserver搭建ftp服务器
二、usermanager采用mysql数据库管理用户时,ftpd-mysql.xml示例如下
用户7353950
2022/06/23
1.2K0
利用apache ftpserver搭建ftp服务器
Airflow自定义插件, 使用datax抽数
Airflow之所以受欢迎的一个重要因素就是它的插件机制。Python成熟类库可以很方便的引入各种插件。在我们实际工作中,必然会遇到官方的一些插件不足够满足需求的时候。这时候,我们可以编写自己的插件。不需要你了解内部原理,甚至不需要很熟悉Python, 反正我连蒙带猜写的。
Ryan-Miao
2019/10/01
3.3K0
Python面试题之Python面试题汇总
(1)与java相比:在很多方面,Python比Java要简单,比如java中所有变量必须声明才能使用,而Python不需要声明,用少量的代码构建出很多功能;(高效的高级数据结构)
Jetpropelledsnake21
2018/08/01
11.7K0
Python面试题之Python面试题汇总
用scrapy-redis爬去新浪-以及把数据存储到mysql\mongo
需求:爬取新浪网导航页(http://news.sina.com.cn/guide/)所有下所有大类、小类、小类里的子链接,以及子链接页面的新闻内容。
用户2337871
2019/07/19
1.3K0
用scrapy-redis爬去新浪-以及把数据存储到mysql\mongo
使用python实现的类似pt-deadlock-logger的死锁邮件告警
pt-deadlock-logger 用起来不太方便,主要是和我们的平台结合不够好,因此参考它的逻辑,我们使用python重新写了个类似功能。
保持热爱奔赴山海
2024/07/03
1220
SSH连接与自动化部署工具parami
paramiko是基于Python实现的SSH2远程安全连接,支持认证及密钥方法。可以实现远程命令执行,文件传输,中间SSH代理等功能,相对于Pexpect,封装层次更高。
py3study
2020/01/06
1.4K0
3 FTP文件传输服务
用户控制列表文件: /etc/vsftpd/ftpusers(黑名单)、/etc/vsftpd/user_list
py3study
2020/01/14
1.9K0
python-视频声音根据语音识别自动转为带时间的srt字幕文件
讯飞文字转写长语音只有5h免费,想要体验50000分钟白嫖的,看我另一篇文章 最近在看一些教程,发现没有字幕,网络上也没有匹配的,看着很别扭 因此我使用au处理了视频,得到了视频声音,wav格式,20多分钟长度 然后使用讯飞的语音识别接口识别了下,得到了每句话识别的文字和视频对应的时间 然后按照srt格式对其进行了输出 这样就能给那些没有字幕的视频自动添加字幕了 我的需求大致满足了,记录一下。
全栈程序员站长
2021/05/19
3.4K0
Python的flask:models.py来创建mysql数据库
西顾博客 用到的包 flask下有一个叫flask_sqlalchemy的数据库框架,没有安装的可以使用下面的来安装 $ pip install -i https://pypi.douban.com/simple/ flask-sqlalchemy flask就不多说了 $ pip install -i https://pypi.douban.com/simple/ flask pymysql $ pip install -i https://pypi.douban.com/simple/ pymysql
V站CEO-西顾
2018/06/17
1.2K0
Python 连接FTP服务器并实现文件夹下载实例演示,python区分ftp目录下文件和文件夹方法,ftp目录下包含中文名问题处理
[ 推荐文章 ] Python 地图篇 - 使用 pyecharts 绘制世界地图、中国地图、省级地图、市级地图实例详解
小蓝枣
2022/04/01
2.7K0
Python 连接FTP服务器并实现文件夹下载实例演示,python区分ftp目录下文件和文件夹方法,ftp目录下包含中文名问题处理
centos部署ftp服务_文件服务器搭建
vsftpd配置文件的默认路径是 /etc/vsftpd/vsftpd.conf。
全栈程序员站长
2022/10/01
1.8K0
matinal:python实现FTP文件上传
   通过python web server端上传大文件到FTP服务端,上传文件夹,下载文件等
matinal
2023/10/14
6280
实践应用|Python自动化连接FTP批量下载指定文件
有个小姐姐要从历史数据日志里根据一定的规则筛选一批数据,这批数据中有对局战场id字段,再根据这些id转化为文件名,连接远程FTP搜索该文件并下载到本地,然后打开文件删除前5行并在第6行行首添加一个字母,最后将该文件后缀名修改。 一天处理50+个这样的文件转化需求,简单算了下,差不多刚好要一天时间吧!!
可以叫我才哥
2021/08/05
1.2K0
[附录代码] shell脚本一步完成多层ssh跳转时的文件传输
transfer_file.sh #!/usr/bin/expect ############################################### # Author : PedroQin # Date : 2020-04-26 20:26:20 # Description : # Version : 1.0.0 ############################################### set timeout
PedroQin
2020/05/12
6430
FTP远程文件传输服务安装与配置
描述: FTP只通过TCP连接,没有用于FTP的UDP组件.FTP不同于其他服务的是它使用了两个端口, 一个数据端口和一个命令端口(或称为控制端口)。通常21端口是命令端口,20端口是数据端口。当混入主动(Active)/被动模式(Passive)的概念时,数据端口就有可能不是20了。
全栈工程师修炼指南
2022/09/29
2.1K0
FTP远程文件传输服务安装与配置
python ftp
完全是循环,目录的进行循环操作,而文件下载。最底层目录的文件下载完,回归上级目录。继续循环。
py3study
2020/01/09
9K0
基于Flask的Web应用开发
误入歧途
2025/03/14
1700
基于Flask的Web应用开发
FTP文件服务器
FTP (File transfer protocol) 是TCP/IP 协议组中的协议之一。他最主要的功能是在服务器与客户端之间进行文件的传输。FTP就是实现两台计算机之间的拷贝,从远程计算机拷贝文件至自己的计算机上,称之为“下载 (download)”文件。将文件从自己计算机中拷贝至远程计算机上,则称之为“上传(upload)”文件。这个古老的协议使用的是明码传输方式,且过去有相当多的安全危机历史。为了更安全的使用 FTP 协议,我们主要介绍较为安全但功能较少的 vsftpd(very secure File transfer protocol ) 这个软件。FTP是一个C/S类型的软件,FTP监听TCP端口号为21,数据端口为20。
星哥玩云
2022/09/15
22.9K0
python ftp上传文件 脚本
import ftplib # socket-based FTP tools
用户5760343
2022/05/13
5.3K0
相关推荐
利用apache ftpserver搭建ftp服务器
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验