在公司的业务开发当中,由于是分布式系统,并发量还比较高,如果使用MySQL的主键自增和一些普通方案解决不了DB的问题,此时在全局唯一ID的事情上发愁,网上搜罗一堆,最终借鉴了美团的Leaf-snowflake方案来解决了问题。
1. 需求
美团开发团队的文章中当时列出的ID需求大致如下:
- 全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求。
- 趋势递增:在MySQL InnoDB引擎中使用的是聚集索引,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能.
- 单调递增:保证下一个ID一定大于上一个ID,例如事务版本号、IM增量消息、排序等特殊需求。
- 信息安全:如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要ID无规则、不规则。
2. 方案
上诉方案基本满足了业务需求,便对于Leaf进行了部分改造。改造过后的方案,去掉了时钟问题的解决办法,使用了在取Sequence时的双buffer方案。
全局ID = 业务编号(2位) + 毫秒时间戳(取后11位) + 机器ID(2位) + Sequence(4位)
业务编号保证了每个业务模块的唯一性,机器ID保证了分布式部署时数据的唯一性,毫秒时间戳和Sequence保证了每台机器的在一秒能够处理近万个ID的生成。足以承担公司的业务量。方案中过于依赖系统时钟,当时间回拨时也会造成问题。在Sequence中采用了leaf的双Buffer优化,每次获取Sequence时获取一定量的Sequence,当使用到临近系统预定的阈值时,再进行取Sequence,以解决数据IO的问题。
3. Sequence
每次进行取Sequence时,防止多并发情况,进行加同步锁,当多个任务进来时进行排队,一个取完另一个再进行取Sequence,防止Sequence取值错误。
具体代码:
import com.papakeji.dproject.dao.dao.SequenceDao;
import com.papakeji.dproject.dao.impl.SequenceDaoImpl;
import com.papakeji.dproject.msg.ErrorMsg;
import com.papakeji.dproject.util.CheckException;
import com.papakeji.dproject.util.FormatUtil;
import lombok.Data;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
* @author by mrdong916
*/
@Slf4j
@Scope("prototype")
@Service
@Configuration
public class IDService extends BaseService {
/**
* 获取配置文件中机器ID
* 不允许为空
*/
@Value("${machineId}")
private String machineId;
/**
* 获取配置文件中Sequence的临界值
* 最小值为10%
* 最大值为100%
*/
@Min(10)
@Max(100)
@Value("${threshold}")
private int threshold;
/**
* 获取配置文件中号段区间 最小取值为100
*/
@Min(100)
@Value("${numberInterval}")
public int numberInterval;
/**
* 当前值
*/
private static int current;
/**
* 最大值
*/
public static int maxValue;
/**
* 下一次sequence起始值
*/
public static int nextStartValue;
@Setter
private String ruleId;
@Autowired
public SequenceDao sequenceDao;
synchronized int getSequence(String sign) throws CheckException {
log.info("线程{}:开始准备取Sequence的值,当前Sequence的值:{}", Thread.currentThread().getId(),current);
//取sequence
int sequence = current;
//取完Sequence进行判断是否要切入新的Sequence号段
if (current==maxValue){
current = nextStartValue;
maxValue = nextStartValue + numberInterval;
}else{
//sequence步增
current++;
}
//判断sequence是否大于最大值 大于最大值抛异常
if (sequence>maxValue){
log.error("sequence value than maxValue,please check sequence");
throw new CheckException(ErrorMsg.SYSTEM_ERROR_UNKNOWN.getErrorCode());
}
//判断当前剩余sequence是否小于或等于阈值 是则取sequence
if (maxValue-sequence <= numberInterval*threshold/100){
//获取新的一段Sequence
nextStartValue = sequenceDao.getIntervalSequence(numberInterval,sign);
}
log.info("线程{}:完成更新Sequence的值,当前Sequence的值:{}", Thread.currentThread().getId(),current);
return sequence;
}
/**
* ID生成规则 2位规则编号 + 11位的毫秒 + 部署机器号2位 + 4位的Sequence
* @return ID
*/
public String getRuleID() throws CheckException {
//当第一次进行取ID时进行初始化
if (current==0 && maxValue==0){
startSequence(ruleId);
}
//13位的毫秒时间戳
String systemTime = String.valueOf(System.currentTimeMillis());
return ruleId + systemTime.substring(2, systemTime.length()) + machineId + FormatUtil.NumberToSize(getSequence(ruleId), 4);
}
/**
* 启动注入参数
*/
public void startSequence(String ruleId){
//获取Sequence最新的sequence
current = sequenceDao.getIntervalSequence(numberInterval,ruleId);
//设置最大值
maxValue = current + numberInterval;
}
}
代码中实例化的SequenceDao对象是进行对数据库的Sequence的获取,我使用存储过程对数据看进行操作:
4. 数据库
BEGIN
#传入的变量
# _size 一次获取多少个Sequence
# _sign 业务代码的标示
#定义结果变量 最终返回的数据
DECLARE v_seq VARCHAR(20);
#定义判断条件变量
DECLARE v_is_loop INT(20);
#定义更新值
DECLARE v_is_update INT(20);
#定义SQL异常 0 正常 1 异常
DECLARE t_error INTEGER DEFAULT 0;
#当SQL执行异常时t_error为1
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET t_error=1;
#开启事务
START TRANSACTION;
#判断是否存在记录不存在则写入
IF((SELECT COUNT(0) FROM djh_sequence WHERE ruleId = _sign ) =0 ) THEN
INSERT INTO djh_sequence (ruleId,value) VALUES (_sign ,0);
END IF;
#查询当前的值 设置返回结果
SET v_seq = (SELECT value FROM djh_sequence WHERE ruleId = _sign);
#设置循环从1开始
SET v_is_loop = 1;
#设置更新值
SET v_is_update = v_seq;
#开启loop循环
num_loop:LOOP
#更新更新值
SET v_is_update = (SELECT value FROM djh_sequence WHERE ruleId = _sign )+1;
#判断是否开启新的一轮循环 大于9999就开启新的一轮循环
IF v_is_update>9999 THEN
#当value+1超过9999的时候,重置为0
UPDATE djh_sequence SET value = 0;
#不开启新的一轮循环 则加1
ELSE
#更新
UPDATE djh_sequence SET value = v_is_update;
END IF;
#设置循环条件+1
SET v_is_loop = v_is_loop+1;
#判断是否大于循环次数
IF v_is_loop>_size THEN
LEAVE num_loop;
END IF;
END LOOP;
#返回起始值
SELECT v_seq;
#当SQL异常时执行回滚操作
IF t_error = 1 THEN
ROLLBACK;
ELSE
COMMIT;
END IF;
END