前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式改造剧集1

分布式改造剧集1

作者头像
SecondWorld
发布2018-04-19 18:47:15
6780
发布2018-04-19 18:47:15
举报
文章被收录于专栏:Java开发者杂谈

背景介绍

​ 我所在的项目组,使用的技术一直是接近原始社会的:jdk1.6 + SpringMVC + hessian + Mybatis,当前最火的中间件技术Redis、MQ是完全没有用到,更别说什么分布式这种高大上的东西了。开始一直以为可能接下来的日子我都会在原始社会中渡过,直到我跳槽到另一家公司。

​ 事情总是在最不抱希望的时候出现转机,最近老大指派我牵头做分布式改造。作为技术痴的我,刚接到这个消息的时候别提有多激动了。虽然平时空闲时间会去学习最新的一些技术,但总觉得没怎么学进去。一是年级大了,对于一个东西不用就忘;二是都只是敲一些demo代码,相信各位大神都知道,真正项目中遇到的技术难题永远比demo中的那些小问题复杂的多。

​ 废话不多说,先来说说这次分布式改造的预期:

  • 应用的分布式:这一点很容易理解,就是希望以前的单节点应用能够部署成多节点,一个请求可以转发到多个节点中的任意一个
  • 缓存的分布式:好在我们项目对缓存的依赖性不是特别高,项目中使用的缓存也大部分仅仅是为了提升效率。对于内存缓存(Ehcache),希望在不同节点间能够同步,对数据的实时性和一致性要求不是特别高
  • 锁的分布式:业务互斥其实一直是我们项目的一个复杂所在。因为是金融行业,一旦业务互斥没有做好,就会出现严重的资金风险。对锁的可靠性要求特别高,对于互斥的业务锁,只要一个节点能够拿到,其他节点一定不能拿到。项目以前的实现是直接通过内存中的一个ConcurrentHashMap去实现的。如果多节点部署的话,很显然每个节点都会存在一个内存锁,原来的锁将完全不起作用

​ 当然除了预期之外,考虑到部署环境的复杂性:一共几十套环境,后面可能上百。有的部署在腾讯云上,有的部署在客户自己内部系统中,一个微小的部署变动可能会被放大几十倍。所以领导除了给出预期之外,还给了以下两点要求:

  • 尽量不要升级JDK
  • 尽量不要引入新的中间件或者新的外部应用部署

​ 好了,背景暂且交代到这里。我们基本对这次分布式改造剧集有了了解,下面开始进入正片…...

第一章:纯DIY分布式

第一集: 应用分布式改造

探索之路

​ 对于当今的互联网企业码农,只要公司不处于原始社会。分析了上面的需求之后,对于这种请求多节点转发负载的,很自然地就会想到nginx。没错,我开始也想到了nginx,并且本地测试了,对于当前项目的转发只需下面简单的配置即可:

代码语言:javascript
复制
upstream apps {
    server 127.0.0.1:8380;
    server 127.0.0.1:8480;
    server 127.0.0.1:8580;
}

server {
    listen 9999;
    server_name 127.0.0.1:9999;
    #本机的hessian服务最终都发布成 */service/*.hs的形式
    location /service/ {
        proxy_pass http://apps/myapp-war/;
    }
}

​ 但是各位别忘了要求的第二点,好吧?忘了的话请再次回头看上面? 。显然,引入nginx这种最简答的方式不行…...

​ 既然不能引入这种转发的应用,那么只有DIY一个调用分发的实现了。那么,究竟如何实现呢?永远不要忘记没有什么问题是看源码解决不了的,如果有,那么请debug源码。

​ hessian与Spring的集成,客户端最终注入到Spring容器中的bean类为org.springframework.remoting.caucho.HessianProxyFactoryBean,我们跟进这个类的源码,发现该类的上下级关系为:

代码语言:javascript
复制
HessianProxyFactoryBean extends HessianClientInterceptor implements FactoryBean<Object>

最终实际客户端的调用时通过HessianClientInterceptor类的invoke方法来实现的,该类的主要代码如下:

代码语言:javascript
复制
public class HessianClientInterceptor extends UrlBasedRemoteAccessor implements MethodInterceptor {

    private HessianProxyFactory proxyFactory = new HessianProxyFactory();

    private Object hessianProxy;
    
    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        // 类初始化完成之后会调用prepare方法,对hessianProxy进行初始化
        prepare();
    }

    /**
     * Initialize the Hessian proxy for this interceptor.
     * @throws RemoteLookupFailureException if the service URL is invalid
     */
    public void prepare() throws RemoteLookupFailureException {
        try {
            this.hessianProxy = createHessianProxy(this.proxyFactory);
        }
        catch (MalformedURLException ex) {
            throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid", ex);
        }
    }

    /**
     * Create the Hessian proxy that is wrapped by this interceptor.
     * @param proxyFactory the proxy factory to use
     * @return the Hessian proxy
     * @throws MalformedURLException if thrown by the proxy factory
     * @see com.caucho.hessian.client.HessianProxyFactory#create
     */
    protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
        Assert.notNull(getServiceInterface(), "'serviceInterface' is required");
        // 根据配置文件中的配置创建代理类
        return proxyFactory.create(getServiceInterface(), getServiceUrl(), getBeanClassLoader());
    }
    
    // 最终hessian调用时调用的方法
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (this.hessianProxy == null) {
            throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
                    "invoke 'prepare' before attempting any operations");
        }

        ClassLoader originalClassLoader = overrideThreadContextClassLoader();
        try {
            // 这一句特别关键,最终是使用前面初始化过的hessianProxy的对应方法,最终的hessian地址也存在该对象中
            return invocation.getMethod().invoke(this.hessianProxy, invocation.getArguments());
        }
        catch (InvocationTargetException ex) {
            Throwable targetEx = ex.getTargetException();
            // Hessian 4.0 check: another layer of InvocationTargetException.
            if (targetEx instanceof InvocationTargetException) {
                targetEx = ((InvocationTargetException) targetEx).getTargetException();
            }
            if (targetEx instanceof HessianConnectionException) {
                throw convertHessianAccessException(targetEx);
            }
            else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
                Throwable cause = targetEx.getCause();
                throw convertHessianAccessException(cause != null ? cause : targetEx);
            }
            else if (targetEx instanceof UndeclaredThrowableException) {
                UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
                throw convertHessianAccessException(utex.getUndeclaredThrowable());
            }
            else {
                throw targetEx;
            }
        }
        catch (Throwable ex) {
            throw new RemoteProxyFailureException(
                    "Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
        }
        finally {
            resetThreadContextClassLoader(originalClassLoader);
        }
    }

​ 仔细分析上面源码和我加的中文注释,不难发现解决问题的关键就在与在实际调用之前替换hessianProxy或者针对同一个hessianProxy替换其指向的url。即我们需要对原来注入到Spring容器中的org.springframework.remoting.caucho.HessianProxyFactoryBean类做定制,替换成我们的类,然后在调用之前动态替换hessianProxy。一种方式是对于需要路由的服务接口xml声明做替换:

代码语言:javascript
复制
<bean id="channelAccTaskServiceFacade" class="com.rampage.distribute.factory.DistributeHessainProxyFactoryBean">
        <!--hessian.rampage.server为分布式之前配置文件配置的固定服务器地址 -->
        <property name="serviceUrl" value="${hessian.rampage.server}/services/helloService.hs" />
        <property name="serviceInterface" value="com.rampage.demo.facade.cle.service.HelloService" />
    </bean>

​ 这种可能对代码的改动较大,而且如果不想实现路由的话又得替换回来。另外一种实现就像前面我写的服务定制一样,在Springbean定义加载完成初始化之前做拦截,对bean进行增强替换,这里我们采用第二种方式。应为这样更灵活,可以自定义替换规则。


实现Demo

​ 我这里给出我的一个实现demo,大致步骤分为如下几步:

  • 定义一个注解,用来添加到hessian服务接口上,表示该服务需要在客户端调用的时候进行分布式增强,分发到不同节点:
代码语言:javascript
复制
  @Target({ElementType.TYPE})
  @Retention(RetentionPolicy.RUNTIME)
  @Documented
  @Inherited
  public @interface Distribute {
  }
  • 定义一个BFP, 在bean初始化之前根据是否增强为分布式bean策略,来决定是否对hessian服务进行增强:
代码语言:javascript
复制
  // 定义一个BFP,在bean初始化之前进行增强
  @Component
  public class DistributeBeanFactoryPostProcessor implements BeanFactoryPostProcessor, Ordered {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributeBeanFactoryPostProcessor.class);
    
    private DistributeStrategy distributeStrategy;
    
    private boolean openDistribute;
    
    {
        // 从配置文件加载是否开启分布式以及分布式分发策略
        Properties properties = new Properties();
        try {
            properties.load(DistributeBeanFactoryPostProcessor.class.getClassLoader().getResource("app-config.properties").openStream());
        } catch (IOException e) {
            LOGGER.error("加载classPath下配置文件【app-config.properties】失败!", e);
        }
        
        openDistribute = Libs.toBoolean(properties.getProperty("open.hessian.client.distribute", "false"), false);
        if (openDistribute) {
            Class<?> clazz = null;
            String strategyClassName = properties.getProperty("hessian.client.distribute.strategy", "com.ramapge.distribute.AnnotationDistributeStrategy");
            try {
                clazz = Class.forName(strategyClassName);
                distributeStrategy = (DistributeStrategy) clazz.newInstance();
            } catch (Exception e) {
                openDistribute = false;
                LOGGER.error("初始化分布式策略类失败!", e);
            }
        }
    }

    @Override
    public int getOrder() {
        return Integer.MAX_VALUE;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // 未开启分布式策略,则直接返回
        if (!openDistribute) {
            LOGGER.error("未开启分布式分发策略, 跳过分布式BeanFactory后置处理......");
            return;
        }
        
        LOGGER.info("进入分布式策略BeanFactory后置处理, 分布式策略类为【{}】......", distributeStrategy.getClass().getName());
        String[] beanDefNames = beanFactory.getBeanDefinitionNames();
        if (ArrayUtils.isEmpty(beanDefNames)) {
            return;
        }
        
        BeanDefinition beanDef = null;
        // 替换hessian客户端的实现类为分布式hessian支持类
        for (String beanName : beanDefNames) {
            beanDef = beanFactory.getBeanDefinition(beanName);
            // 如果满足分布式分发策略,则替换hessian客户端工厂类 FIXME: 测试渠道日结分布式,后续删掉该判断条件
            if (distributeStrategy.doDistribute(beanFactory, beanDef)) {
    beanDef.setBeanClassName("com.rampage.distribute.DistributeHessainProxyFactoryBean");
            }
        }
    }
  }

  // 是否转变成分布式策略
  public interface DistributeStrategy {
    boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef);
  }

  // 注解转发策略,这里是如果有对应的Distribute注解,则将其变成分布式调用
  public class AnnotationDistributeStrategy implements DistributeStrategy {
    @Override
    public boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef) {
        if (!"org.springframework.remoting.caucho.HessianProxyFactoryBean".equals(beanDef.getBeanClassName())) {
            return false;
        }
        
        // 只分发有@Distribute注解的bean
        MutablePropertyValues pv = beanDef.getPropertyValues();
        if (!pv.contains("serviceInterface")) {
            return false;
        }
        
        TypedStringValue interfaceName = (TypedStringValue) pv.getPropertyValue("serviceInterface").getValue();
        try {
            Class<?> hessianInterface = Thread.currentThread().getContextClassLoader().loadClass(interfaceName.getValue());
            Distribute distribute = hessianInterface.getAnnotation(Distribute.class);
            if (distribute == null) {
                return false;
            }
        } catch (ClassNotFoundException e) {
            return false;
        }   
        return true;
    }

  }
  • 定制的、hessianProxyFactoryBean实现:
代码语言:javascript
复制
  /**
   * 分布式Hessian Bean工厂
   * @author ziyuqi
   *
   */
  public class DistributeHessainProxyFactoryBean extends HessianProxyFactoryBean {
    
    /**
     * 从hessian代理类列表
     */
    private List<Object> slaveHessianProxies = new ArrayList<Object>();
    
    /**
     * 主hessian代理类
     */
    private Object masterHessianProxy;
    
    private HessianProxyFactory proxyFactory;

    @Override
    protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
        // 将配置中的proxy设置为主代理类,并且返回null
        this.masterHessianProxy = super.createHessianProxy(proxyFactory);
        this.proxyFactory = proxyFactory;
        return null;
    }
    
    

    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        
        // TODO: 实现从节点可配置,动态读取当前配置信息进行转发
        // 初始化从Hessian代理列表 
        String masterServiceUrl = getServiceUrl();
        int suffixIndex = masterServiceUrl.lastIndexOf("/services/");
          // 配置文件中配置的http://127.0.0.1:8580/myapps-war作为主节点,这里demo写死两个从节点
        String[] slavePrefixes = new String[] {"http://127.0.0.1:8480/myapps-war", "http://127.0.0.1:8580/myapps-war"};
        
        for (String slavePrefix : slavePrefixes) {
            try {
                Object slaveHessianProxy = this.proxyFactory.create(getServiceInterface(), slavePrefix + masterServiceUrl.substring(suffixIndex), getBeanClassLoader());
                slaveHessianProxies.add(slaveHessianProxy);
            } catch (MalformedURLException e) {
                throw new RemoteLookupFailureException("Service URL [" + slavePrefix + getServiceUrl() + "] is invalid", e);

            }
        }
    }


    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        if (this.masterHessianProxy == null && this.slaveHessianProxies.isEmpty()) {
            throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
                    "invoke 'prepare' before attempting any operations");
        }
        ClassLoader originalClassLoader = overrideThreadContextClassLoader();
        try {
            return invocation.getMethod().invoke(routeHessianProxy(invocation), invocation.getArguments());
        }
        catch (InvocationTargetException ex) {
            Throwable targetEx = ex.getTargetException();
            // Hessian 4.0 check: another layer of InvocationTargetException.
            if (targetEx instanceof InvocationTargetException) {
                targetEx = ((InvocationTargetException) targetEx).getTargetException();
            }
            if (targetEx instanceof HessianConnectionException) {
                throw convertHessianAccessException(targetEx);
            }
            else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
                Throwable cause = targetEx.getCause();
                throw convertHessianAccessException(cause != null ? cause : targetEx);
            }
            else if (targetEx instanceof UndeclaredThrowableException) {
                UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
                throw convertHessianAccessException(utex.getUndeclaredThrowable());
            }
            else {
                throw targetEx;
            }
        }
        catch (Throwable ex) {
            throw new RemoteProxyFailureException(
                    "Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
        }
        finally {
            resetThreadContextClassLoader(originalClassLoader);
        }
    }

    /**
     * 路由hessian调用
     * @param invocation 方法调用对象
     * @return 路由hessian代理对象
     */
    private Object routeHessianProxy(MethodInvocation invocation) {
        if (this.slaveHessianProxies.isEmpty()) {
            return this.masterHessianProxy;
        }
        
        // TODO: 修改随机选取算法
        int totalCount = this.slaveHessianProxies.size() + 1;
        int nextIndex = (new Random()).nextInt(totalCount);
        if (nextIndex == 0) {
            return this.masterHessianProxy;
        }
        
        return this.slaveHessianProxies.get(nextIndex - 1);
    }
  }
代码语言:javascript
复制
  @Distribute
  public interface HelloService {
      void sayHello(String name);
  }

展望

​ 前面的简单实现Demo,虽然简单,但其实也预留了一些可以扩展的点可供后续展望:

  • DistributeStrategy可以定制,如果只针对特定路径下的服务接口做增强,可以只替换策略,不需要对原来代码做改动
  • 如果使用Distribute注解可以考虑增加一个服务组的概念,增对不同的服务组进行不同的地址转发处理
  • 可以考虑增加一个界面对于从节点进行增删该,可以增加权重的概念,根据不同的权重定制不同的路由规则

后续

​ 这只是DIY分布式改造第一篇只应用分布式改造,后续还有缓存分布式及锁的分布式改造,出于篇幅和时间限制,今天暂时写到这里。后续再做更新。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-04-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景介绍
  • 第一章:纯DIY分布式
    • 第一集: 应用分布式改造
      • 探索之路
      • 实现Demo
      • 展望
      • 后续
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档