Last active
          March 4, 2021 11:58 
        
      - 
      
- 
        Save 17307/f8fea383d367df1eb25bb359df2d17e5 to your computer and use it in GitHub Desktop. 
    redis stream consumer #redis
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  # Redis Streams demo: consumer "c1" resets group "g1" and then consumes the stream.
import time

import redis

pool = redis.ConnectionPool(host='192.168.137.3', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)

STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c1"


def reset_group() -> None:
    """Delete the consumer group, then recreate it starting at ID "0-0".

    Recreating the group at "0-0" makes every entry already in the stream
    count as undelivered again for this group.
    """
    try:
        r.xgroup_destroy(STREAM_NAME, GROUP_NAME)
    except redis.exceptions.ResponseError as e:
        # XGROUP DESTROY is rejected when the stream key does not exist yet;
        # there is nothing to delete in that case, so report it and carry on.
        print(e)
    # mkstream=True lets the group be created before any producer has
    # written to the stream, instead of failing on the missing key.
    r.xgroup_create(STREAM_NAME, GROUP_NAME, "0-0", mkstream=True)


def drain_pending() -> None:
    """Walk this consumer's pending entries one at a time and acknowledge each.

    Reading with an explicit ID (rather than ">") returns entries that were
    already delivered to this consumer but never acknowledged, starting after
    that ID. Right after ``reset_group()`` there are normally none, so the
    loop ends on its first pass.
    """
    last_id = "0-0"
    while True:
        data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: last_id}, count=1, block=0)
        print(data)
        # Reply shape: [[stream_name, [(entry_id, fields), ...]]]; guard
        # against an empty reply before indexing into it.
        entries = data[0][1] if data else []
        if not entries:
            break
        last_id = entries[0][0]
        print(last_id)
        try:
            # Acknowledge the entry that was just read (not the cursor from
            # the previous pass, which starts out as the placeholder "0-0").
            r.xack(STREAM_NAME, GROUP_NAME, last_id)
        except Exception as e:
            # Best effort: a failed ack leaves the entry pending for a later run.
            print(e)


def consume_new() -> None:
    """Block for never-delivered entries, handle each one and acknowledge it.

    The loop stops after the first error raised while handling or
    acknowledging an entry; the error is printed, not re-raised.
    """
    while True:
        # block=0 waits indefinitely until a new entry arrives.
        data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: ">"}, count=1, block=0)
        try:
            # TODO: real message handling goes here; the sleep stands in for work.
            time.sleep(1)
            if data and data[0][1]:
                entry_id = data[0][1][0][0]
                print(data)
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
        except Exception as e:
            print(e)
            break


def main() -> None:
    """Reset the group, clear any pending entries, then consume new ones."""
    reset_group()
    drain_pending()
    consume_new()


if __name__ == "__main__":
    main()
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  # Redis Streams demo: consumer "c2" joins the existing group "g1" and consumes the stream.
import time

import redis

pool = redis.ConnectionPool(host='192.168.137.3', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)

STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c2"


def drain_pending() -> None:
    """Walk this consumer's pending entries one at a time and acknowledge each.

    Reading with an explicit ID (rather than ">") returns entries that were
    already delivered to this consumer but never acknowledged, starting after
    that ID. This script does not create the group itself, so the read is
    expected to fail if the group has not been created beforehand.
    """
    last_id = "0-0"
    while True:
        data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: last_id}, count=1, block=0)
        print(data)
        # Reply shape: [[stream_name, [(entry_id, fields), ...]]]; guard
        # against an empty reply before indexing into it.
        entries = data[0][1] if data else []
        if not entries:
            break
        last_id = entries[0][0]
        print(last_id)
        try:
            # Acknowledge the entry that was just read (not the cursor from
            # the previous pass, which starts out as the placeholder "0-0").
            r.xack(STREAM_NAME, GROUP_NAME, last_id)
        except Exception as e:
            # Best effort: a failed ack leaves the entry pending for a later run.
            print(e)


def consume_new() -> None:
    """Block for never-delivered entries, handle each one and acknowledge it.

    The loop stops after the first error raised while handling or
    acknowledging an entry; the error is printed, not re-raised.
    """
    while True:
        # block=0 waits indefinitely until a new entry arrives.
        data = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: ">"}, count=1, block=0)
        try:
            # TODO: real message handling goes here; the sleep stands in for work.
            time.sleep(1)
            if data and data[0][1]:
                entry_id = data[0][1][0][0]
                print(data)
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
        except Exception as e:
            print(e)
            break


def main() -> None:
    """Clear this consumer's pending entries, then consume new ones."""
    drain_pending()
    consume_new()


if __name__ == "__main__":
    main()
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment