Created
September 24, 2025 14:23
-
-
Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| from datetime import datetime, timedelta, timezone | |
| import redis | |
class RedisTimeTracker:
    """Demo that keeps Unix timestamps in a Redis sorted set and prunes old ones.

    Every entry uses the timestamp (as a string) for the member and the same
    timestamp (as an integer) for the score, so the set is ordered by time.

    NOTE: constructing an instance connects to Redis on 127.0.0.1:6379 (db 0)
    and, when the server answers a ping, immediately runs the whole demo:
    insert "now", insert three of yesterday's timestamps, prune, then print.
    """

    def __init__(self) -> None:
        """Connect to the local Redis server and run the demo sequence."""
        self.RD = redis.Redis(
            host="127.0.0.1",
            port=6379,
            decode_responses=True,
            db=0,
        )
        self.HS = "hs"  # key of the sorted set that holds the timestamps

        # redis-py's ping() raises instead of returning False when the server
        # cannot be reached, so the failure message is only reachable if the
        # error is caught here.
        try:
            connected = self.RD.ping()
        except redis.RedisError:
            connected = False

        if not connected:
            print("Failed to connect to Redis.")
            return
        print("Connected to Redis.")

        # Exercise the methods end to end.
        print("Initializing test...")
        self.present()  # add the current time
        self.yesterday()  # add yesterday's sample timestamps
        self.prune_last_24_hours()  # drop everything before today's UTC midnight
        self.shown()  # print what is left

    @staticmethod
    def _start_of_utc_day(moment):
        """Return *moment* truncated to 00:00:00.000000 of its own day."""
        return moment.replace(hour=0, minute=0, second=0, microsecond=0)

    def past(self, now=None) -> dict:
        """Build three timestamps from the day before *now*.

        Args:
            now: Reference ``datetime``; defaults to the current UTC time.
                It should be timezone-aware, because day boundaries are taken
                in its own timezone and a naive value would be read as local
                time by ``timestamp()``.

        Returns:
            A ``{str(timestamp): timestamp}`` mapping, ready to pass to
            ``zadd``, for 00:00:00, 12:00:00 and 23:59:59 of the previous day.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        start_of_yesterday = self._start_of_utc_day(now) - timedelta(days=1)

        offsets = (
            timedelta(0),
            timedelta(hours=12),
            timedelta(hours=23, minutes=59, seconds=59),
        )
        stamps = [
            int((start_of_yesterday + offset).timestamp()) for offset in offsets
        ]
        return {str(stamp): stamp for stamp in stamps}

    def present(self) -> None:
        """Add the current Unix time (whole seconds) to the sorted set."""
        now = int(time.time())
        self.RD.zadd(self.HS, {str(now): now})

    def yesterday(self) -> None:
        """Add the timestamps produced by ``past()`` to the sorted set."""
        self.RD.zadd(self.HS, self.past())

    def prune_last_24_hours(self) -> None:
        """Remove every entry scored before the start of the current UTC day.

        NOTE: despite the method name, the cutoff is today's midnight (UTC),
        not "now minus 24 hours" -- all of yesterday's entries are removed.
        """
        start_of_day = self._start_of_utc_day(datetime.now(timezone.utc))
        # The upper bound is inclusive, so stop one second before midnight to
        # keep an entry stamped exactly at 00:00:00.
        cutoff = int(start_of_day.timestamp()) - 1
        self.RD.zremrangebyscore(self.HS, "-inf", cutoff)

    def shown(self) -> None:
        """Print the entry count, then each member, score and UTC datetime."""
        print(f"Total value: {self.RD.zcard(self.HS)}")
        for member, score in self.RD.zrange(self.HS, 0, -1, withscores=True):
            dt = datetime.fromtimestamp(score, tz=timezone.utc)
            print(f"{member} | {score} | {dt}")
# Script entry point: instantiating the class is enough to run the demo,
# because RedisTimeTracker.__init__ calls the test methods itself.
if __name__ == "__main__":
    RedisTimeTracker()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment