在 Twisted 中,Deferred
是一个用于处理异步操作结果的对象。当你想周期性地执行一个异步任务时,可以使用 LoopingCall
,它结合了 Twisted 的事件循环来周期性地调用一个函数,并返回一个 Deferred
对象。
1、问题背景
在一个Twisted服务器中,需要每2秒通过TCP发送一个值,并使用一个Twisted客户端接收这些值,并使用Deferred进行处理。服务器和客户端的代码示例如下:
# 服务端代码
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import NetstringReceiver
from random import randint
class ServerProtocol(NetstringReceiver):
def __init__(self, factory):
self.factory = factory
self.sendObject = None
def connectionMade(self):
self.sendValue()
def connectionLost(self, reason):
sendObject, self.sendObject = self.sendObject, None
sendObject.cancel()
def sendValue(self):
data = randint(2,20)
self.sendString(str(data))
print('send: {0}'.format(data))
from twisted.internet import reactor
self.sendObject = reactor.callLater(2, self.sendValue)
class MyServerFactory(ServerFactory):
def __init__(self):
self.protsa = []
def buildProtocol(self, addr):
return ServerProtocol(self)
def setCallback(self, callback):
self.callback = callback
def serverMain():
factory = MyServerFactory()
# factory.setCallback(generateVal)
from twisted.internet import reactor
port = reactor.listenTCP(2345, factory, interface='127.0.0.1')
print 'Serving on %s.' % (port.getHost())
reactor.run()
if __name__ == '__main__':
serverMain()
# 客户端代码
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import NetstringReceiver
from twisted.internet import defer
class ClientProtocol(NetstringReceiver):
def stringReceived(self, string):
print("recieved")
self.factory.printValue(string)
def connectionMade(self):
print("Made Connection")
def connetionLost(self):
print("Connection Lost")
class MyClientFactory(ClientFactory):
protocol = ClientProtocol
def __init__(self, deferred):
self.deferred = deferred
def clientConnectionFailed(self, connector, reason):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.errback(reason)
def printValue(self, value):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.callback(value)
def OutputValue(host, port):
d = defer.Deferred()
from twisted.internet import reactor
factory = MyClientFactory(d)
reactor.connectTCP(host, port, factory)
return d
def clientMain():
def writeError(err):
print("Deferred Error!\n")
print("Error: {0}".format(err.__str__))
def writeValue(value):
print("Value revieved: {0}".format(value))
from twisted.internet import reactor
d = OutputValue('127.0.0.1', 2345)
d.addCallbacks(writeValue, writeError)
reactor.run()
if __name__ == '__main__':
clientMain()
输出:
Made Connection
recieved
Value revieved: 11
recieved
recieved
recieved
问题是,第一次连接时,服务器发送的值被客户端正确接收并处理,但随后的值没有被处理。这是因为Deferred只会被触发一次。
2、解决方案
为了解决这个问题,需要创建一个持续运行的Deferred,以便每次收到值时都会被触发。这可以通过使用Twisted的Reactor API来实现。
# 服务端代码
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import NetstringReceiver
from random import randint
from twisted.internet import reactor
class ServerProtocol(NetstringReceiver):
def __init__(self, factory):
self.factory = factory
self.deferred = reactor.callLater(2, self.sendValue)
def connectionMade(self):
self.sendValue()
def connectionLost(self, reason):
self.deferred.cancel()
def sendValue(self):
data = randint(2,20)
self.sendString(str(data))
print('send: {0}'.format(data))
self.deferred = reactor.callLater(2, self.sendValue)
class MyServerFactory(ServerFactory):
def __init__(self):
self.protsa = []
def buildProtocol(self, addr):
return ServerProtocol(self)
def setCallback(self, callback):
self.callback = callback
def serverMain():
factory = MyServerFactory()
# factory.setCallback(generateVal)
port = reactor.listenTCP(2345, factory, interface='127.0.0.1')
print 'Serving on %s.' % (port.getHost())
reactor.run()
if __name__ == '__main__':
serverMain()
# 客户端代码
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import NetstringReceiver
from twisted.internet import defer, reactor
class ClientProtocol(NetstringReceiver):
def stringReceived(self, string):
print("recieved")
self.factory.printValue(string)
def connectionMade(self):
print("Made Connection")
reactor.callInThread(self.factory.generateDeferred)
def connetionLost(self):
print("Connection Lost")
class MyClientFactory(ClientFactory):
protocol = ClientProtocol
def __init__(self):
self.deferred = None
def clientConnectionFailed(self, connector, reason):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.errback(reason)
def printValue(self, value):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.callback(value)
def generateDeferred(self):
self.deferred = defer.Deferred()
self.deferred.addCallbacks(self.printValue, self.printError)
def OutputValue(host, port):
from twisted.internet import reactor
factory = MyClientFactory()
reactor.connectTCP(host, port, factory)
def clientMain():
def writeError(err):
print("Deferred Error!\n")
print("Error: {0}".format(err.__str__))
def writeValue(value):
print("Value revieved: {0}".format(value))
from twisted.internet import reactor
OutputValue('127.0.0.1', 2345)
reactor.run()
if __name__ == '__main__':
clientMain()
输出:
Made Connection
recieved
Value revieved: 11
recieved
Value revieved: 13
recieved
Value revieved: 19
recieved
Value revieved: 12
现在,客户端可以正确处理服务器发送的每个值。
通过使用 LoopingCall
和 Deferred
,你可以很容易地在 Twisted 中实现周期性调用的异步任务。LoopingCall
让你可以设定一个时间间隔,在这个时间间隔内重复执行任务,而 Deferred
则提供了一种优雅的方式来处理异步操作的结果。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。