前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用自定义协议实现Python向Netty传输数据

使用自定义协议实现Python向Netty传输数据

作者头像
书唐瑞
发布2022-06-02 14:48:22
1K0
发布2022-06-02 14:48:22
举报
文章被收录于专栏:Netty历险记

本篇文章,自定义一个数据协议,通过Python语言,使用这个自定义的数据协议,将数据发送给Netty接收端. 之所以使用两种不同的语言,也在说明,数据之间的传输与语言无关.只要发送端和接收端彼此遵守相同的协议即可. 关于协议,无处不在,比如与网络相关的HTTP协议, 比如向Redis发送命令使用的RESP协议,比如Dubbo消费者和提供者之间的数据传输,比如RocketMQ消费者与服务端之间的消息传输,比如JVM中使用jstack命令获取堆栈信息时所使用的协议,等等. 它们之间必然会有一套相关的协议,用于数据传输. 一切皆协议,世间协议再多,常见的协议也无外乎那么几个,在Netty中已经默认提供了相关常见协议的解码器.

代码语言:javascript
复制
// 基于固定长度
FixedLengthFrameDecoder
// 使用固定长度字段存储内容的长度
LengthFieldBasedFrameDecoder
// 基于换行符
LineBasedFrameDecoder
// 基于分隔符
DelimiterBasedFrameDecoder

比如HTTP协议,在它的请求头里面就使用Content-Length指明了请求正文的长度. 其实Dubbo,RocketMQ,Redis等它们也是类似思想实现的,这样的协议很常见很实用.

于是乎,Netty就默认实现了一种这样的协议解码器,即LengthFieldBasedFrameDecoder解码器.在之前的文章中有介绍它,这里就不介绍了.

而我们本篇自定义的协议也是和它类似的,如下所示

整个协议是由请求头(head)和请求体(body)两个部分组成, 请求头(head)用于存放请求体(body)的长度,请求体(body)是真正存放数据.

接下来就是通过代码演示环节

首先看下Python端(作为客户端,用于发送数据)

代码语言:javascript
复制
#! /usr/bin/env python
# -*- coding:utf-8 -*-

from socket import *
import struct
import json

class Book(object):

    def __init__(self, addr, year):
        self.addr = addr
        self.year = year

    def addr(self):
        return self.addr

    def keys(self):
        return ['addr', 'year']

    def __getitem__(self, item):
        return getattr(self, item)


if __name__ == '__main__':
    client = socket(AF_INET, SOCK_STREAM)
    # 连接服务端
    client.connect(('127.0.0.1', 8082))

    book = Book('成都', 2021)
    
    # 将对象book转成JSON格式的数据
    json = json.dumps(dict(book))
    # 将JSON格式数据转成字节类型,用于网络传输
    body = bytes(json, 'utf-8')
    # 计算实际有效数据(body)的长度
    body_len = len(body)

    # head中存body的长度
    # I表示整型,即用一个整型大小空间存储body的长度; 而>表示按照大端存储, 至于什么是大端小端存储可以Google
    head = struct.pack('>I', body_len)
    # 按照我们自定义的协议格式,将字节数据发送到服务端
    client.sendall(head + body)
    

再看一下Netty接收端

代码语言:javascript
复制
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {


    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        try {

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ChannelPipeline channelPipeline = ch.pipeline();
                            // 按照我们自定义的协议,将有效数据解码出来,解码成有效的字节数据
                            channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                            // 将解码出来的字节数据转换成实体对象
                            channelPipeline.addLast(new MyDecoder());
                            // 处理实体对象
                            channelPipeline.addLast(new ServerHandler());

                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8082).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class MyDecoder extends SimpleChannelInboundHandler<ByteBuf> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

      // 进入到这个channelRead0方法就表明之前的LengthFieldBasedFrameDecoder解码器已经把有效body数据解码出来了, 这个channelRead0方法得到的就是body有效数据.
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);

        String json = new String(bytes, "UTF-8");
        MyModel v = JSON.parseObject(json, MyModel.class);
        ctx.fireChannelRead(v);
    }
}
代码语言:javascript
复制
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class ServerHandler extends SimpleChannelInboundHandler<MyModel> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyModel msg) throws Exception {
        System.out.println(msg.getAddr());
    }
}

以上Netty代码的功能,如下图所示

先运行Netty服务端,再运行Python客户端,在控制台就可以看到Netty服务端接收到的数据.

简单录制了一个视频

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netty历险记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档