
Java protostuff serialization: serializing with Protostuff

Author: 全栈程序员站长
Published: 2022-08-22 15:51:47
Column: 全栈程序员必看


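RPC calls can use many serialization formats: general-purpose ones such as JSON, or BSON as used by MongoDB; Java-specific ones such as Java's built-in serialization or Hessian; and cross-language ones such as Thrift and Protocol Buffers (protobuf). Thrift and protobuf produce compact output, but both require generating Java code from a schema, which is a nuisance: however efficient they are at runtime, they slow development down. Hessian sees some use, but it does not feel as capable as protobuf. I had therefore been looking for a serialization approach that combines runtime efficiency with development efficiency, and when I came across protostuff online it looked like exactly that.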

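Introduction to protostuff

One drawback of protobuf is that data structures must be precompiled: you first write a .proto definition file and then use the protobuf tooling to generate the corresponding code for each language. Because Java supports reflection and dynamic code generation, this precompilation step is not strictly necessary and can be done at runtime instead. protostuff already implements this.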
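To make the idea of a schema derived at runtime more concrete, here is a minimal sketch that uses plain Java reflection to list a class's serializable fields and give each one a tag number. It is an illustration only, not protostuff's implementation: the class `ReflectiveSchemaSketch`, the `User` POJO and the `describe` method are all hypothetical names, and a real library also handles field types, nesting and encoding.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; this is NOT how protostuff is implemented internally.
final class ReflectiveSchemaSketch {

    // Example POJO: no .proto file and no generated code are involved.
    static final class User {
        String name;
        int age;
        transient String cachedDisplayName; // transient state is skipped
    }

    // Assigns tag numbers 1, 2, 3, ... to the non-static, non-transient fields,
    // in whatever order reflection returns them.
    static Map<String, Integer> describe(Class<?> type) {
        Map<String, Integer> tags = new LinkedHashMap<>();
        int next = 1;
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            tags.put(field.getName(), next++);
        }
        return tags;
    }

    public static void main(String[] args) {
        // Prints the field-name-to-tag mapping discovered at runtime.
        System.out.println(describe(User.class));
    }
}
```

Because the mapping is computed from the class itself, adding a field to the POJO needs no regeneration step, which is the development-time benefit described above.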

protostuff performance

[Benchmark charts not preserved. Chart titles: "Ser Time+Deser Time (ns)" and "Size, Compressed size [light] in bytes".]

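Usage

pom dependencies

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.8</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.8</version>
</dependency>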

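Utility class

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

// Note: Objenesis (org.objenesis:objenesis) is a separate dependency, not part of protostuff.
public class SerializationUtil {

    // Runtime schemas are built reflectively, so keep one per class.
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    // Instantiates classes without calling a constructor.
    private static Objenesis objenesis = new ObjenesisStd(true);

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * Serializes an object.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes an object.
     *
     * @param data  the serialized bytes
     * @param clazz the target class
     * @return a new instance of clazz populated from data
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = objenesis.newInstance(clazz);
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}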
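The utility class above caches one schema per class with a check-then-put sequence, which is safe on a ConcurrentHashMap but can build the same schema more than once under contention. As a sketch of an alternative, assuming Java 8 or later, `computeIfAbsent` builds each entry at most once. The class `PerClassCacheSketch` and its string "schemas" are hypothetical stand-ins so that the example runs without protostuff on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for the schema cache; V plays the role of protostuff's Schema.
final class PerClassCacheSketch<V> {

    private final Map<Class<?>, V> cache = new ConcurrentHashMap<>();
    private final Function<Class<?>, V> builder;

    PerClassCacheSketch(Function<Class<?>, V> builder) {
        this.builder = builder;
    }

    // ConcurrentHashMap.computeIfAbsent applies the builder at most once per key.
    V get(Class<?> type) {
        return cache.computeIfAbsent(type, builder);
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        PerClassCacheSketch<String> schemas = new PerClassCacheSketch<>(type -> {
            builds.incrementAndGet(); // count how often a "schema" is actually built
            return "schema-for-" + type.getSimpleName();
        });
        schemas.get(String.class);
        schemas.get(String.class);  // served from the cache
        schemas.get(Integer.class);
        System.out.println(builds.get()); // prints 2
    }
}
```

In the real utility the builder would be the call to `RuntimeSchema.getSchema`, with the same unchecked cast the original code already performs.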

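Netty-based RPC

NettyServer

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// HelloServiceImpl, RpcRequest, RpcResponse and ServerStopException are project classes not shown here.
public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private int ioThreadNum;

    // Maximum number of connections the kernel queues for this listening socket.
    // For a given listening socket the kernel keeps two queues (incomplete and
    // completed connections); backlog caps the sum of their sizes.
    private int backlog;

    private int port;
    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyServer(int ioThreadNum, int backlog, int port) {
        this.ioThreadNum = ioThreadNum;
        this.backlog = backlog;
        this.port = port;
    }

    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup(this.ioThreadNum);

        // Maps a service interface name to the bean that implements it.
        final Map<String, Object> demoService = new HashMap<String, Object>();
        demoService.put("com.codecraft.service.HelloService", new HelloServiceImpl());

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, backlog)
                // Note: these are childOption calls, applied to accepted connections.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new RpcDecoder(RpcRequest.class))
                                .addLast(new RpcEncoder(RpcResponse.class))
                                .addLast(new ServerRpcHandler(demoService));
                    }
                });

        channel = serverBootstrap.bind("127.0.0.1", port).sync().channel();
        logger.info("NettyRPC server listening on port " + port + " and ready for connections...");

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // do shutdown stuff
            }
        });
    }

    public void stop() {
        if (null == channel) {
            throw new ServerStopException();
        }
        // Close the listening channel first, then release the event loops.
        channel.close().syncUninterruptibly();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        bossGroup = null;
        workerGroup = null;
        channel = null;
    }
}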

ServerRpcHandler

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;

// RpcRequest, RpcResponse and ServerException are project classes not shown here.
public class ServerRpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);

    // Service interface name -> implementing bean.
    private final Map<String, Object> serviceMapping;

    public ServerRpcHandler(Map<String, Object> serviceMapping) {
        this.serviceMapping = serviceMapping;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
        RpcResponse response = new RpcResponse();
        // Echo the trace id so the client can match the response to its request.
        response.setTraceId(rpcRequest.getTraceId());
        try {
            logger.info("server handle request:{}", rpcRequest);
            Object result = handle(rpcRequest);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        channelHandlerContext.writeAndFlush(response);
    }

    private Object handle(RpcRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = serviceMapping.get(className);
        if (serviceBean == null) {
            // Fail with a clear message instead of a NullPointerException below.
            throw new IllegalArgumentException("No service registered for " + className);
        }
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        // cglib FastClass invokes the method without java.lang.reflect overhead.
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(cause.getMessage(), cause);
        RpcResponse response = new RpcResponse();
        if (cause instanceof ServerException) {
            response.setTraceId(((ServerException) cause).getTraceId());
        }
        response.setError(cause);
        ctx.writeAndFlush(response);
    }
}
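The handler above looks up a service bean by class name and invokes the requested method through cglib's FastClass. The same dispatch step can be sketched with only `java.lang.reflect`, which is a swapped-in technique chosen here so the example runs without cglib or Netty. The names `ReflectiveDispatchSketch`, `register` and `dispatch`, and the demo `HelloService`, are hypothetical.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: dispatches a call by service name and method signature.
final class ReflectiveDispatchSketch {

    // Demo service, standing in for the HelloService registered by the server.
    public interface HelloService {
        String hello(String name);
    }

    public static final class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello " + name;
        }
    }

    private final Map<String, Object> services = new HashMap<>();

    void register(String className, Object bean) {
        services.put(className, bean);
    }

    Object dispatch(String className, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Object bean = services.get(className);
        if (bean == null) {
            throw new IllegalArgumentException("No service registered for " + className);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its exception as the cause.
            throw new IllegalStateException("Service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch dispatcher = new ReflectiveDispatchSketch();
        dispatcher.register("HelloService", new HelloServiceImpl());
        Object result = dispatcher.dispatch(
                "HelloService", "hello", new Class<?>[] {String.class}, new Object[] {"rpc"});
        System.out.println(result); // prints "hello rpc"
    }
}
```

Plain reflection is usually slower per call than FastClass, which is presumably why the handler above uses cglib; the lookup logic is otherwise the same.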

NettyClient

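import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.Pair; // assumed: the original does not show which Pair class is used
import com.google.common.base.Optional;     // Guava Optional (absent()/of())

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

// IClient, RpcRequest, RpcResponse and ClientCloseException are project classes not shown here.
public class NettyClient implements IClient {

    private EventLoopGroup workerGroup;
    private Channel channel;
    private int workerGroupThreads;
    private ClientRpcHandler clientRpcHandler;

    // Marker for "wait indefinitely for the response".
    private static final Optional<Pair<Long, TimeUnit>> NO_TIMEOUT = Optional.<Pair<Long, TimeUnit>>absent();

    public NettyClient(int workerGroupThreads) {
        this.workerGroupThreads = workerGroupThreads;
    }

    public void connect(InetSocketAddress socketAddress) {
        workerGroup = new NioEventLoopGroup(workerGroupThreads);
        clientRpcHandler = new ClientRpcHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new RpcDecoder(RpcResponse.class))
                                .addLast(new RpcEncoder(RpcRequest.class))
                                .addLast(clientRpcHandler);
                    }
                });
        channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())
                .syncUninterruptibly()
                .channel();
    }

    // Sends the request and blocks until the response arrives.
    public RpcResponse syncSend(RpcRequest request) throws InterruptedException {
        System.out.println("send request:" + request);
        channel.writeAndFlush(request).sync();
        return clientRpcHandler.send(request, NO_TIMEOUT);
    }

    // Sends the request and waits at most the given timeout; returns null on timeout.
    public RpcResponse asyncSend(RpcRequest request, TimeUnit timeUnit, long timeout) throws InterruptedException {
        channel.writeAndFlush(request);
        return clientRpcHandler.send(request, Optional.of(Pair.of(timeout, timeUnit)));
    }

    public InetSocketAddress getRemoteAddress() {
        SocketAddress remoteAddress = channel.remoteAddress();
        if (!(remoteAddress instanceof InetSocketAddress)) {
            throw new RuntimeException("Get remote address error, should be InetSocketAddress");
        }
        return (InetSocketAddress) remoteAddress;
    }

    public void close() {
        if (null == channel) {
            throw new ClientCloseException();
        }
        // Close the connection first, then release the event loop.
        channel.close().syncUninterruptibly();
        workerGroup.shutdownGracefully();
        workerGroup = null;
        channel = null;
    }
}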

ClientRpcHandler

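import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.Pair; // assumed: the original does not show which Pair class is used
import com.google.common.base.Optional;     // Guava Optional

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

@ChannelHandler.Sharable
public class ClientRpcHandler extends SimpleChannelInboundHandler<RpcResponse> {

    // A BlockingQueue is used mainly for its blocking behaviour, which avoids manual locking.
    // The key is the trace id; String is an assumption, since the original key type was lost.
    private final ConcurrentHashMap<String, BlockingQueue<RpcResponse>> responseMap =
            new ConcurrentHashMap<String, BlockingQueue<RpcResponse>>();

    // Returns the queue for a trace id, creating it if neither side has done so yet.
    private BlockingQueue<RpcResponse> queueFor(String traceId) {
        BlockingQueue<RpcResponse> fresh = new LinkedBlockingQueue<RpcResponse>(1);
        BlockingQueue<RpcResponse> existing = responseMap.putIfAbsent(traceId, fresh);
        return existing != null ? existing : fresh;
    }

    // messageReceived
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {
        System.out.println("receive response:" + rpcResponse);
        // The response can arrive before send() has registered its queue, because the
        // client writes the request first. Creating the queue here avoids a null queue.
        // Caveat: a response that arrives after send() timed out leaves its queue behind.
        queueFor(rpcResponse.getTraceId()).offer(rpcResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
    }

    // Blocks until the response for this request arrives, or until the timeout
    // elapses (in which case null is returned).
    public RpcResponse send(RpcRequest request, Optional<Pair<Long, TimeUnit>> timeout) throws InterruptedException {
        BlockingQueue<RpcResponse> queue = queueFor(request.getTraceId());
        RpcResponse response = null;
        try {
            if (timeout == null || !timeout.isPresent()) {
                response = queue.take();
            } else {
                response = queue.poll(timeout.get().getKey(), timeout.get().getValue());
            }
        } finally {
            responseMap.remove(request.getTraceId());
        }
        return response;
    }
}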
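The client handler above matches each response to its request by trace id. A common variation, shown here as a sketch and not as the author's method, registers a pending entry before the request is written and completes it when the response arrives, using `CompletableFuture` instead of a blocking queue. The class `PendingCallsSketch` and its methods are hypothetical, and string payloads stand in for `RpcResponse`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of request/response correlation by trace id.
final class PendingCallsSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call this BEFORE writing the request, so a fast response always finds its entry.
    CompletableFuture<String> register(String traceId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(traceId, future);
        return future;
    }

    // Called from the I/O thread when a response arrives.
    // Returns false for unknown or late trace ids, which are simply dropped.
    boolean complete(String traceId, String payload) {
        CompletableFuture<String> future = pending.remove(traceId);
        return future != null && future.complete(payload);
    }

    // Call this after a timeout so abandoned entries do not accumulate.
    void forget(String traceId) {
        pending.remove(traceId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingCallsSketch calls = new PendingCallsSketch();
        CompletableFuture<String> reply = calls.register("trace-1");
        // Simulate the I/O thread delivering the response.
        new Thread(() -> calls.complete("trace-1", "pong")).start();
        System.out.println(reply.get(1, TimeUnit.SECONDS));
    }
}
```

Registering before the write removes the ordering problem between sending and waiting, and `forget` gives timeouts an explicit cleanup path.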

decoder

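import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Wait until the 4-byte length prefix is available.
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (dataLength < 0) {
            // Corrupt frame: close the connection and stop decoding.
            channelHandlerContext.close();
            return;
        }
        if (byteBuf.readableBytes() < dataLength) {
            // Frame is incomplete: rewind to the length prefix and wait for more bytes.
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);
        Object obj = SerializationUtil.deserializer(data, genericClass);
        list.add(obj);
    }
}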

encoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RpcEncoder extends MessageToByteEncoder<Object> {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
        if (genericClass.isInstance(obj)) {
            byte[] data = SerializationUtil.serializer(obj);
            // Frame layout: 4-byte length prefix followed by the payload.
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}
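The encoder and decoder above frame every message as a 4-byte length followed by the payload. The sketch below reproduces that framing with `java.nio.ByteBuffer` so it can run without Netty, including the case where a frame arrives split across two reads. `LengthPrefixedFramingSketch` and its methods are hypothetical names, and the UTF-8 strings stand in for serialized objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of 4-byte length-prefixed framing, independent of Netty.
final class LengthPrefixedFramingSketch {

    // Writes a big-endian length prefix followed by the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Extracts every complete frame from a buffer in read mode and leaves the
    // position at the start of the first incomplete frame, if any.
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("Negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // incomplete frame: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("pong".getBytes(StandardCharsets.UTF_8));

        ByteBuffer buf = ByteBuffer.allocate(64);
        // First read: the whole first frame plus only 3 bytes of the second.
        buf.put(first).put(second, 0, 3);
        buf.flip();
        List<byte[]> frames = decode(buf);
        System.out.println(frames.size()); // prints 1

        // Second read: keep the 3 unread bytes and append the rest of the frame.
        buf.compact();
        buf.put(second, 3, second.length - 3);
        buf.flip();
        frames = decode(buf);
        System.out.println(new String(frames.get(0), StandardCharsets.UTF_8)); // prints "pong"
    }
}
```

The rewind-and-stop branch is the important part: without stopping after the rewind, the decoder would go on to read a payload that has not fully arrived.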

Publisher: 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/137829.html (original link: https://javaforall.cn)

Originally published: 2022-05-05
