Last active
April 23, 2019 11:39
-
-
Save mstump/147fc4a29bd265f4328dfdd6096a7336 to your computer and use it in GitHub Desktop.
Revisions
-
Matt Stump revised this gist
Nov 24, 2018 . 1 changed file with 26 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -51,6 +51,10 @@ class PageView(faust.Record, serializer='page_view_pb'): def occurred_at_posix(self): return calendar.timegm(self.occurred_at.timetuple()) @staticmethod def id_hash(pv): return pv.id def _pbSupport(*args): class PBSupport(codecs.Codec): @@ -109,7 +113,22 @@ def read_entries(self, expire_clock=None): self.entries = list(itertools.dropwhile(lambda x: x.expired(expire_clock), self.entries)) return [entry.value for entry in self.entries] class CacheSet(): def __init__(self, hash_function=hash): self.entries = dict() self.lock = RLock() self.hash_function = hash_function def add_entry(self, value, ttl=20, expires_at=None): with self.lock: self.entries[self.hash_function(value)] = CacheEntry(value, ttl, expires_at) def read_entries(self, expire_clock=None): with self.lock: self.entries = dict(itertools.dropwhile(lambda x: x[1].expired(expire_clock), self.entries.items())) return [entry.value for entry in self.entries.values()] cache = CacheSet(PageView.id_hash) ttl = 10 app = faust.App( @@ -139,7 +158,7 @@ def read_entries(self, expire_clock=None): @app.agent(page_view_topic) async def print_windowed_events(stream): async for page_view in stream: cache.add_entry(page_view, expires_at=page_view.occurred_at_posix + ttl) cache_entries = cache.read_entries(page_view.occurred_at_posix) print(f"seconds in the past: {datetime.utcnow() - page_view.occurred_at}") @@ -152,16 +171,17 @@ def custom_sleep(): yield from asyncio.sleep(1) async def historical_producer(): d = datetime.utcnow() - timedelta(seconds=100) for i in range(100): user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7)) page_view = PageView(str(uuid.uuid4()), user, d) await page_view_topic.send(value=page_view) d += timedelta(seconds=1) await custom_sleep() if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(historical_producer()) loop.stop() -
Matt Stump revised this gist
Nov 24, 2018 . 1 changed file with 10 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,10 @@ syntax = "proto3"; import "google/protobuf/timestamp.proto"; package page_views; message PageView { string id = 1; string user = 2; google.protobuf.Timestamp occurred_at = 3; } -
Matt Stump created this gist
Nov 24, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,167 @@ import asyncio import calendar import faust import itertools import random import signal import string import time import uuid from contextlib import suppress from datetime import datetime, timedelta from faust import windows from faust.serializers import codecs from google.protobuf.timestamp_pb2 import Timestamp from threading import Thread, RLock from typing import Any import page_views_pb2 def copy_attributes(source, dest): fields = None if isinstance(source, dict): for name in source.keys(): if hasattr(dest, name): setattr(dest, name, source.get(name)) else: if hasattr(source, "_meta"): fields = source._meta.fields else: fields = dir(source) for name in fields: if not name.startswith('_'): value = getattr(source, name) setattr(dest, name, value) return dest class PageView(faust.Record, serializer='page_view_pb'): id: str = None user: str = None occurred_at: datetime = None @property def occurred_at_posix(self): return calendar.timegm(self.occurred_at.timetuple()) def _pbSupport(*args): class PBSupport(codecs.Codec): def _dumps(self, obj: Any) -> bytes: out_obj = page_views_pb2.PageView() out_obj.id = obj["id"] out_obj.user = obj["user"] out_obj.occurred_at.FromDatetime(obj["occurred_at"]) return out_obj.SerializeToString() def _loads(self, s: bytes) -> Any: pb_obj = page_views_pb2.PageView() pb_obj.ParseFromString(s) out_obj = PageView() out_obj.id = pb_obj.id out_obj.user = pb_obj.user out_obj.occurred_at = pb_obj.occurred_at.ToDatetime() return out_obj codecs.register('page_view_pb', PBSupport()) _pbSupport() class CacheEntry(): def __init__(self, value, ttl=20, expires_at=None): self.value = value if expires_at: self.expires_at = expires_at else: self.expires_at = time.time() + ttl self._expired = False def expired(self, expire_clock=None): if not expire_clock: expire_clock = time.time() if self._expired is False: return (self.expires_at < expire_clock) else: return self._expired class CacheList(): def __init__(self): self.entries = [] self.lock = RLock() def add_entry(self, value, ttl=20, expires_at=None): with self.lock: self.entries.append(CacheEntry(value, ttl, expires_at)) def read_entries(self, expire_clock=None): with self.lock: self.entries = list(itertools.dropwhile(lambda x: x.expired(expire_clock), self.entries)) return [entry.value for entry in self.entries] cache = CacheList() ttl = 10 app = faust.App( 'page_views', broker='kafka://localhost:9092', topic_partitions=4, ) page_view_topic = app.topic('page_views', value_type=PageView) active_users_table = app.Table( 'active_users', default=None).tumbling( ttl, expires=timedelta(seconds=30), key_index=True ).relative_to_field(PageView.occurred_at) # @app.timer(interval=2, on_leader=True) # async def generator(): # user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7)) # page_view = PageView(str(uuid.uuid4()), user, datetime.utcnow()) # await page_view_topic.send(value=page_view) @app.agent(page_view_topic) async def print_windowed_events(stream): async for page_view in stream: cache.add_entry(page_view.user, expires_at=page_view.occurred_at_posix + ttl) cache_entries = cache.read_entries(page_view.occurred_at_posix) print(f"seconds in the past: {datetime.utcnow() - page_view.occurred_at}") print(f"{len(cache_entries)}, {repr(cache_entries)}") @asyncio.coroutine def custom_sleep(): print("SLEEP", datetime.now()) yield from asyncio.sleep(1) async def test_producer(): d = datetime.utcnow() - timedelta(seconds=100) for i in range(100): await page_view_topic.send(value=PageView("foo", "bar", d)) d += timedelta(seconds=1) print('a') await custom_sleep() if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(test_producer()) loop.stop()