Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Python 基于队列实现 tcp socket 连接池

Python 基于队列实现 tcp socket 连接池

作者头像
授客
发布于 2025-05-03 10:26:50
发布于 2025-05-03 10:26:50
8900
代码可运行
举报
文章被收录于专栏:授客的专栏授客的专栏
运行总次数:0
代码可运行

连接池实现

socket_pool.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty

_logger = logging.getLogger('mylogger')

class SocketPool:
    def __init__(self, host, port, min_connections=10, max_connections=10):
        '''
        初始化Socket连接池
        :param host: 目标主机地址
        :param port: 目标端口号
        :param min_connections: 最小连接数
        :param max_connections: 最大连接数
        '''
        self.host = host
        self.port = port
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
        self._sock_lock = threading.Lock()  # 线程锁保证计数正确
        self._pool = Queue(max_connections)  # 基于线程安全的队列存储连接
        self._lock = threading.Lock()        # 线程锁保证资源安全:
        self._init_pool()                    # 预创建连接
        self._start_health_check()           # 启动连接健康检查线程

    def _init_pool(self):
        '''预创建连接并填充到池中'''
        
        for _ in range(self.min_connections):
            sock = self._create_socket()
            self._pool.put(sock)

    def _create_socket(self):
        '''创建新的Socket连接'''
        
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            return sock
        except socket.error as e:
            raise ConnectionError(f'Failed to connect: {e}')  # 连接失败抛出异常

    def _start_health_check(self):
        '''启动后台线程定期检查连接有效性'''
        
        def check():
            while True:
                with self._lock:
                    for _ in range(self._pool.qsize()):
                        sock = self._pool.get()
                        self.busy_sockets_dict[sock] = 1
                        try:
                            sock.send(b'PING<END>')  # 发送心跳包验证连接状态
                            # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
                            sock.recv(11)
                            self._pool.put(sock)
                            self.busy_sockets_dict.pop(sock)
                        except (socket.error, ConnectionResetError):
                            _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
                            sock.close()  # 关闭失效连接并创建新连接替换
                            self.busy_sockets_dict.pop(sock)

                            new_sock = self._create_socket()
                            self._pool.put(new_sock)
                    
                    # 如果sock数量小于最小数量,则补充
                    for _ in range(0, self.min_connections - self._pool.qsize()):
                        new_sock = self._create_socket()
                        self._pool.put(new_sock)
                time.sleep(60)  # 每60秒检查一次
        threading.Thread(target=check, daemon=True).start()

    def get_connection(self):
        '''
        从池中获取一个可用连接
        :return: socket对象
        '''
        
        with self._sock_lock:
            if self._pool.empty():
                if len(self.busy_sockets_dict.keys()) < self.max_connections:
                    new_sock = self._create_socket()
                    self.busy_sockets_dict[new_sock] = 1
                    return new_sock
                else:
                    raise Empty('No available connections in pool')
            else:
                try:
                    sock = self._pool.get(block=False)
                    self.busy_sockets_dict[sock] = 1
                    return sock
                except Exception:
                    _logger.error('获取socket连接出错:%s' % traceback.format_exc())
                    raise
               

    def release_connection(self, sock):
        '''
        将连接归还到池中
        :param sock: 待归还的socket对象
        '''
        if not sock._closed:
            self._pool.put(sock)
        if sock in self.busy_sockets_dict:
            self.busy_sockets_dict.pop(sock)


    def close_all(self):
        '''关闭池中所有连接'''
        
        while not self._pool.empty():
            sock = self._pool.get()
            sock.close()
            self.busy_sockets_dict.pop(sock.id)
        self.busy_sockets_dict = {} # 兜底

host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
socketPool = SocketPool(host, port, min_connections, max_connections)

使用连接池

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from socket_pool import socketPool

def send_socket_msg(data):
    global socketPool
    
    try:
        sock = None
        # 获取连接(支持超时控制)
        sock = socketPool.get_connection()
        # 发送数据
        sock.sendall(data.encode('utf-8'))
    except Exception:
        error_msg = '发送消息出错:%s' % traceback.format_exc()
        _logger.error(error_msg)
        
        if sock is not None:
            sock.close()
            socketPool.release_connection(sock)
        return send_socket_msg(data)
    
    response = ''
    try:
        while True:
            chunk = sock.recv(4096)
            chunk = chunk.decode('utf-8')
            response += chunk
            if response.endswith('<END>'):
                response = response.rstrip('<END>')
                return {'success':True, 'message':response}
    except Exception:
        error_msg = '获取消息出错:%s' % traceback.format_exc()
        _logger.error(error_msg)
        return {'success':False, 'message': error_msg}
    finally:
        # 必须归还连接!
        socketPool.release_connection(sock)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护
在使用codis时候,我们遇到的场景是,公司提供了HA的Proxy(例如N个),但是不暴露zookeeper(也就是说没有codis后端服务列表)。 如果暴露zk的话,可以看这一篇 要求在开发客户端a
用户1225216
2018/03/05
1.6K0
python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护
python codis集群客户端(二) - 基于zookeeper对实例创建与摘除
 在这一篇中我们实现了不通过zk来编写codis集群proxys的api, 如果codis集群暴露zk给你的话,那么就方便了,探活和故障摘除与恢复codis集群都给你搞定了,你只需要监听zookeeper中实例的状态就好了。 下面看我的实现。 1、CodisByZKPool.py 这里通过zk读取并初始化pool_shards,简单说一下如何故障摘除和恢复 1)我们监听zk中节点状态改变,当发现某个实例对应的节点状态变化了,比如DELETE了,那么我们认为这个实例挂了,我们就会重新_create_pool刷
用户1225216
2018/03/05
1.7K2
python语言中的AOP利器:装饰器
一、前言 面向切面编程(AOP)是一种编程思想,与OOP并不矛盾,只是它们的关注点相同。面向对象的目的在于抽象和管理,而面向切面的目的在于解耦和复用。 举两个大家都接触过的AOP的例子: 1)java中mybatis的@Transactional注解,大家知道被这个注解注释的函数立即就能获得DB的事务能力。 2)python中的with threading.Lock(),大家知道,被这个with代码块包裹的部分立即获得同步的锁机制。 这样我们把事务和加锁这两种与业务无关的逻辑抽象出来,在逻辑上解耦,并且可以
用户1225216
2018/03/29
2.2K0
Python Redis 客户端连接池解析
Python Redis 的客户端使用了链接池机制,通过复用链接可以减低服务器的压力并在失败时重试。连接池其实是一种很通用的机制,在实现客户端是是一个经常需要(或许其实不需要)重复发明的轮子。
爬虫技术学习
2023/02/10
2.4K0
Python Redis 客户端连接池解析
python多线程socket编程--多
Python中实现socket通信的服务端比较复杂,而客户端非常简单,所以客户端基本上都是用sockct模块实现,而服务 端用有很多模块可以使用,如下:
py3study
2020/01/10
1.4K0
python多线程socket编程--多
Python从入门到入土-web应用开发
Django是高水准的Python编程语言驱动的一个开源模型.视图,控制器风格的Web应用程序框架,它起源于开源社区。使用这种架构,程序员可以方便、快捷地创建高品质、易维护、数据库驱动的应用程序。 这也正是OpenStack的Horizon组件采用这种架构进行设计的主要原因。另外,在Dj ango框架中,还包含许多功能强大的第三方插件,使得Django具有较强的可扩展性 。 Django 项目源自一个在线新闻 Web 站点,于 2005 年以开源的形式被释放出来。Django 框架的核心组件有:
共饮一杯无
2022/11/28
8170
python_socket
http://blog.csdn.net/rebelqsp/article/details/22109925
py3study
2020/01/10
3640
python redis链接建立实现分析
  今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。 在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
py3study
2020/01/06
1.1K0
python socket学习
[root@localhost example]# python echoclient1.py
py3study
2020/01/07
4270
python network programming tutorial
该文章讲述了如何使用Python实现一个简单的TCP聊天服务器。
s1mba
2017/12/28
8040
python network programming tutorial
python简单socket
clinet端 #! /usr/bin/env python # -*-coding: utf-8-*- #这段代码是clinet端 import socket   #for sockets import sys  #for exit try:     #create an AF_INET, STREAM socket (TCP)     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) except socket.error, msg:
py3study
2020/01/13
3160
SocketServer 源码分析
RequestHandlerClass 注册 handle 函数。 finish_request 中实例化,调用用户定义的 handle 函数
py3study
2020/01/11
1.7K0
python asynchrous network
select,poll,epoll test in python selecttest.py import select import socket import Queue #create a socket server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(False) #set option reused server.setsockopt(socket.SOL_SOCKET,socket.SO_REUS
pydata
2018/08/02
2460
Python 标准类库 - 因特网协议与支持之socketserver
1 通过子类化BaseRequestHandler 类,创建一个请求处理程序,并且重写handle()方法,该方法将处理接收到的请求
授客
2019/09/10
6530
Python 标准类库 - 因特网协议与支持之socketserver
[源码分析] 消息队列 Kombu 之 Hub
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Hub 概念。
罗西的思考
2021/03/17
8400
[源码分析] 消息队列 Kombu 之 Hub
Python Socket编程Python Socket编程
Python Socket编程 在使用Python做socket编程时,由于需要使用阻塞(默认)的方式来读取数据流,此时对于数据的结束每次都需要自己处理,太麻烦。并且网上也没找到太好的封装,所以就自己写了个简单的封装。 封装思路 客户端每次请求均发送一个 SocketRequest 对象,其中封装具体的数据,这里使用json。对于要发送的数据,会自动添加一个结束符标识(EOF = '0x00')。 服务器端接收数据时,根据结束符标识来生成完整的数据,并解包成 SocketRequest 对象。 服务器端根据
kongxx
2018/05/14
2.4K0
关于wsgi协议的理解
首先要了解 WSGI 规范的概念,WSGI(Web Server Gateway Interface)规范描述了web server(Gunicorn,uWSGI等)如何与web application(flask, django等)交互、web application如何处理请求,定义在 pep 3333。正是有了 WSGI 规范,我们才能在任意 web server 上跑各种 web 应用。WSGI API 定义看起来很简单:
步履不停凡
2019/09/11
5680
Prometheus 基于Python Django实现Prometheus Exporter
运行监控需求,需要采集Nginx 每个URL请求的相关信息,涉及两个指标:一分钟内平均响应时间,调用次数,并且为每个指标提供3个标签:请求方法,请求状态,请求URL,并向普罗米修斯暴露这些指标相关数据
授客
2021/10/21
1.6K0
TCP聊天服务器套接字|PyQt5+socket(TCP端口映射+端口放行)+logging+Thread(含日志,html)+anaconda打包32位exe(3.4万字)|python高阶
Python 3.8.6  [MSC v.1927 64 bit (AMD64)] on win32
zmh-program
2023/02/06
1.1K0
Python socket.help M
DESCRIPTION This module provides socket operations and some related functions. On Unix, it supports IP (Internet Protocol) and Unix domain sockets. On other systems, it only supports IP. Functions specific for a socket are available as meth
py3study
2020/01/03
6910
相关推荐
python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验