首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >python twisted TCP通信 脚本

python twisted TCP通信 脚本

作者头像
用户5760343
发布2022-05-14 10:28:29
发布2022-05-14 10:28:29
7920
举报
文章被收录于专栏:sktjsktj

-------------------server-------------------

!/usr/bin/python

coding=utf-8

from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor

clients=[]

class Spreader(Protocol):

def init(self,factory): self.factory=factory

def connectionMade(self): self.factory.numProtocols+=1 self.transport.write("欢迎,你是第{}客户".format(self.factory.numProtocols).encode("utf-8")) clients.append(self)

def connectionLost(self,reason): self.factory.numProtocols-=1 clients.remove(self) print("lost connect: %d" % (self.factory.numProtocols))

def dataReceived(self,data): if data=="close": self.transport.lostConnection() for client in clients: if client!=self: client.transport.write(data.encode("utf-8")) else: print(data)

class SpreadFactory(Factory):

def init(self): self.numProtocols=0

def buildProtocol(self,addr): return Spreader(self)

endpoint=TCP4ServerEndpoint(reactor,8007) endpoint.listen(SpreadFactory()) reactor.run()

-------------------------------------------------client------------------------

!/usr/bin/python

coding=utf-8

from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import reactor import threading import time import sys import datetime

class Echo(Protocol): def init(self): self.connected = False

代码语言:javascript
复制
def connectionMade(self):
    self.connected = True

def connectionLost(self, reason):
    self.connected = False

def dataReceived(self, data):
    print(data.decode("utf-8"))

class EchoClientFactory(ClientFactory): def init(self): self.protocol = None

代码语言:javascript
复制
def startedConnecting(self, connector):
    print("Start to Connect...")

def buildProtocol(self, addr):
    print("Connected...")
    self.protocol = Echo()
    return self.protocol

def clientConnectionLost(self, connector, reason):
    print("Lost connection. Reason: ", reason)

def clientConnectionFailed(self, connector, reason):
    print("Connection is failed, Reason: ", reason)

bStop = False

def routine(factory): while not bStop: if factory.protocol and factory.protocol.connected: factory.protocol.transport.write("hello, I'm {} {} ".format( sys.argv[0], datetime.datetime.now() ).encode("utf-8")) print(sys.argv[0], datetime.datetime.now()) time.sleep(5)

host = '127.0.0.1' port = 8007 factory = EchoClientFactory() reactor.connectTCP(host, port, factory) threading.Thread(target=routine, args=(factory,)).start() reactor.run() bStop = True

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • !/usr/bin/python
  • coding=utf-8
  • !/usr/bin/python
  • coding=utf-8
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档