A Look at Metrics Monitoring in lettuce

Author: code4it
Published: 2018-09-17 17:27:27
Collected in the column: 码匠的流水账

This article looks at how lettuce collects and publishes its metrics.

DefaultCommandLatencyEventPublisher

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/metrics/DefaultCommandLatencyEventPublisher.java

public class DefaultCommandLatencyEventPublisher implements MetricEventPublisher {

    private final EventExecutorGroup eventExecutorGroup;
    private final EventPublisherOptions options;
    private final EventBus eventBus;
    private final CommandLatencyCollector commandLatencyCollector;

    private final Runnable EMITTER = this::emitMetricsEvent;

    private volatile ScheduledFuture<?> scheduledFuture;

    public DefaultCommandLatencyEventPublisher(EventExecutorGroup eventExecutorGroup, EventPublisherOptions options,
            EventBus eventBus, CommandLatencyCollector commandLatencyCollector) {

        this.eventExecutorGroup = eventExecutorGroup;
        this.options = options;
        this.eventBus = eventBus;
        this.commandLatencyCollector = commandLatencyCollector;

        if (!options.eventEmitInterval().isZero()) {
            scheduledFuture = this.eventExecutorGroup.scheduleAtFixedRate(EMITTER, options.eventEmitInterval().toMillis(),
                    options.eventEmitInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean isEnabled() {
        return !options.eventEmitInterval().isZero() && scheduledFuture != null;
    }

    @Override
    public void shutdown() {

        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledFuture = null;
        }
    }

    @Override
    public void emitMetricsEvent() {

        if (!isEnabled() || !commandLatencyCollector.isEnabled()) {
            return;
        }

        eventBus.publish(new CommandLatencyEvent(commandLatencyCollector.retrieveMetrics()));
    }

}
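  • The constructor checks the eventEmitInterval of EventPublisherOptions: if it is not zero, it schedules emitMetricsEvent at a fixed rate (with the interval used as both the initial delay and the period), so that command latency events are published periodically.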
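
The scheduling rule described above can be sketched with the JDK alone. The class below is a hypothetical stand-in, not a lettuce class: `IntervalPublisherSketch`, its counter, and the 20 ms interval are assumptions made for illustration, and a plain `ScheduledExecutorService` takes the place of the netty `EventExecutorGroup`.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the publisher above; not a lettuce class.
class IntervalPublisherSketch {

    private final Duration emitInterval;
    private final AtomicInteger emitted = new AtomicInteger();
    private volatile ScheduledFuture<?> scheduledFuture;

    IntervalPublisherSketch(ScheduledExecutorService executor, Duration emitInterval) {
        this.emitInterval = emitInterval;
        // A zero interval means "never schedule", mirroring the constructor check above.
        if (!emitInterval.isZero()) {
            long millis = emitInterval.toMillis();
            // Initial delay equals the period, so the first run happens one interval after start.
            scheduledFuture = executor.scheduleAtFixedRate(this::emit, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    boolean isEnabled() {
        return !emitInterval.isZero() && scheduledFuture != null;
    }

    void shutdown() {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledFuture = null;
        }
    }

    private void emit() {
        if (isEnabled()) {
            // The real publisher posts a CommandLatencyEvent here; the sketch only counts.
            emitted.incrementAndGet();
        }
    }

    int emittedCount() {
        return emitted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        IntervalPublisherSketch disabled = new IntervalPublisherSketch(executor, Duration.ZERO);
        IntervalPublisherSketch enabled = new IntervalPublisherSketch(executor, Duration.ofMillis(20));
        Thread.sleep(200);
        System.out.println("zero interval: enabled=" + disabled.isEnabled() + ", emitted=" + disabled.emittedCount());
        System.out.println("20 ms interval: enabled=" + enabled.isEnabled() + ", emitted=" + enabled.emittedCount());
        enabled.shutdown();
        executor.shutdownNow();
    }
}
```

With a zero interval nothing is ever scheduled, so `isEnabled()` stays false and no event is emitted. After `shutdown()` the future is cleared and `isEnabled()` turns false as well.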

LettuceConnectionConfiguration

spring-boot-autoconfigure-2.0.4.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.java

@Configuration
@ConditionalOnClass(RedisClient.class)
class LettuceConnectionConfiguration extends RedisConnectionConfiguration {

    private final RedisProperties properties;

    private final List<LettuceClientConfigurationBuilderCustomizer> builderCustomizers;

    LettuceConnectionConfiguration(RedisProperties properties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider,
            ObjectProvider<List<LettuceClientConfigurationBuilderCustomizer>> builderCustomizers) {
        super(properties, sentinelConfigurationProvider, clusterConfigurationProvider);
        this.properties = properties;
        this.builderCustomizers = builderCustomizers
                .getIfAvailable(Collections::emptyList);
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(ClientResources.class)
    public DefaultClientResources lettuceClientResources() {
        return DefaultClientResources.create();
    }

    //......
}
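  • Unless the application defines its own ClientResources bean (@ConditionalOnMissingBean), LettuceConnectionConfiguration creates a DefaultClientResources with the default configuration.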

DefaultEventPublisherOptions

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/DefaultEventPublisherOptions.java

public class DefaultEventPublisherOptions implements EventPublisherOptions {

    public static final long DEFAULT_EMIT_INTERVAL = 10;
    public static final TimeUnit DEFAULT_EMIT_INTERVAL_UNIT = TimeUnit.MINUTES;
    public static final Duration DEFAULT_EMIT_INTERVAL_DURATION = Duration.ofMinutes(DEFAULT_EMIT_INTERVAL);

    //......
}
  • The default emit interval, DEFAULT_EMIT_INTERVAL_DURATION, is 10 minutes.
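
As a small worked example of what this default implies: the publisher shown earlier passes the interval to scheduleAtFixedRate as both the initial delay and the period, so the nominal time of the n-th event is n times the interval. The class and method names below are hypothetical and exist only for this arithmetic.

```java
import java.time.Duration;

// Hypothetical helper for the arithmetic only; not part of lettuce.
class DefaultIntervalSketch {

    // Same value as DEFAULT_EMIT_INTERVAL_DURATION in the excerpt above.
    static final Duration DEFAULT_EMIT_INTERVAL = Duration.ofMinutes(10);

    // Nominal time from startup until the n-th emission (n starts at 1),
    // given that the initial delay equals the period.
    static Duration timeUntilEmission(Duration interval, int n) {
        return interval.multipliedBy(n);
    }

    public static void main(String[] args) {
        System.out.println("period in ms: " + DEFAULT_EMIT_INTERVAL.toMillis());
        System.out.println("first event after: " + timeUntilEmission(DEFAULT_EMIT_INTERVAL, 1));
        System.out.println("third event after: " + timeUntilEmission(DEFAULT_EMIT_INTERVAL, 3));
    }
}
```

In practice this means that with the default settings no latency event shows up during the first 10 minutes after startup.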

CommandLatencyCollector

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/metrics/CommandLatencyCollector.java

public interface CommandLatencyCollector extends MetricCollector<Map<CommandLatencyId, CommandMetrics>> {

    /**
     * Record the command latency per {@code connectionPoint} and {@code commandType}.
     *
     * @param local the local address
     * @param remote the remote address
     * @param commandType the command type
     * @param firstResponseLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the first response
     * @param completionLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the command completion
     */
    void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
            long firstResponseLatency, long completionLatency);

}
  • DefaultCommandLatencyEventPublisher relies on CommandLatencyCollector to obtain the metrics (through retrieveMetrics), while the metrics themselves are collected through the recordCommandLatency method.
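
To make the record/retrieve split concrete, here is a deliberately simplified collector. It is an assumption-laden sketch, not the lettuce implementation: `SimpleLatencyCollector` and its `Metrics` type are hypothetical, string keys replace `CommandLatencyId`, `LongSummaryStatistics` replaces the percentile histograms seen in the real output, and resetting on every retrieval is a choice made by the sketch.

```java
import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;

// Hypothetical, simplified collector; not the lettuce implementation.
class SimpleLatencyCollector {

    // Per-key statistics for both latency values, in nanoseconds.
    static final class Metrics {
        final LongSummaryStatistics firstResponse = new LongSummaryStatistics();
        final LongSummaryStatistics completion = new LongSummaryStatistics();
    }

    private Map<String, Metrics> metrics = new HashMap<>();

    // Called once per completed command, like recordCommandLatency above.
    synchronized void recordCommandLatency(String remote, String commandType,
            long firstResponseLatency, long completionLatency) {
        Metrics m = metrics.computeIfAbsent(remote + " " + commandType, key -> new Metrics());
        m.firstResponse.accept(firstResponseLatency);
        m.completion.accept(completionLatency);
    }

    // Called by the publisher on each tick: hands out the current snapshot and starts a fresh one.
    synchronized Map<String, Metrics> retrieveMetrics() {
        Map<String, Metrics> snapshot = metrics;
        metrics = new HashMap<>();
        return snapshot;
    }

    public static void main(String[] args) {
        SimpleLatencyCollector collector = new SimpleLatencyCollector();
        collector.recordCommandLatency("redis:6379", "GET", 884_000, 950_000);
        collector.recordCommandLatency("redis:6379", "GET", 888_000, 954_000);
        collector.recordCommandLatency("redis:6379", "SET", 909_000, 995_000);
        collector.retrieveMetrics().forEach((key, m) ->
                System.out.println(key + " count=" + m.completion.getCount() + " max=" + m.completion.getMax()));
    }
}
```

The write path (one call per command) stays cheap, and the read path is only touched once per emit interval.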

CommandHandler

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/protocol/CommandHandler.java

/**
 * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server.
 *
 * @author Will Glozer
 * @author Mark Paluch
 * @author Jongyeol Choi
 * @author Grzegorz Szpak
 */
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
    //......
   /**
     * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf input = (ByteBuf) msg;

        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
            return;
        }

        if (debugEnabled) {
            logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size());
        }

        try {
            if (buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
                return;
            }

            if (debugEnabled && ctx.channel() != channel) {
                logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
                return;
            }

            if (traceEnabled) {
                logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
            }

            buffer.writeBytes(input);

            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {

        if (latencyMetricsEnabled && command instanceof WithLatency) {

            WithLatency withLatency = (WithLatency) command;
            if (withLatency.getFirstResponse() == -1) {
                withLatency.firstResponse(nanoTime());
            }

            if (!decode0(ctx, buffer, command)) {
                return false;
            }

            recordLatency(withLatency, command.getType());

            return true;
        }

        return decode0(ctx, buffer, command);
    }

    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {

        if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) {

            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            long completionLatency = nanoTime() - withLatency.getSent();

            clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType,
                    firstResponseLatency, completionLatency);
        }
    }
}
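  • channelRead calls decode. The per-command decode overload stamps the first-response time the first time it sees the command, and once decode0 reports the command as complete it calls recordLatency, which computes the first-response and completion latencies (both measured from the send time) and hands them to clientResources.commandLatencyCollector().recordCommandLatency.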
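
The two latency values can be illustrated with fixed timestamps. `TimedCommand` below is a hypothetical stand-in for a command implementing WithLatency, and the nanosecond values are made up for the example. It models a response that arrives in two chunks, so decode is attempted twice before the command completes.

```java
// Hypothetical model of the timestamp handling above; not lettuce code.
class LatencyTimestampsSketch {

    static final class TimedCommand {
        private final long sent;
        private long firstResponse = -1; // -1 means "no response bytes seen yet"

        TimedCommand(long sentNanos) {
            this.sent = sentNanos;
        }

        // Called on every decode attempt; only the first call sets the timestamp.
        void markResponse(long nowNanos) {
            if (firstResponse == -1) {
                firstResponse = nowNanos;
            }
        }

        // Time from send until the first response bytes were seen.
        long firstResponseLatency() {
            return firstResponse - sent;
        }

        // Time from send until the command was fully decoded.
        long completionLatency(long nowNanos) {
            return nowNanos - sent;
        }
    }

    public static void main(String[] args) {
        TimedCommand command = new TimedCommand(1_000);
        command.markResponse(1_900); // first chunk arrives, decode not yet complete
        command.markResponse(2_400); // second chunk arrives, decode completes
        System.out.println("firstResponseLatency=" + command.firstResponseLatency());
        System.out.println("completionLatency=" + command.completionLatency(2_400));
    }
}
```

This is why, in the sample output further down, completion is never smaller than firstResponse for the same command type.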

Event consumption example

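import javax.annotation.PostConstruct;

// Assuming SLF4J, as the LoggerFactory.getLogger call suggests
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.event.EventBus;

public class LettuceEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(LettuceEventConsumer.class);

    private final EventBus eventBus;

    public LettuceEventConsumer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    @PostConstruct
    public void init() {
        // eventBus.get() exposes the events as a Flux; log each event as it arrives
        eventBus.get().subscribe(e -> LOGGER.info("event:{}", e));
    }
}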
  • Here we obtain a Flux from the eventBus and subscribe to it. Sample output:
2018-09-11 16:32:57.361  INFO 6656 --- [xecutorLoop-1-3] com.example.config.LettuceEventConsumer  : event:{[local:any -> /192.168.99.100:6379, commandType=GET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=884, max=888, percentiles={50.0=888, 90.0=888, 95.0=888, 99.0=888, 99.9=888}], completion=[min=950, max=954, percentiles={50.0=954, 90.0=954, 95.0=954, 99.0=954, 99.9=954}]], [local:any -> /192.168.99.100:6379, commandType=INFO]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=1449, max=1458, percentiles={50.0=1458, 90.0=1458, 95.0=1458, 99.0=1458, 99.9=1458}], completion=[min=2457, max=2473, percentiles={50.0=2473, 90.0=2473, 95.0=2473, 99.0=2473, 99.9=2473}]], [local:any -> /192.168.99.100:6379, commandType=PUBLISH]=[count=40, timeUnit=MICROSECONDS, firstResponse=[min=708, max=17956, percentiles={50.0=1343, 90.0=2719, 95.0=3948, 99.0=17956, 99.9=17956}], completion=[min=733, max=17956, percentiles={50.0=1376, 90.0=2752, 95.0=3981, 99.0=17956, 99.9=17956}]], [local:any -> /192.168.99.100:6379, commandType=SET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=909, max=913, percentiles={50.0=913, 90.0=913, 95.0=913, 99.0=913, 99.9=913}], completion=[min=995, max=999, percentiles={50.0=999, 90.0=999, 95.0=999, 99.0=999, 99.9=999}]], [local:any -> /192.168.99.100:6379, commandType=SUBSCRIBE]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=19267, max=19398, percentiles={50.0=19398, 90.0=19398, 95.0=19398, 99.0=19398, 99.9=19398}], completion=[min=41418, max=41680, percentiles={50.0=41680, 90.0=41680, 95.0=41680, 99.0=41680, 99.9=41680}]]}
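
The subscriber above logs every event it receives. lettuce also publishes connection lifecycle events on the same bus, so a consumer that only cares about latency metrics would typically filter by event type; with the real Flux this could presumably be done with a `filter` on `e instanceof CommandLatencyEvent`. The sketch below models the idea with the JDK only. All of its types (`TypedEventBusSketch`, `ConnectedEvent`, `LatencyEvent`) are hypothetical and are not lettuce classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical JDK-only model of a typed event bus; none of these types are lettuce classes.
class TypedEventBusSketch {

    interface Event {
    }

    static final class ConnectedEvent implements Event {
    }

    static final class LatencyEvent implements Event {
        final String summary;

        LatencyEvent(String summary) {
            this.summary = summary;
        }
    }

    private final List<Consumer<Event>> subscribers = new CopyOnWriteArrayList<>();

    // Registers a subscriber that only sees events of the requested type.
    <T extends Event> void subscribe(Class<T> type, Consumer<T> subscriber) {
        subscribers.add(event -> {
            if (type.isInstance(event)) {
                subscriber.accept(type.cast(event));
            }
        });
    }

    // Delivers the event to every subscriber; each one decides whether it is interested.
    void publish(Event event) {
        subscribers.forEach(subscriber -> subscriber.accept(event));
    }

    public static void main(String[] args) {
        TypedEventBusSketch bus = new TypedEventBusSketch();
        List<String> seen = new ArrayList<>();
        bus.subscribe(LatencyEvent.class, event -> seen.add(event.summary));
        bus.publish(new ConnectedEvent());
        bus.publish(new LatencyEvent("GET count=1"));
        System.out.println(seen);
    }
}
```

The publisher never needs to know who is listening, which is the decoupling the event-driven design relies on.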

Summary

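lettuce ships with a built-in eventBus and publishes latency events for the commands it executes; the client can consume the eventBus as needed to obtain lettuce's metrics. For metrics monitoring, this event-driven approach is the more flexible one, since publishing and consuming are fully decoupled, and it is a good illustration of Event-Driven Architecture.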

doc

  • clientresources.advanced-settings
  • Implement an EventBus system to publish events and metrics #124
Originally published on 2018-09-11 on the 码匠的流水账 WeChat official account.