Skip to content

Instantly share code, notes, and snippets.

@ichux
Created September 24, 2025 14:23
Show Gist options
  • Select an option

  • Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.

Select an option

Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.
import time
from datetime import datetime, timedelta, timezone
import redis
class RedisTimeTracker:
def __init__(self):
self.RD = redis.Redis(
host="127.0.0.1",
port=6379,
decode_responses=True,
db=0,
)
self.HS = "hs"
if self.RD.ping():
print("Connected to Redis.")
else:
print("Failed to connect to Redis.")
return
# Test the methods here
print("Initializing test...")
# Add current time to the Redis sorted set
self.present()
# Add yesterday's timestamps
self.yesterday()
# Prune data older than 24 hours
self.prune_last_24_hours()
# Show the current data in Redis sorted set
self.shown()
def past(self):
now = datetime.now(timezone.utc)
start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_yesterday = start_of_today - timedelta(days=1)
start_of_yesterday_timestamp = int(start_of_yesterday.timestamp())
noon_yesterday = start_of_yesterday + timedelta(hours=12)
noon_yesterday_timestamp = int(noon_yesterday.timestamp())
end_of_yesterday = start_of_yesterday + timedelta(
hours=23, minutes=59, seconds=59
)
end_of_yesterday_timestamp = int(end_of_yesterday.timestamp())
# Return a dictionary that is compatible with `zadd`
return {
str(start_of_yesterday_timestamp): start_of_yesterday_timestamp,
str(noon_yesterday_timestamp): noon_yesterday_timestamp,
str(end_of_yesterday_timestamp): end_of_yesterday_timestamp,
}
def present(self):
now = int(time.time())
self.RD.zadd(self.HS, {str(now): now})
def yesterday(self):
yesterday_data = self.past()
self.RD.zadd(self.HS, yesterday_data)
def prune_last_24_hours(self):
now = int(time.time())
dft = datetime.fromtimestamp(now, tz=timezone.utc)
start_of_day = dft.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_day_timestamp = int(start_of_day.timestamp())
self.RD.zremrangebyscore(self.HS, "-inf", start_of_day_timestamp - 1)
def shown(self):
print(f"Total value: {self.RD.zcard(self.HS)}")
for member, score in self.RD.zrange(self.HS, 0, -1, withscores=True):
dt = datetime.fromtimestamp(score, tz=timezone.utc)
print(f"{member} | {score} | {dt}")
# Instantiate the class to test
if __name__ == "__main__":
RedisTimeTracker()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment