Skip to content

Instantly share code, notes, and snippets.

@17307
Last active March 4, 2021 11:58
Show Gist options
  • Save 17307/f8fea383d367df1eb25bb359df2d17e5 to your computer and use it in GitHub Desktop.
Save 17307/f8fea383d367df1eb25bb359df2d17e5 to your computer and use it in GitHub Desktop.
redis stream consumer #redis
import redis
import time
# Stream, consumer-group and consumer identifiers used by the calls below.
STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c1"

# A single connection pool backs the client; decode_responses=True is passed
# so replies are handed back as text rather than raw bytes (see redis-py docs).
pool = redis.ConnectionPool(
    host='192.168.137.3',
    port=6379,
    password='',
    decode_responses=True,
)
r = redis.Redis(connection_pool=pool)

# The triple-quoted string below is disabled code kept for reference; it is
# never executed. It sketches a non-destructive setup: look the group up with
# xinfo_groups and create it at "0-0" only when missing, then look the
# consumer up with xinfo_consumers and issue a first read from ID "0" when
# the consumer is not known yet.
"""
# 判断是否存在某个组
groups_info = r.xinfo_groups(STREAM_NAME)
# print(groups_info)
group_exist = False
for i in groups_info:
if i["name"] == GROUP_NAME:
# 删除这个组新建
group_exist = True
if group_exist is False:
# 新建组
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0-0")
# 判断消费者是否存在
consumers_info = r.xinfo_consumers(STREAM_NAME, GROUP_NAME)
consumer_exist = False
for i in consumers_info:
if i["name"] == CONSUMER_NAME:
consumer_exist = True
if consumer_exist is False:
stream_datas = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: "0"}, count=1, block=0)
print(stream_datas)
"""
# Reset the consumer group on every start: drop it first, then recreate it
# with its position at "0-0" so reads with ">" start from the beginning of
# the stream again.
try:
    r.xgroup_destroy(STREAM_NAME, GROUP_NAME)
except redis.ResponseError as e:
    # XGROUP DESTROY is rejected when the stream key does not exist yet.
    # There is nothing to drop in that case, so report it and carry on
    # instead of crashing before the group can be created.
    print(e)
# mkstream=True lets Redis create an empty stream when the key is missing,
# so the very first run against a fresh server no longer fails here.
r.xgroup_create(STREAM_NAME, GROUP_NAME, "0-0", mkstream=True)
# Walk this consumer's pending entries first (entries delivered earlier but
# never acknowledged). Reading with an explicit ID instead of ">" returns
# only pending entries with an ID greater than the cursor, one per call.
next_id = "0-0"
while True:
    data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: next_id}, count=1, block=0)
    print(data)
    # Reply is indexed as data[0][1] -> list of entries, entry[0] -> entry ID.
    # Guard against an empty reply so indexing cannot raise IndexError.
    entries = data[0][1] if data else []
    if not entries:
        # Nothing left pending for this consumer.
        break
    next_id = entries[0][0]
    print(next_id)
    try:
        # Acknowledge the entry that was just read. The cursor then moves
        # past it on the next iteration, so the "0-0" sentinel is never
        # acknowledged and no entry waits a full round trip for its ack.
        r.xack(STREAM_NAME, GROUP_NAME, next_id)
    except redis.RedisError as e:
        # Best effort: report the failed ack and keep walking the backlog.
        print(e)
# Main loop: fetch never-delivered messages (ID ">") one at a time and
# acknowledge each one. block=0 is passed so the read waits for a message.
# Any exception while handling a reply is printed and ends the loop.
while True:
    reply = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: ">"}, count=1, block=0)
    try:
        # TODO: real message handling goes here; the one-second sleep is
        # presumably a stand-in for that work.
        time.sleep(1)
        entries = reply[0][1]
        if not entries:
            continue
        msg_id = entries[0][0]
        print(reply)
        r.xack(STREAM_NAME, GROUP_NAME, msg_id)
    except Exception as err:
        print(err)
        break
import redis
import time
# Stream, consumer-group and consumer identifiers used by the calls below.
STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c2"

# A single connection pool backs the client; decode_responses=True is passed
# so replies are handed back as text rather than raw bytes (see redis-py docs).
pool = redis.ConnectionPool(
    host='192.168.137.3',
    port=6379,
    password='',
    decode_responses=True,
)
r = redis.Redis(connection_pool=pool)

# The triple-quoted string below is disabled code kept for reference; it is
# never executed. It sketches a non-destructive setup: look the group up with
# xinfo_groups and create it at "0-0" only when missing, then look the
# consumer up with xinfo_consumers and issue a first read from ID "0" when
# the consumer is not known yet.
"""
# 判断是否存在某个组
groups_info = r.xinfo_groups(STREAM_NAME)
# print(groups_info)
group_exist = False
for i in groups_info:
if i["name"] == GROUP_NAME:
# 删除这个组新建
group_exist = True
if group_exist is False:
# 新建组
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0-0")
# 判断消费者是否存在
consumers_info = r.xinfo_consumers(STREAM_NAME, GROUP_NAME)
consumer_exist = False
for i in consumers_info:
if i["name"] == CONSUMER_NAME:
consumer_exist = True
if consumer_exist is False:
stream_datas = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: "0"}, count=1, block=0)
print(stream_datas)
"""
# Walk this consumer's pending entries first (entries delivered earlier but
# never acknowledged). Reading with an explicit ID instead of ">" returns
# only pending entries with an ID greater than the cursor, one per call.
# NOTE(review): nothing in the visible part of this script creates the
# consumer group, so it presumably has to exist already -- confirm the
# intended start order of the consumers.
next_id = "0-0"
while True:
    data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: next_id}, count=1, block=0)
    print(data)
    # Reply is indexed as data[0][1] -> list of entries, entry[0] -> entry ID.
    # Guard against an empty reply so indexing cannot raise IndexError.
    entries = data[0][1] if data else []
    if not entries:
        # Nothing left pending for this consumer.
        break
    next_id = entries[0][0]
    print(next_id)
    try:
        # Acknowledge the entry that was just read. The cursor then moves
        # past it on the next iteration, so the "0-0" sentinel is never
        # acknowledged and no entry waits a full round trip for its ack.
        r.xack(STREAM_NAME, GROUP_NAME, next_id)
    except redis.RedisError as e:
        # Best effort: report the failed ack and keep walking the backlog.
        print(e)
# Main loop: fetch never-delivered messages (ID ">") one at a time and
# acknowledge each one. block=0 is passed so the read waits for a message.
# Any exception while handling a reply is printed and ends the loop.
while True:
    reply = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: ">"}, count=1, block=0)
    try:
        # TODO: real message handling goes here; the one-second sleep is
        # presumably a stand-in for that work.
        time.sleep(1)
        entries = reply[0][1]
        if not entries:
            continue
        msg_id = entries[0][0]
        print(reply)
        r.xack(STREAM_NAME, GROUP_NAME, msg_id)
    except Exception as err:
        print(err)
        break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment