@mstump
Last active April 23, 2019 11:39
Revisions

  1. Matt Stump revised this gist Nov 24, 2018. 1 changed file with 26 additions and 6 deletions.
    32 changes: 26 additions & 6 deletions historical_page_views.py
    @@ -51,6 +51,10 @@ class PageView(faust.Record, serializer='page_view_pb'):
         def occurred_at_posix(self):
             return calendar.timegm(self.occurred_at.timetuple())

    +    @staticmethod
    +    def id_hash(pv):
    +        return pv.id
    +
     def _pbSupport(*args):

         class PBSupport(codecs.Codec):
    @@ -109,7 +113,22 @@ def read_entries(self, expire_clock=None):
                 self.entries = list(itertools.dropwhile(lambda x: x.expired(expire_clock), self.entries))
                 return [entry.value for entry in self.entries]

    -cache = CacheList()
    +class CacheSet():
    +    def __init__(self, hash_function=hash):
    +        self.entries = dict()
    +        self.lock = RLock()
    +        self.hash_function = hash_function
    +
    +    def add_entry(self, value, ttl=20, expires_at=None):
    +        with self.lock:
    +            self.entries[self.hash_function(value)] = CacheEntry(value, ttl, expires_at)
    +
    +    def read_entries(self, expire_clock=None):
    +        with self.lock:
    +            self.entries = dict(itertools.dropwhile(lambda x: x[1].expired(expire_clock), self.entries.items()))
    +            return [entry.value for entry in self.entries.values()]
    +
    +cache = CacheSet(PageView.id_hash)
     ttl = 10

     app = faust.App(
    @@ -139,7 +158,7 @@ def read_entries(self, expire_clock=None):
     @app.agent(page_view_topic)
     async def print_windowed_events(stream):
         async for page_view in stream:
    -        cache.add_entry(page_view.user, expires_at=page_view.occurred_at_posix + ttl)
    +        cache.add_entry(page_view, expires_at=page_view.occurred_at_posix + ttl)
             cache_entries = cache.read_entries(page_view.occurred_at_posix)

             print(f"seconds in the past: {datetime.utcnow() - page_view.occurred_at}")
    @@ -152,16 +171,17 @@ def custom_sleep():
         yield from asyncio.sleep(1)


    -async def test_producer():
    +async def historical_producer():
         d = datetime.utcnow() - timedelta(seconds=100)
         for i in range(100):
    -        await page_view_topic.send(value=PageView("foo", "bar", d))
    +        user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
    +        page_view = PageView(str(uuid.uuid4()), user, d)
    +        await page_view_topic.send(value=page_view)
             d += timedelta(seconds=1)
             print('a')
             await custom_sleep()


     if __name__ == "__main__":
         loop = asyncio.get_event_loop()
    -    loop.run_until_complete(test_producer())
    +    loop.run_until_complete(historical_producer())
         loop.stop()
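
    What this revision buys, in isolation: CacheList appended an entry for every page view, while CacheSet keys entries by PageView.id (via the new id_hash static method), so a duplicate or re-delivered view replaces its earlier entry and refreshes the expiry instead of piling up. A minimal standalone sketch of that behaviour, using trimmed-down copies of the gist's CacheEntry and CacheSet plus a namedtuple stand-in for PageView (no Faust or Kafka needed), not the gist's exact classes:

    # Standalone sketch, simplified copies for illustration only.
    import itertools
    import time
    from collections import namedtuple
    from threading import RLock

    PageView = namedtuple("PageView", ["id", "user", "occurred_at_posix"])

    class CacheEntry:
        def __init__(self, value, ttl=20, expires_at=None):
            self.value = value
            self.expires_at = expires_at if expires_at else time.time() + ttl

        def expired(self, expire_clock=None):
            return self.expires_at < (expire_clock or time.time())

    class CacheSet:
        def __init__(self, hash_function=hash):
            self.entries = dict()
            self.lock = RLock()
            self.hash_function = hash_function

        def add_entry(self, value, ttl=20, expires_at=None):
            with self.lock:
                # same key -> the entry (and its expiry) is replaced, not appended
                self.entries[self.hash_function(value)] = CacheEntry(value, ttl, expires_at)

        def read_entries(self, expire_clock=None):
            with self.lock:
                self.entries = dict(itertools.dropwhile(
                    lambda kv: kv[1].expired(expire_clock), self.entries.items()))
                return [e.value for e in self.entries.values()]

    cache = CacheSet(hash_function=lambda pv: pv.id)
    view = PageView("id-1", "alice", 1543000000)
    cache.add_entry(view, expires_at=view.occurred_at_posix + 10)
    cache.add_entry(view, expires_at=view.occurred_at_posix + 10)   # duplicate delivery
    print(len(cache.read_entries(view.occurred_at_posix)))          # -> 1 (the old CacheList would report 2)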
  2. Matt Stump revised this gist Nov 24, 2018. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions page_views.proto
    @@ -0,0 +1,10 @@
    syntax = "proto3";
    import "google/protobuf/timestamp.proto";

    package page_views;

    message PageView {
      string id = 1;
      string user = 2;
      google.protobuf.Timestamp occurred_at = 3;
    }
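
    The script's import page_views_pb2 refers to the Python module that protoc generates from this file (roughly protoc --python_out=. page_views.proto, assuming protoc and the protobuf runtime are installed). A small round-trip sketch of the generated message, including the Timestamp conversions the PBSupport codec relies on:

    # Sketch only: assumes page_views_pb2 was generated from this .proto.
    from datetime import datetime

    import page_views_pb2

    msg = page_views_pb2.PageView()
    msg.id = "id-1"
    msg.user = "alice"
    msg.occurred_at.FromDatetime(datetime.utcnow())   # datetime -> protobuf Timestamp

    wire = msg.SerializeToString()                    # the bytes that go over Kafka

    decoded = page_views_pb2.PageView()
    decoded.ParseFromString(wire)
    print(decoded.id, decoded.user, decoded.occurred_at.ToDatetime())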
  3. Matt Stump created this gist Nov 24, 2018.
    167 changes: 167 additions & 0 deletions historical_page_views.py
    @@ -0,0 +1,167 @@
    import asyncio
    import calendar
    import faust
    import itertools
    import random
    import signal
    import string
    import time
    import uuid

    from contextlib import suppress
    from datetime import datetime, timedelta
    from faust import windows
    from faust.serializers import codecs
    from google.protobuf.timestamp_pb2 import Timestamp
    from threading import Thread, RLock
    from typing import Any

    import page_views_pb2

    def copy_attributes(source, dest):
        fields = None

        if isinstance(source, dict):
            for name in source.keys():
                if hasattr(dest, name):
                    setattr(dest, name, source.get(name))

        else:

            if hasattr(source, "_meta"):
                fields = source._meta.fields

            else:
                fields = dir(source)

            for name in fields:
                if not name.startswith('_'):
                    value = getattr(source, name)
                    setattr(dest, name, value)

        return dest


    class PageView(faust.Record, serializer='page_view_pb'):
        id: str = None
        user: str = None
        occurred_at: datetime = None

        @property
        def occurred_at_posix(self):
            return calendar.timegm(self.occurred_at.timetuple())

    def _pbSupport(*args):

        class PBSupport(codecs.Codec):

            def _dumps(self, obj: Any) -> bytes:
                out_obj = page_views_pb2.PageView()
                out_obj.id = obj["id"]
                out_obj.user = obj["user"]
                out_obj.occurred_at.FromDatetime(obj["occurred_at"])
                return out_obj.SerializeToString()

            def _loads(self, s: bytes) -> Any:
                pb_obj = page_views_pb2.PageView()
                pb_obj.ParseFromString(s)
                out_obj = PageView()
                out_obj.id = pb_obj.id
                out_obj.user = pb_obj.user
                out_obj.occurred_at = pb_obj.occurred_at.ToDatetime()
                return out_obj

        codecs.register('page_view_pb', PBSupport())

    _pbSupport()


    class CacheEntry():
        def __init__(self, value, ttl=20, expires_at=None):
            self.value = value
            if expires_at:
                self.expires_at = expires_at
            else:
                self.expires_at = time.time() + ttl
            self._expired = False

        def expired(self, expire_clock=None):
            if not expire_clock:
                expire_clock = time.time()

            if self._expired is False:
                return (self.expires_at < expire_clock)
            else:
                return self._expired


    class CacheList():
        def __init__(self):
            self.entries = []
            self.lock = RLock()

        def add_entry(self, value, ttl=20, expires_at=None):
            with self.lock:
                self.entries.append(CacheEntry(value, ttl, expires_at))

        def read_entries(self, expire_clock=None):
            with self.lock:
                self.entries = list(itertools.dropwhile(lambda x: x.expired(expire_clock), self.entries))
                return [entry.value for entry in self.entries]

    cache = CacheList()
    ttl = 10

    app = faust.App(
        'page_views',
        broker='kafka://localhost:9092',
        topic_partitions=4,
    )

    page_view_topic = app.topic('page_views', value_type=PageView)

    active_users_table = app.Table(
        'active_users',
        default=None).tumbling(
            ttl,
            expires=timedelta(seconds=30),
            key_index=True
        ).relative_to_field(PageView.occurred_at)


    # @app.timer(interval=2, on_leader=True)
    # async def generator():
    #     user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
    #     page_view = PageView(str(uuid.uuid4()), user, datetime.utcnow())
    #     await page_view_topic.send(value=page_view)


    @app.agent(page_view_topic)
    async def print_windowed_events(stream):
        async for page_view in stream:
            cache.add_entry(page_view.user, expires_at=page_view.occurred_at_posix + ttl)
            cache_entries = cache.read_entries(page_view.occurred_at_posix)

            print(f"seconds in the past: {datetime.utcnow() - page_view.occurred_at}")
            print(f"{len(cache_entries)}, {repr(cache_entries)}")


    @asyncio.coroutine
    def custom_sleep():
        print("SLEEP", datetime.now())
        yield from asyncio.sleep(1)


    async def test_producer():
        d = datetime.utcnow() - timedelta(seconds=100)
        for i in range(100):
            await page_view_topic.send(value=PageView("foo", "bar", d))
            d += timedelta(seconds=1)
            print('a')
            await custom_sleep()


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(test_producer())
        loop.stop()
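
    The part of the original version worth calling out: the agent passes each event's occurred_at_posix into read_entries() as the expiry clock, so cache entries age out in stream time rather than wall-clock time, which is what lets test_producer replay page views starting 100 seconds in the past. The agent itself only runs inside a Faust worker (presumably started with something like faust -A historical_page_views worker -l info), while the __main__ block here drives the producer loop. A standalone sketch of the event-time expiry idea, using a simplified copy of CacheEntry rather than the gist's exact class:

    # Sketch only: simplified CacheEntry; expiry is judged against whatever
    # clock value the caller passes in, not necessarily against time.time().
    import time

    class CacheEntry:
        def __init__(self, value, ttl=20, expires_at=None):
            self.value = value
            self.expires_at = expires_at if expires_at else time.time() + ttl

        def expired(self, expire_clock=None):
            return self.expires_at < (expire_clock or time.time())

    ttl = 10
    t0 = 1543000000                            # occurred_at of a replayed, historical event
    entry = CacheEntry("user-1", expires_at=t0 + ttl)

    print(entry.expired(t0 + 5))               # False: 5 "stream seconds" after the event
    print(entry.expired(t0 + 11))              # True: past the 10-second event-time ttl
    print(entry.expired())                     # True: against the wall clock the event is long gone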