前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >动态指定IP调用Dubbo服务

动态指定IP调用Dubbo服务

作者头像
加多
发布2019-05-15 15:50:06
4.7K1
发布2019-05-15 15:50:06
举报
文章被收录于专栏:Java编程技术

一、前言

前面我们探讨了如何获取某一个Dubbo的服务的提供者列表,本节我们探讨如何使用Dubbo的扩展,实现指定IP调用。

那么什么时候需要指定ip来调用那,我们考虑一个并行任务处理系统,系统接受一个大任务后会切割为若干个子任务,然后把子任务分派到不同的机器上去执行,这时候就需要把子任务路由到指定的ip上去运行,如下图:

二、实现

在Dubbo中集群容错策略Cluster是SPI扩展接口,DUbbo框架提供了丰富的集群容错策略实现,本节我们就基于扩展接口实现指定IP调用功能。

首先我们实现扩展接口Cluster:

代码语言:javascript
复制
  public class MyCluster implements Cluster{

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MyClusterInvoker(directory);
    }
}

然后我们看自己实现的MyClusterInvoker

代码语言:javascript
复制
public class MyClusterInvoker<T> extends MyAbstractClusterInvoker<T> {

    public MyClusterInvoker(Directory<T> directory) {
        super(directory);
    }

        @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
            throws RpcException {

        //1.查看是否设置了指定ip
        String ip = (String) RpcContext.getContext().get("ip");
        if (StringUtils.isBlank(ip)) {
            throw new RuntimeException("ip is blank ");
        }
        //2.检查是否有可用invoker
        checkInvokers(invokers,invocation);
        
        //3.根据指定ip获取对应invoker
        Invoker<T> invoked = invokers.stream().filter(invoker -> invoker.getUrl().getHost().equals(ip))
                .findFirst().orElse(null);
        //4.检查是否有可用invoker
        if(null == invoked) {
            throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
                    "Failed to invoke the method " + invocation.getMethodName() + " in the service "
                            + getInterface().getName() + ". No provider available for the service "
                            + directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
                            + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
                            + ". Please check if the providers have been started and registered.");
       }
        //5.发起远程调用,失败则抛出异常
        try {
            
            return invoked.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }

...
}
  • 如上代码1,我们从RpcContext.getContext()获取了属性值ip,如果指定了改值说明指定了ip,
  • 代码2则检查是否有可用的服务提供者,如果没有则抛出异常。
  • 代码3变量invokers列表查找指定IP对应的Invoker
  • 代码4 检查是否有对应IP对应的Invoker,没有则抛出异常。
  • 代码5 具体使用选择的invoker发起远程调用。

注意我们还修改了框架的AbstractClusterInvoker为MyAbstractClusterInvoker:

代码语言:javascript
复制
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    List<Invoker<T>> invokers = list(invocation);
    
    LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation);

    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);

}

这里我们把 LoadBalance loadbalance = initLoadBalance(invokers, invocation); 修改为了 LoadBalance loadbalance = null;因为我们不需要负载均衡了。

扩展实现写好后,要把扩展实现配置到下面文件

然后在消费端调用时候进行下面设置就可以指定ip调用了。

代码语言:javascript
复制
//设置集群容错策略为我们自己的
 referenceConfig.setCluster("myCluster");
//指定ip,企图让ip为30.10.67.231的服务提供者来处理服务
RpcContext.getContext().set("ip", "30.10.67.231");

三、总结

Dubbo是一个高度可扩充的框架,基于SPI的扩展接口,我们可以根据需要定制我们自己的实现,本文我们则基于集群容错策略实现了基于ip调用的扩展。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.05.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、实现
  • 三、总结
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档