Skip to content

Instantly share code, notes, and snippets.

@ichux
Created September 24, 2025 14:23
Show Gist options
  • Select an option

  • Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.

Select an option

Save ichux/82ed7ca16de53b6a4b838c9fdd3563de to your computer and use it in GitHub Desktop.

Revisions

  1. ichux created this gist Sep 24, 2025.
    81 changes: 81 additions & 0 deletions RedisTimeTracker.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    import time
    from datetime import datetime, timedelta, timezone

    import redis


    class RedisTimeTracker:
    def __init__(self):
    self.RD = redis.Redis(
    host="127.0.0.1",
    port=6379,
    decode_responses=True,
    db=0,
    )
    self.HS = "hs"

    if self.RD.ping():
    print("Connected to Redis.")
    else:
    print("Failed to connect to Redis.")
    return

    # Test the methods here
    print("Initializing test...")

    # Add current time to the Redis sorted set
    self.present()

    # Add yesterday's timestamps
    self.yesterday()

    # Prune data older than 24 hours
    self.prune_last_24_hours()

    # Show the current data in Redis sorted set
    self.shown()

    def past(self):
    now = datetime.now(timezone.utc)
    start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_yesterday = start_of_today - timedelta(days=1)
    start_of_yesterday_timestamp = int(start_of_yesterday.timestamp())
    noon_yesterday = start_of_yesterday + timedelta(hours=12)
    noon_yesterday_timestamp = int(noon_yesterday.timestamp())
    end_of_yesterday = start_of_yesterday + timedelta(
    hours=23, minutes=59, seconds=59
    )
    end_of_yesterday_timestamp = int(end_of_yesterday.timestamp())

    # Return a dictionary that is compatible with `zadd`
    return {
    str(start_of_yesterday_timestamp): start_of_yesterday_timestamp,
    str(noon_yesterday_timestamp): noon_yesterday_timestamp,
    str(end_of_yesterday_timestamp): end_of_yesterday_timestamp,
    }

    def present(self):
    now = int(time.time())
    self.RD.zadd(self.HS, {str(now): now})

    def yesterday(self):
    yesterday_data = self.past()
    self.RD.zadd(self.HS, yesterday_data)

    def prune_last_24_hours(self):
    now = int(time.time())
    dft = datetime.fromtimestamp(now, tz=timezone.utc)
    start_of_day = dft.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_day_timestamp = int(start_of_day.timestamp())
    self.RD.zremrangebyscore(self.HS, "-inf", start_of_day_timestamp - 1)

    def shown(self):
    print(f"Total value: {self.RD.zcard(self.HS)}")
    for member, score in self.RD.zrange(self.HS, 0, -1, withscores=True):
    dt = datetime.fromtimestamp(score, tz=timezone.utc)
    print(f"{member} | {score} | {dt}")


    # Instantiate the class to test
    if __name__ == "__main__":
    RedisTimeTracker()