本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。
配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zookeeper的Watch机制来实现实时推送。
上图简要描述了配置发布的大致过程:
ReleaseMessage消息是通过Mysql实现了一个简单的消息队列。之所有没有采用消息中间件,是为了让Apollo在部署的时候尽量简单,尽可能减少外部依赖。
上图简要描述了发送ReleaseMessage的大致过程:
通知是采用基于Http长连接实现,主要分为下面几个步骤:
Apollo推送这块代码比较多,就不在本书中详细分析了,我把推送这块的代码稍微简化了下,给大家进行讲解,这样理解起来会更容易。当然我这边会比较简单,很多细节就不做考虑了,只是为了能够让大家明白Apollo推送的核心原理。
发送ReleaseMessage的逻辑我们就写一个简单的接口,用队列存储,测试的时候就调用这个接口模拟配置有更新,发送ReleaseMessage消息。
@RestControllerpublic class NotificationControllerV2 implements ReleaseMessageListener {
// 模拟配置更新,往里插入数据表示有更新 public static Queue<String> queue = new LinkedBlockingDeque<>();
@GetMapping("/addMsg") public String addMsg() { queue.add("xxx"); return "success"; }
}
消息发送之后,前面我们有讲过Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录,然后取通知客户端,这边我们也启动一个线程去扫描:
@Componentpublic class ReleaseMessageScanner implements InitializingBean {
@Autowired private NotificationControllerV2 configController;
@Override public void afterPropertiesSet() throws Exception { // 定时任务从数据库扫描有没有新的配置发布 new Thread(() -> { for (;;) { String result = NotificationControllerV2.queue.poll(); if (result != null) { ReleaseMessage message = new ReleaseMessage(); message.setMessage(result); configController.handleMessage(message); } } }).start();; }
}
循环去读取NotificationControllerV2中的队列,如果有消息的话就构造一个ReleaseMessage的对象,然后调用NotificationControllerV2中的handleMessage()方法进行消息的处理。
ReleaseMessage就一个字段,模拟消息内容:
public class ReleaseMessage { private String message;
public void setMessage(String message) { this.message = message; } public String getMessage() { return message; }}
接下来,我们看handleMessage做了什么样的工作
NotificationControllerV2实现了ReleaseMessageListener接口,ReleaseMessageListener中定义了handleMessage()方法。
public interface ReleaseMessageListener { void handleMessage(ReleaseMessage message);}
handleMessage就是当配置发生变化的时候,通知的消息监听器,消息监听器得到配置发布的信息后,则会通知对应的客户端:
@RestControllerpublic class NotificationControllerV2 implements ReleaseMessageListener {
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps .synchronizedSetMultimap(HashMultimap.create());
@Override public void handleMessage(ReleaseMessage message) { System.err.println("handleMessage:"+ message); List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx")); for (DeferredResultWrapper deferredResultWrapper : results) { List<ApolloConfigNotification> list = new ArrayList<>(); list.add(new ApolloConfigNotification("application", 1)); deferredResultWrapper.setResult(list); } }
}
Apollo的实时推送是基于Spring DeferredResult实现的,在handleMessage()方法中可以看到是通过deferredResults获取DeferredResult,deferredResults就是第一行的Multimap,Key其实就是消息内容,Value就是DeferredResult的业务包装类DeferredResultWrapper,我们来看下DeferredResultWrapper的代码:
public class DeferredResultWrapper { private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() { result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST); }
public void onTimeout(Runnable timeoutCallback) { result.onTimeout(timeoutCallback); }
public void onCompletion(Runnable completionCallback) { result.onCompletion(completionCallback); }
public void setResult(ApolloConfigNotification notification) { setResult(Lists.newArrayList(notification)); }
public void setResult(List<ApolloConfigNotification> notifications) { result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK)); }
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() { return result; }}
通过setResult()方法设置返回结果给客户端,以上就是当配置发生变化,然后通过消息监听器通知客户端的原理,那么客户端是在什么时候接入的呢?
@RestControllerpublic class NotificationControllerV2 implements ReleaseMessageListener {
// 模拟配置更新,往里插入数据表示有更新 public static Queue<String> queue = new LinkedBlockingDeque<>();
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps .synchronizedSetMultimap(HashMultimap.create());
@GetMapping("/getConfig") public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() { DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(); List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(); if (!CollectionUtils.isEmpty(newNotifications)) { deferredResultWrapper.setResult(newNotifications); } else { deferredResultWrapper.onTimeout(() -> { System.err.println("onTimeout"); });
deferredResultWrapper.onCompletion(() -> { System.err.println("onCompletion"); }); deferredResults.put("xxxx", deferredResultWrapper); } return deferredResultWrapper.getResult(); }
private List<ApolloConfigNotification> getApolloConfigNotifications() { List<ApolloConfigNotification> list = new ArrayList<>(); String result = queue.poll(); if (result != null) { list.add(new ApolloConfigNotification("application", 1)); } return list; }}
NotificationControllerV2中提供了一个/getConfig的接口,客户端在启动的时候会调用这个接口,这个时候会执行getApolloConfigNotifications()方法去获取有没有配置的变更信息,如果有的话证明配置修改过,直接就通过deferredResultWrapper.setResult(newNotifications);返回结果给客户端了,客户端收到结果后重新拉取配置的信息进行覆盖本地的配置。
如果getApolloConfigNotifications()方法没有返回配置修改的信息,证明配置没有发生修改,就将DeferredResultWrapper对象添加到deferredResults中,等待后续配置发生变化时消息监听器进行通知。
同时这个请求就会挂起,不会立即返回,挂起是通过DeferredResultWrapper中的下面的代码实现的:
private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() { result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);}
在创建DeferredResult对象的时候指定了超时的时间和超时后返回的响应码,如果60秒内没有消息监听器进行通知,那么这个请求就会超时,超时后客户端就收到的响应码就是304。
整个Config Service的流程就走完了,接下来我们看客户端是怎么实现的,我们简单的写个测试类模拟客户端注册:
public class ClientTest { public static void main(String[] args) { reg(); }
private static void reg() { System.err.println("注册"); String result = request("http://localhost:8081/getConfig"); if (result != null) { // 配置有更新,重新拉取配置 // ...... } // 重新注册 reg(); }
private static String request(String url) { HttpURLConnection connection = null; BufferedReader reader = null; try { URL getUrl = new URL(url); connection = (HttpURLConnection) getUrl.openConnection(); connection.setReadTimeout(90000); connection.setConnectTimeout(3000); connection.setRequestMethod("GET"); connection.setRequestProperty("Accept-Charset", "utf-8"); connection.setRequestProperty("Content-Type", "application/json"); connection.setRequestProperty("Charset", "UTF-8"); System.out.println(connection.getResponseCode()); if (200 == connection.getResponseCode()) { reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line = null; while ((line = reader.readLine()) != null) { result.append(line); } System.out.println("结果 " + result); return result.toString(); } } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { connection.disconnect(); } } return null; }}
首先启动/getConfig接口所在的服务,然后启动客户端,客户端就会发起注册请求,如果有修改直接获取到结果,进行配置的更新操作。如果无修改,请求会挂起,这边客户端设置的读取超时时间是90秒,大于服务端的60秒超时时间。
每次收到结果后,无论是有修改还是没修改,都必须重新进行注册,通过这样的方式就可以达到配置实时推送的效果。
我们可以调用之前写的/addMsg接口来模拟配置发生变化,调用之后客户端就能马上得到返回结果。
本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。