前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Leaf——美团分布式ID生成系统

Leaf——美团分布式ID生成系统

作者头像
Yuyy
发布2023-03-08 21:16:36
4810
发布2023-03-08 21:16:36
举报

Leaf——美团点评分布式ID生成系统 https://github.com/Meituan-Dianping/Leaf

数据库号段发号

线程池设置线程编号

代码语言:javascript
复制
public static class UpdateThreadFactory implements ThreadFactory {

        private static int threadInitNumber = 0;

        private static synchronized int nextThreadNum() {
            return threadInitNumber++;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
        }
    }

加载号段 DoubleCheck

代码语言:javascript
复制
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    if (!buffer.isInitOk()) {
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }

动态调整步长

代码语言:javascript
复制
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();

            // 15分钟内更新号段,就放大步长

            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    // do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                // do nothing with nextStep
            } else {

                // 30分钟以上才更新号段,就缩小步长,避免停机造成大量号段浪费
                // 最小步长是数据库里配置的

                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }

预加载号段

代码语言:javascript
复制
            buffer.rLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                    service.execute(new Runnable() {
  • 防止当前号段用尽时阻塞等待加载新号段
  • 对共享变量的判断,需要加读写锁,保证线程安全
  • 预加载条件
    • 下一个号段没有就绪
    • 剩余号小于90%
    • CAS成功后去异步加载

修改多个共享变量加锁

代码语言:javascript
复制
buffer.wLock().lock();
                                    buffer.setNextReady(true);
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();

获取锁后 DoubleCheck

代码语言:javascript
复制
            buffer.wLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }

优化轮询等待

代码语言:javascript
复制
    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if (roll > 10000) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    break;
                } catch (InterruptedException e) {
                    logger.warn("Thread {} Interrupted", Thread.currentThread().getName());
                    break;
                }
            }
        }
    }
  • 先循环1w次,超过1w次时,每次sleep 10 ms
  • 循环不耗时,主要看循环内部的代码。例如10亿次循环累加才几百毫秒
  • 等待分为正常情况和异常情况
    • 正常情况很快,所以轮询请求
    • 异常情况很慢,例如数据库异常、网络异常,所以需要sleep,让出cpu

类snowflake算法

位运算

代码语言:javascript
复制
    private final long workerIdBits = 10L;
    private final long maxWorkerId = ~(-1L << workerIdBits);//最大能够分配的workerid =1023

使用位运算提高效率

使用负数 + 非运算~代替减 1 操作

  • 还可以用与运算(&)判断是否是偶数
  • n % 2 == 0 等价于 (n & 1) == 0 ,注意由于==的优先级高于&,所以需要给与运算加上小括号。
代码语言:javascript
复制
public int nextPos() {
    return (currentPos + 1) % 2;
}

public int nextPos() {
    return (currentPos + 1) & 1;
}

id生成提供默认实现

代码语言:javascript
复制
public class ZeroIDGen implements IDGen {
    @Override
    public Result get(String key) {
        return new Result(0, Status.SUCCESS);
    }

    @Override
    public boolean init() {
        return true;
    }
}

    public SnowflakeService() throws InitException {
        Properties properties = PropertyFactory.getProperties();
        boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SNOWFLAKE_ENABLE, "true"));
        if (flag) {
            // init
        } else {
            idGen = new ZeroIDGen();
            logger.info("Zero ID Gen Service Init Successfully");
        }
    }
  • 当用户未正确配置Leaf时,提供一个不影响Leaf运行,但不能使用的id生成器

非主线程作为守护线程运行

代码语言:javascript
复制
    private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
        Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "schedule-upload-time");
                thread.setDaemon(true);
                return thread;
            }
        }).scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                updateNewData(curator, zk_AddressNode);
            }
        }, 1L, 3L, TimeUnit.SECONDS);//每3s上报数据

    }
  • 当所有非守护线程停止运行时,jvm就会退出
  • 如果不以守护线程运行,即使main方法退出,非守护线程依然会继续运行,jvm不会退出

缓存workId

代码语言:javascript
复制
    private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";

    public boolean init() {
        try {
            // 从zk获取workId
        } catch (Exception e) {
            LOGGER.error("Start node ERROR {}", e);
            try {
                Properties properties = new Properties();
                properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
                workerID = Integer.valueOf(properties.getProperty("workerID"));
                LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
            } catch (Exception e1) {
                LOGGER.error("Read file error ", e1);
                return false;
            }
        }
        return true;
    }
  • 在节点文件系统上缓存一个workid值,zk失效,机器重启时保证能够正常启动
  • 容器化时得挂载出来
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-2-16 1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据库号段发号
    • 线程池设置线程编号
      • 加载号段 DoubleCheck
        • 动态调整步长
          • 预加载号段
            • 修改多个共享变量加锁
              • 获取锁后 DoubleCheck
                • 优化轮询等待
                • 类snowflake算法
                  • 位运算
                    • id生成提供默认实现
                      • 非主线程作为守护线程运行
                        • 缓存workId
                        相关产品与服务
                        数据库
                        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档