正文共:5455 字 5 图 预计阅读时间:14 分钟
Nothing is so fatiguing as the eternal hanging on of an uncompleted task.
没有哪一件事情比藏在心里的一件无法完成的事情更劳累的。
小闫语录:
心头积攒的琐事是阻碍你前进的罪魁祸首,尽管有些事、有些人难以释怀更难放下,但是你的心又有谁懂?放过自己,成全自己,做更好的自己。
GitHub地址:
https://github.com/EthanYan6/rpc_divide.git
为了方便大家查看,我将RPC相关的代码放在了GitHub上面,大家可以clone到本地进行查看。
历史文章导航:
RPC(一)
RPC(二)
RPC(三)
RPC(四)
RPC(五)
咱们前面已经将RPC消息数据构造好了,接下来呢,就可以通过网络在调用双方进行传递了。传递的方式常用的有两种,一种是TCP(传输控制协议),一种是HTTP。下面我们会逐一进行讲解。
由于它的可靠性,TCP成为了最常用的方式。而且我们可以直接借助socket工具进行TCP开发。下面我们回顾一些TCP编程的相关知识:
通讯过程:客户端和服务端建立连接,期间涉及到三次握手和四次挥手。
TCP服务端编写:
sock = socket.socket() # 创建一个套接字
sock.bind() # 绑定端口
sock.listen() # 监听连接
sock.accept() # 接受新连接
sock.close() # 关闭服务器套接字
TCP客户端编写:
sock = socket.socket() # 创建一个套接字
sock.connect() # 连接远程服务器
sock.recv() # 接收数据
sock.send() # 数据尽可能的发送
sock.sendall() # 数据完全发送
sock.close() # 关闭连接
大家应该有点疑惑,TCP处于传输层还好理解,可以用来实现RPC传输。HTTP可是在应用层的协议啊,用它来传输是不是有点大材小用,杀鸡用宰牛刀啊?暂且不考虑这个问题,我们只考虑能否实现的问题,HTTP基于TCP实现,而且HTTP协议已经实现了TCP的收发,它还是一个公共的标准,各种语言都提供了HTTP实现的工具,我们直接通过一些库使用就好了,最大的优点便在于此--方便。
具体怎么操作呢?我们可以将构造好的RPC消息数据嵌入到HTTP报文中的body部分,而对HTTP的path路径等都无需关心。如下:
HTTP/1.0 POST /
Content-Type: binary
Content-Length: 5096
# 此处放置RPC消息数据
HTTP的通讯效率不如TCP高,所以并不常用。
下面我们实现一个RPC的传输协议,先实现客户端传输工具:
class Channel(object):
"""
用于客户端建立网络连接
"""
def __init__(self, host, port):
"""
:param host: 服务器地址
:param port: 服务器端口号
"""
self.host = host
self.port = port
def get_connection(self):
"""
获取连接对象
:return: 与服务器通讯的socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock
class Server(object):
"""
RPC服务器
"""
def __init__(self, host, port, handlers):
# 创建socket的工具对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置socket,重用地址
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
self.host = host
self.port = port
sock.bind((self.host, self.port))
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 1.开启服务器的监听,等待客户端的连接请求
self.sock.listen(128)
# 2.接受客户端的连接请求
while True:
client_sock, client_addr = self.sock.accept()
print('与客户端%s建立了链接' % str(client_addr))
# 3.交给ServerStub,完成客户端的具体的RPC调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
stub.process()
except EOFError:
# 表示客户端关闭了连接:
print('客户端关闭了连接')
client_sock.close()
class ClientStub(object):
"""
用来帮助客户端完成远程过程调用 RPC调用
stub = ClientStub()
stub.divide(200, 100)
框架是通用的,如果想实现加法,stub.add()类似
"""
def __init__(self, channel):
self.channel = channel
self.conn = self.channel.get_connection()
# 提供一个方法供客户端进行调用
def divide(self, num1, num2=1):
# 将调用的参数打包成消息协议的数据
proto = DivideProtocol()
args = proto.args_encode(num1, num2)
# 将消息数据通过网络发送给服务器
self.conn.sendall(args)
# 接收服务器返回的返回值消息数据,并进行解析
result = proto.result_decode(self.conn)
# 将结果值(正常float 或 异常InvalidOperation)返回给客户端
if isinstance(result, float):
# 正常
return result
else:
# 异常
raise result
def add(self):
pass
class ServerStub(object):
"""
帮助服务端完成远端过程调用
"""
def __init__(self, connection, handlers):
"""
:param connection: 与客户端的连接
:param handlers: 真正本地被调用的方法(函数 过程)
class Handlers:
@staticmethod
def divide(num1, num2=1):
pass
@staticmethod
def add():
pass
"""
self.conn = connection
self.method_proto = MethodProtocol(self.conn)
self.process_map = {
'divide': self._process_divide
}
self.handlers = handlers
def process(self):
"""
当服务端接受了一个客户端的连接,建立好连接后,完成远端调用处理
:return:
"""
# 1.接收消息数据,并解析方法的名字
name = self.method_proto.get_method_name()
# 2.根据解析获得的方法(过程)名,调用响应的过程协议,接收并解析消息数据
# self.process_map[name]()
_process = self.process_map[name]
_process()
def _process_divide(self):
"""
处理除法过程调用
:return:
"""
# 1.创建用于除法过程调用参数协议数据解析的工具
proto = DivideProtocol()
# 2.解析调用参数消息数据
args = proto.args_decode(self.conn)
# args = {"num1": xxx, "num2": xxx}
# 3.进行除法的本地过程调用
# 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
try:
val = self.handlers.divide(**args)
except InvalidOpreation as e:
ret_message = proto.result_encode(e)
else:
ret_message = proto.result_encode(val)
self.conn.sendall(ret_message)
# def _process_add(self):
# pass
我们模拟一下服务器与本地调用。
创建一个 server.py
文件:
from services import InvalidOpreation
from services import Server
class Handlers:
@staticmethod
def divide(num1, num2=1):
"""
除法
:param num1: int
:param num2: int
:return: float
"""
# 增加判断操作,抛出自定义异常
if num2 == 0:
raise InvalidOpreation()
val = num1 / num2
return val
if __name__ == '__main__':
# 开启服务器
_server = Server('127.0.0.1', 8000, Handlers)
_server.serve()
创建一个 client.py
文件:
from services import ClientStub
from services import Channel
from services import InvalidOpreation
# 创建与服务器的连接
channel = Channel('127.0.0.1', '8000')
# 创建用于RPC调用的工具
stub = ClientStub(channel)
# 进行调用
try:
val = stub.divide(200,0)
except InvalidOpreation as e:
print(e.message)
else:
print(val)
上面就将RPC完整实现了,但是案例有个小缺陷,就是每次服务器只能处理一个客户端,一个处理完成之后才开始处理下一个客户端。为了提升服务器的性能呢,我们可以将其做成多线程版本的RPC服务器。
class ThreadServer(object):
"""
多线程RPC服务器
"""
def __init__(self, host, port, handlers):
# 创建socket的工具对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置socket,重用地址
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
self.host = host
self.port = port
sock.bind((self.host, self.port))
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 1.开启服务器的监听,等待客户端的连接请求
self.sock.listen(128)
print('服务器开始监听')
# 2.接受客户端的连接请求
while True:
client_sock, client_addr = self.sock.accept()
print('与客户端%s建立了链接' % str(client_addr))
# 创建子线程处理这个客户端
t = threading.Thread(target=self.handle, args=(client_sock,))
# 开启子线程执行
t.start()
def handle(self, client_sock):
"""
子线程调用的方法,用来处理一个客户端的请求
:return:
"""
# 交给ServerStub,完成客户端的具体的RPC调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
stub.process()
except EOFError:
# 表示客户端关闭了连接:
print('客户端关闭了连接')
client_sock.close()
到此为止,RPC的简单实现就完成了。下一次文章将开始讲解分布式RPC的相关内容,尽情期待......
优质文章推荐:
redis操作命令总结
MySQL相关操作
SQL查询语句
前端中那些让你头疼的英文单词
Flask框架重点知识总结回顾
团队开发注意事项
浅谈密码加密
Django框架中的英文单词
Django中数据库的相关操作
DRF框架中的英文单词
DRF框架
Django相关知识点回顾
python技术面试题-腾讯