Created
September 24, 2025 14:23
-
-
Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.
Revisions
-
ichux created this gist
Sep 24, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,81 @@ import time from datetime import datetime, timedelta, timezone import redis class RedisTimeTracker: def __init__(self): self.RD = redis.Redis( host="127.0.0.1", port=6379, decode_responses=True, db=0, ) self.HS = "hs" if self.RD.ping(): print("Connected to Redis.") else: print("Failed to connect to Redis.") return # Test the methods here print("Initializing test...") # Add current time to the Redis sorted set self.present() # Add yesterday's timestamps self.yesterday() # Prune data older than 24 hours self.prune_last_24_hours() # Show the current data in Redis sorted set self.shown() def past(self): now = datetime.now(timezone.utc) start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0) start_of_yesterday = start_of_today - timedelta(days=1) start_of_yesterday_timestamp = int(start_of_yesterday.timestamp()) noon_yesterday = start_of_yesterday + timedelta(hours=12) noon_yesterday_timestamp = int(noon_yesterday.timestamp()) end_of_yesterday = start_of_yesterday + timedelta( hours=23, minutes=59, seconds=59 ) end_of_yesterday_timestamp = int(end_of_yesterday.timestamp()) # Return a dictionary that is compatible with `zadd` return { str(start_of_yesterday_timestamp): start_of_yesterday_timestamp, str(noon_yesterday_timestamp): noon_yesterday_timestamp, str(end_of_yesterday_timestamp): end_of_yesterday_timestamp, } def present(self): now = int(time.time()) self.RD.zadd(self.HS, {str(now): now}) def yesterday(self): yesterday_data = self.past() self.RD.zadd(self.HS, yesterday_data) def prune_last_24_hours(self): now = int(time.time()) dft = datetime.fromtimestamp(now, tz=timezone.utc) start_of_day = dft.replace(hour=0, minute=0, second=0, microsecond=0) start_of_day_timestamp = int(start_of_day.timestamp()) self.RD.zremrangebyscore(self.HS, "-inf", start_of_day_timestamp - 1) def shown(self): print(f"Total value: {self.RD.zcard(self.HS)}") for member, score in self.RD.zrange(self.HS, 0, -1, withscores=True): dt = datetime.fromtimestamp(score, tz=timezone.utc) print(f"{member} | {score} | {dt}") # Instantiate the class to test if __name__ == "__main__": RedisTimeTracker()