Skip to content

Instantly share code, notes, and snippets.

@xcuzalex
Created February 17, 2022 03:47
Show Gist options
  • Select an option

  • Save xcuzalex/df1d8906e2a107be457e6b91b0c13c0b to your computer and use it in GitHub Desktop.

Select an option

Save xcuzalex/df1d8906e2a107be457e6b91b0c13c0b to your computer and use it in GitHub Desktop.
python-snowflake
#!/usr/bin/env python3
import threading
import time
from uuid import getnode as get_mac

import flask
"""
基于twitter的雪花算法生成不重复,且具有自增的流水号
该算法参考 https://github.com/twitter/snowflake
"""
app = flask.Flask(__name__)

# A single generator is shared by every request. Creating a new SnowFlake per
# request would reset its sequence counter and last-timestamp each time, so
# two requests served within the same millisecond would receive the same ID.
# It is created lazily because SnowFlake is defined further down in this file.
_generator = None
# Flask's development server handles requests in threads by default, and
# SnowFlake.nextId() mutates shared state, so calls are serialized.
_generator_lock = threading.Lock()


@app.route('/', methods=['GET', 'POST'])
def index():
    """Return the next Snowflake ID as the plain-text response body."""
    global _generator
    with _generator_lock:
        if _generator is None:
            _generator = SnowFlake()
        return str(_generator.nextId())
class SnowFlake:
    """Generator of unique, time-ordered integer IDs (Twitter Snowflake layout).

    Each ID packs four fields, from most to least significant bits:

        (milliseconds since START_STMP) | data-center id | machine id | sequence

    An instance keeps mutable state (``lastStmp`` and ``sequence``) and does no
    locking of its own; callers that share one instance between threads must
    serialize calls to ``nextId``.
    """

    # Custom epoch in milliseconds; subtracted from the current timestamp.
    START_STMP = 1480166465631

    # Number of bits occupied by each field.
    SEQUENCE_BIT = 6      # per-millisecond sequence number
    MACHINE_BIT = 5       # machine identifier
    DATACENTER_BIT = 5    # data-center identifier

    # Largest value each field can hold.
    MAX_DATACENTER_NUM = (1 << DATACENTER_BIT) - 1  # 31
    MAX_MACHINE_NUM = (1 << MACHINE_BIT) - 1        # 31
    MAX_SEQUENCE = (1 << SEQUENCE_BIT) - 1          # 63

    # Left shift applied to each field when assembling an ID.
    MACHINE_LEFT = SEQUENCE_BIT
    DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT
    TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT

    # Legacy aliases kept for existing readers of these attributes. They are
    # now derived from the matching field widths (previously swapped).
    maxDataCenterId = MAX_DATACENTER_NUM
    maxWorkerId = MAX_MACHINE_NUM

    def __init__(self, dataCenterId=1, machineId=10):
        """Create a generator for one (data center, machine) pair.

        Args:
            dataCenterId: data-center identifier, 0..MAX_DATACENTER_NUM.
            machineId: machine identifier, 0..MAX_MACHINE_NUM. Pass ``None``
                to derive one from this host's MAC address.

        Raises:
            ValueError: if either identifier is outside its valid range.
        """
        if machineId is None:
            # getMachineId() yields up to 10 bits; keep only as many low bits
            # as the machine field can store so the range check below passes.
            machineId = self.getMachineId() & self.MAX_MACHINE_NUM
        if not 0 <= dataCenterId <= self.MAX_DATACENTER_NUM:
            raise ValueError(
                "dataCenterId can't be greater than MAX_DATACENTER_NUM or less than 0")
        if not 0 <= machineId <= self.MAX_MACHINE_NUM:
            raise ValueError(
                "machineId can't be greater than MAX_MACHINE_NUM or less than 0")
        self.dataCenterId = dataCenterId
        self.machineId = machineId
        # Timestamp (ms) used for the most recently generated ID.
        self.lastStmp = -1
        # Sequence number within the current millisecond.
        self.sequence = 0

    def nextId(self):
        """Produce the next ID.

        Returns:
            The new ID as an int, or ``False`` (after printing a message) when
            the clock reads earlier than the previously used timestamp.
        """
        currStmp = self.getNewstmp()
        if currStmp < self.lastStmp:
            print("Clock moved backwards. Refusing to generate id")
            return False
        if currStmp == self.lastStmp:
            # Same millisecond: advance the sequence, wrapping at MAX_SEQUENCE.
            self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
            if self.sequence == 0:
                # Sequence exhausted for this millisecond; wait for the next.
                currStmp = self.getNextMill()
        else:
            # New millisecond: restart the sequence.
            self.sequence = 0
        self.lastStmp = currStmp
        # timestamp | data center | machine | sequence
        return (
            (currStmp - self.START_STMP) << self.TIMESTMP_LEFT
            | self.dataCenterId << self.DATACENTER_LEFT
            | self.machineId << self.MACHINE_LEFT
            | self.sequence
        )

    def getNextMill(self):
        """Busy-wait until the clock passes ``lastStmp``; return that timestamp."""
        mill = self.getNewstmp()
        while mill <= self.lastStmp:
            mill = self.getNewstmp()
        return mill

    def getNewstmp(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def getMaxWorkerId(self, dataCenterId, maxWorkerId):
        """Derive a worker id in ``0..maxWorkerId`` from ``dataCenterId``.

        The id is a Java-style string hash (``h = 31 * h + code``, 32-bit) of
        ``str(dataCenterId) + 'mxname'``, reduced to its low 16 bits and then
        taken modulo ``maxWorkerId + 1``.
        """
        text = str(dataCenterId) + 'mxname'
        digest = 0
        for ch in text:
            digest = (31 * digest + ord(ch)) & 0xFFFFFFFF
        return (digest & 0xffff) % (maxWorkerId + 1)

    def getMachineId(self):
        """Compute a number from this host's MAC address (via ``uuid.getnode``).

        The result can be as large as 1023 (10 bits), which is wider than the
        machine field; ``__init__`` masks it down before use.
        """
        mac = get_mac()
        low_byte = 0x000000FF & (mac // 10)
        high_byte = 0x0000FF00 & ((mac // 100) << 8)
        return (low_byte | high_byte) >> 6
if __name__ == '__main__':
    # Script entry point: start the Flask server on port 9090, listening on
    # every network interface.
    app.run(port=9090, host='0.0.0.0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment