鼻子亲了脸

分布式部署中全局ID的生成方案
在公司的业务开发当中,由于是分布式系统,并发量还比较高,如果使用MySQL的主键自增和一些普通方案解决不了DB的问...
扫描右侧二维码阅读全文
23
2018/07

分布式部署中全局ID的生成方案

在公司的业务开发当中,由于是分布式系统,并发量还比较高,如果使用MySQL的主键自增和一些普通方案解决不了DB的问题,此时在全局唯一ID的事情上发愁,网上搜罗一堆,最终借鉴了美团的Leaf-snowflake方案来解决了问题。

1. 需求

美团开发团队的文章中当时列出的ID需求大致如下:

  1. 全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求。
  2. 趋势递增:在MySQL InnoDB引擎中使用的是聚集索引,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能.
  3. 单调递增:保证下一个ID一定大于上一个ID,例如事务版本号、IM增量消息、排序等特殊需求。
  4. 信息安全:如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要ID无规则、不规则。

2. 方案

上诉方案基本满足了业务需求,便对于Leaf进行了部分改造。改造过后的方案,去掉了时钟问题的解决办法,使用了在取Sequence时的双buffer方案。

全局ID = 业务编号(2位) + 毫秒时间戳(取后11位) + 机器ID(2位) + Sequence(4位)

业务编号保证了每个业务模块的唯一性,机器ID保证了分布式部署时数据的唯一性,毫秒时间戳和Sequence保证了每台机器的在一秒能够处理近万个ID的生成。足以承担公司的业务量。方案中过于依赖系统时钟,当时间回拨时也会造成问题。在Sequence中采用了leaf的双Buffer优化,每次获取Sequence时获取一定量的Sequence,当使用到临近系统预定的阈值时,再进行取Sequence,以解决数据IO的问题。

3. Sequence

每次进行取Sequence时,防止多并发情况,进行加同步锁,当多个任务进来时进行排队,一个取完另一个再进行取Sequence,防止Sequence取值错误。

具体代码:

import com.papakeji.dproject.dao.dao.SequenceDao;
import com.papakeji.dproject.dao.impl.SequenceDaoImpl;
import com.papakeji.dproject.msg.ErrorMsg;
import com.papakeji.dproject.util.CheckException;
import com.papakeji.dproject.util.FormatUtil;
import lombok.Data;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

/**
 * @author by mrdong916
 */

@Slf4j
@Scope("prototype")
@Service
@Configuration
public class IDService extends BaseService {
    /**
     * 获取配置文件中机器ID
     * 不允许为空
     */
    @Value("${machineId}")
    private String machineId;
    /**
     * 获取配置文件中Sequence的临界值
     * 最小值为10%
     * 最大值为100%
     */
    @Min(10)
    @Max(100)
    @Value("${threshold}")
    private int threshold;
    /**
     * 获取配置文件中号段区间 最小取值为100
     */
    @Min(100)
    @Value("${numberInterval}")
    public int numberInterval;
    /**
     * 当前值
     */
    private static int current;
    /**
     * 最大值
     */
    public static int maxValue;
    /**
     * 下一次sequence起始值
     */
    public static int nextStartValue;

    @Setter
    private String ruleId;

    @Autowired
    public SequenceDao sequenceDao;

    synchronized int getSequence(String sign) throws CheckException {
        log.info("线程{}:开始准备取Sequence的值,当前Sequence的值:{}", Thread.currentThread().getId(),current);
        //取sequence
        int sequence = current;
        //取完Sequence进行判断是否要切入新的Sequence号段
        if (current==maxValue){
            current = nextStartValue;
            maxValue = nextStartValue + numberInterval;
        }else{
            //sequence步增
            current++;
        }
        //判断sequence是否大于最大值 大于最大值抛异常
        if (sequence>maxValue){
            log.error("sequence value than maxValue,please check sequence");
            throw new CheckException(ErrorMsg.SYSTEM_ERROR_UNKNOWN.getErrorCode());
        }
        //判断当前剩余sequence是否小于或等于阈值 是则取sequence
        if (maxValue-sequence <= numberInterval*threshold/100){
            //获取新的一段Sequence
            nextStartValue = sequenceDao.getIntervalSequence(numberInterval,sign);
        }
        log.info("线程{}:完成更新Sequence的值,当前Sequence的值:{}", Thread.currentThread().getId(),current);
        return sequence;
    }

    /**
     * ID生成规则  2位规则编号 + 11位的毫秒 + 部署机器号2位 + 4位的Sequence
     * @return ID
     */
    public String getRuleID() throws CheckException {
        //当第一次进行取ID时进行初始化
        if (current==0 && maxValue==0){
            startSequence(ruleId);
        }
        //13位的毫秒时间戳
        String systemTime = String.valueOf(System.currentTimeMillis());
        return ruleId + systemTime.substring(2, systemTime.length()) + machineId + FormatUtil.NumberToSize(getSequence(ruleId), 4);
    }

    /**
     * 启动注入参数
     */
    public void startSequence(String ruleId){
        //获取Sequence最新的sequence
        current = sequenceDao.getIntervalSequence(numberInterval,ruleId);
        //设置最大值
        maxValue = current + numberInterval;
    }
}

代码中实例化的SequenceDao对象是进行对数据库的Sequence的获取,我使用存储过程对数据看进行操作:

4. 数据库

BEGIN
    #传入的变量

    # _size 一次获取多少个Sequence
    # _sign 业务代码的标示

    #定义结果变量 最终返回的数据
    DECLARE v_seq VARCHAR(20);

    #定义判断条件变量
    DECLARE v_is_loop INT(20);

    #定义更新值
    DECLARE v_is_update INT(20);

    #定义SQL异常 0 正常 1 异常
    DECLARE t_error INTEGER DEFAULT 0;

    #当SQL执行异常时t_error为1
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET t_error=1;

    #开启事务
    START TRANSACTION;

    #判断是否存在记录不存在则写入
    IF((SELECT COUNT(0) FROM djh_sequence WHERE ruleId = _sign ) =0 ) THEN
        INSERT INTO djh_sequence (ruleId,value) VALUES (_sign ,0);
    END IF;

    #查询当前的值 设置返回结果
    SET v_seq = (SELECT value FROM djh_sequence WHERE ruleId = _sign);

    #设置循环从1开始
    SET v_is_loop = 1;

    #设置更新值
    SET v_is_update = v_seq;

    #开启loop循环
    num_loop:LOOP
        #更新更新值
        SET v_is_update = (SELECT value FROM djh_sequence WHERE ruleId = _sign )+1;
        #判断是否开启新的一轮循环 大于9999就开启新的一轮循环
        IF v_is_update>9999 THEN
                #当value+1超过9999的时候,重置为0
                UPDATE djh_sequence SET value = 0;
        #不开启新的一轮循环 则加1
        ELSE
                #更新
                UPDATE djh_sequence SET value = v_is_update;
        END IF; 

        #设置循环条件+1
        SET v_is_loop = v_is_loop+1;

        #判断是否大于循环次数
        IF v_is_loop>_size THEN
                LEAVE num_loop;
        END IF;

    END LOOP;

    #返回起始值
    SELECT v_seq;

    #当SQL异常时执行回滚操作 
    IF t_error = 1 THEN  
        ROLLBACK;  
    ELSE
        COMMIT;
    END IF;
END
Last modification:March 18th, 2019 at 04:47 pm
If you think my article is useful to you, please feel free to appreciate

One comment

  1. 楚狂人博客

    实用性很强,谢谢分享

Leave a Comment