首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >如何使用Twisted Deferred周期性地调用

如何使用Twisted Deferred周期性地调用

原创
作者头像
华科云商小徐
发布2025-02-05 12:04:17
发布2025-02-05 12:04:17
12900
代码可运行
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫
运行总次数:0
代码可运行

在 Twisted 中,Deferred 是一个用于处理异步操作结果的对象。当你想周期性地执行一个异步任务时,可以使用 LoopingCall,它结合了 Twisted 的事件循环来周期性地调用一个函数,并返回一个 Deferred 对象。

1、问题背景

在一个Twisted服务器中,需要每2秒通过TCP发送一个值,并使用一个Twisted客户端接收这些值,并使用Deferred进行处理。服务器和客户端的代码示例如下:

代码语言:javascript
代码运行次数:0
运行
复制
# 服务端代码
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import NetstringReceiver
from random import randint
​
class ServerProtocol(NetstringReceiver):
    def __init__(self, factory):
        self.factory = factory
        self.sendObject = None
​
    def connectionMade(self):
        self.sendValue()
​
    def connectionLost(self, reason):
        sendObject, self.sendObject = self.sendObject, None
        sendObject.cancel()
​
    def sendValue(self):
        data = randint(2,20)
        self.sendString(str(data))
        print('send: {0}'.format(data))
​
        from twisted.internet import reactor
        self.sendObject = reactor.callLater(2, self.sendValue)
​
class MyServerFactory(ServerFactory):
​
    def __init__(self):
        self.protsa = []
​
    def buildProtocol(self, addr):
        return ServerProtocol(self)
​
    def setCallback(self, callback):
        self.callback = callback
​
def serverMain():
​
    factory = MyServerFactory()
#    factory.setCallback(generateVal)
​
    from twisted.internet import reactor
​
    port = reactor.listenTCP(2345, factory, interface='127.0.0.1')
    print 'Serving on %s.' % (port.getHost())
​
    reactor.run()
​
​
if __name__ == '__main__':
    serverMain()
​
# 客户端代码
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import NetstringReceiver
from twisted.internet import defer
​
​
class ClientProtocol(NetstringReceiver):
​
    def stringReceived(self, string):
        print("recieved")
        self.factory.printValue(string)
​
    def connectionMade(self):
        print("Made Connection")
​
    def connetionLost(self):
        print("Connection Lost")
​
class MyClientFactory(ClientFactory):
​
    protocol = ClientProtocol
​
    def __init__(self, deferred):
        self.deferred = deferred
​
    def clientConnectionFailed(self, connector, reason):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.errback(reason)
​
    def printValue(self, value):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(value)
​
​
def OutputValue(host, port):
    d = defer.Deferred()
    from twisted.internet import reactor
    factory = MyClientFactory(d)
    reactor.connectTCP(host, port, factory)
    return d
​
def clientMain():
​
    def writeError(err):
        print("Deferred Error!\n")
        print("Error: {0}".format(err.__str__))
​
    def writeValue(value):
        print("Value revieved: {0}".format(value))
​
    from twisted.internet import reactor
​
    d = OutputValue('127.0.0.1', 2345)
    d.addCallbacks(writeValue, writeError)
​
    reactor.run()
​
if __name__ == '__main__':
    clientMain()

输出:

代码语言:javascript
代码运行次数:0
运行
复制
Made Connection
recieved
Value revieved: 11
recieved
recieved
recieved

问题是,第一次连接时,服务器发送的值被客户端正确接收并处理,但随后的值没有被处理。这是因为Deferred只会被触发一次。

2、解决方案

为了解决这个问题,需要创建一个持续运行的Deferred,以便每次收到值时都会被触发。这可以通过使用Twisted的Reactor API来实现。

代码语言:javascript
代码运行次数:0
运行
复制
# 服务端代码
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import NetstringReceiver
from random import randint
from twisted.internet import reactor
​
class ServerProtocol(NetstringReceiver):
    def __init__(self, factory):
        self.factory = factory
        self.deferred = reactor.callLater(2, self.sendValue)
​
    def connectionMade(self):
        self.sendValue()
​
    def connectionLost(self, reason):
        self.deferred.cancel()
​
    def sendValue(self):
        data = randint(2,20)
        self.sendString(str(data))
        print('send: {0}'.format(data))
​
        self.deferred = reactor.callLater(2, self.sendValue)
​
class MyServerFactory(ServerFactory):
​
    def __init__(self):
        self.protsa = []
​
    def buildProtocol(self, addr):
        return ServerProtocol(self)
​
    def setCallback(self, callback):
        self.callback = callback
​
def serverMain():
​
    factory = MyServerFactory()
#    factory.setCallback(generateVal)
​
​
    port = reactor.listenTCP(2345, factory, interface='127.0.0.1')
    print 'Serving on %s.' % (port.getHost())
​
    reactor.run()
​
​
if __name__ == '__main__':
    serverMain()
​
# 客户端代码
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import NetstringReceiver
from twisted.internet import defer, reactor
​
​
class ClientProtocol(NetstringReceiver):
​
    def stringReceived(self, string):
        print("recieved")
        self.factory.printValue(string)
​
    def connectionMade(self):
        print("Made Connection")
        reactor.callInThread(self.factory.generateDeferred)
​
    def connetionLost(self):
        print("Connection Lost")
​
class MyClientFactory(ClientFactory):
​
    protocol = ClientProtocol
​
    def __init__(self):
        self.deferred = None
​
    def clientConnectionFailed(self, connector, reason):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.errback(reason)
​
    def printValue(self, value):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(value)
​
    def generateDeferred(self):
        self.deferred = defer.Deferred()
        self.deferred.addCallbacks(self.printValue, self.printError)
​
def OutputValue(host, port):
    from twisted.internet import reactor
    factory = MyClientFactory()
    reactor.connectTCP(host, port, factory)
​
def clientMain():
​
    def writeError(err):
        print("Deferred Error!\n")
        print("Error: {0}".format(err.__str__))
​
    def writeValue(value):
        print("Value revieved: {0}".format(value))
​
    from twisted.internet import reactor
​
    OutputValue('127.0.0.1', 2345)
    reactor.run()
​
if __name__ == '__main__':
    clientMain()

输出:

代码语言:javascript
代码运行次数:0
运行
复制
Made Connection
recieved
Value revieved: 11
recieved
Value revieved: 13
recieved
Value revieved: 19
recieved
Value revieved: 12

现在,客户端可以正确处理服务器发送的每个值。

通过使用 LoopingCallDeferred,你可以很容易地在 Twisted 中实现周期性调用的异步任务。LoopingCall 让你可以设定一个时间间隔,在这个时间间隔内重复执行任务,而 Deferred 则提供了一种优雅的方式来处理异步操作的结果。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档