Skip to content

Instantly share code, notes, and snippets.

@lachesis
Created July 17, 2023 15:10
Show Gist options
  • Select an option

  • Save lachesis/2e481f695c21de0c3f6d5a67e6461f23 to your computer and use it in GitHub Desktop.

Select an option

Save lachesis/2e481f695c21de0c3f6d5a67e6461f23 to your computer and use it in GitHub Desktop.

Revisions

  1. lachesis created this gist Jul 17, 2023.
    274 changes: 274 additions & 0 deletions ephemp2p.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,274 @@
    #!/usr/bin/python2
    import collections
    import hashlib
    import json
    import logging
    import os
    import time

    import websocket

    MAX_HEARTBEAT_TIME = 30

    class Heartbeatable(object):
    def __init__(self, func, rate):
    self.func = func
    self.rate = rate
    self.last_call = None

    def call(self):
    if self.last_call is None or time.time() - self.last_call > self.rate:
    self.func()
    self.last_call = time.time()

    class PhoenixState(object):
    """An object that tracks the state of multiple phoenix channels."""
    def __init__(self):
    #self.messages = collections.defaultdict(lambda *args: []) # map of topics (str) to their messages (list)
    self.callbacks = collections.defaultdict(lambda *args: []) # map of topics (str) to their message callbacks
    self.heartbeats = {}
    self.register_heartbeat(self._phoenix_heartbeat, MAX_HEARTBEAT_TIME)

    def _generate_message(self, topic, event, payload=None):
    if payload is None:
    payload = {}
    return {
    'topic': topic,
    'event': event,
    'payload': payload
    }

    def process_message(self, message):
    """Call this with a dictionary for message to process that message."""
    topic = message['topic']
    #self.messages[topic].append(message)
    for callback in self.callbacks[topic]:
    callback(message)

    def _phoenix_heartbeat(self):
    self.send("phoenix", "heartbeat")

    def register_callback(self, topic, callback):
    """Register a callback for a given topic."""
    self.callbacks[topic].append(callback)

    def register_heartbeat(self, func, rate):
    self.heartbeats[func] = Heartbeatable(func, rate)

    def deregister_heartbeat(self, func):
    del self.heartbeats[func]

    def heartbeat(self):
    """Call this method at least once every 30 seconds to heartbeat the connection."""
    for hb in self.heartbeats.values():
    hb.call()

    def join(self, topic, callback):
    """Join a topic (and register a callback)"""
    self.register_callback(topic, callback)

    self.raw_send(self._generate_message(
    topic=topic,
    event='phx_join'
    ))

    def leave(self, topic):
    """Leave a topic (and deregister all callbacks)"""
    self.raw_send(self._generate_message(
    topic=topic,
    event='phx_leave'
    ))

    self.callbacks[topic] = []

    def send(self, topic, event, payload=None):
    """Send a message to a topic."""
    self.raw_send(self._generate_message(topic, event, payload))

    class WSClient(object):
    def __init__(self, url):
    self.url = url
    self.conn = None
    self.ref = 1

    def __del__(self):
    try: self.conn.close()
    except Exception: pass

    def raw_connect(self, on_open=None):
    self.conn = websocket.WebSocketApp(self.url,
    on_message=self.raw_process_message,
    on_ping=self.raw_pingpong,
    on_pong=self.raw_pingpong,
    on_open=on_open
    )

    def raw_send(self, body):
    if 'ref' not in body:
    body['ref'] = self.ref
    self.ref += 1

    bstr = json.dumps(body)
    logging.debug("SEND %r", bstr)
    self.conn.send(bstr)

    def raw_pingpong(self, *args, **kwargs):
    self.heartbeat()

    def raw_process_message(self, ws, message):
    logging.debug("GOT %r", message)
    self.heartbeat()
    self.process_message(json.loads(message))

    def raw_run_forever(self):
    self.conn.run_forever(ping_interval=MAX_HEARTBEAT_TIME)

    class WSPhoenix(PhoenixState, WSClient):
    def __init__(self, url):
    WSClient.__init__(self, url)
    PhoenixState.__init__(self)

    class EphemeralWS(object):
    def __init__(self):
    self.client = WSPhoenix("ws://ephemeralp2p.durazo.us/ws")

    def get_content(self, chash, callback):
    """Get the given hash's content from EphemeralP2P (callback)"""
    topic = 'want:' + chash

    def ask_func():
    # Ask for the content to be sent to us
    self.client.send(topic, 'content_request', {'hash': chash})

    def want_cb(message):
    if message.get('payload', {}).get('content'):
    # Deregister the ask func
    self.client.deregister_heartbeat(ask_func)

    # Call our callback
    callback(message['payload']['content'])

    # Leave the topic
    self.client.leave(topic)

    # Join the topic
    self.client.join(topic, want_cb)

    # Register the ask func to be retried at heartbeat
    self.client.register_heartbeat(ask_func, 5)

    # Heartbeat
    self.client.heartbeat()

    def serve_content(self, chash, content):
    """Register to serve content."""
    topic = 'have:' + chash
    def have_cb(message):
    if message['event'] == 'content_request':
    logging.info('serving content for %s', chash)
    self.client.send(topic, 'content', {'content': content, 'hash': chash})
    self.client.join(topic, have_cb)

    def get_and_serve_content(self, chash, callback=None):
    """Fetch, then serve a hash."""
    def got_func(content):
    logging.info('got content for hash %s (len: %d bytes)', chash, len(content))
    if callback:
    callback(content)
    self.serve_content(chash, content)
    self.get_content(chash, got_func)

    def connect(self, on_open):
    self.client.raw_connect(on_open)
    self.client.raw_run_forever()

    class CachingEphemeralServer(object):
    def __init__(self):
    self.client = EphemeralWS()
    self.cache_dir = '.ephem_cache'

    def connect(self, on_open):
    try:
    os.mkdir(self.cache_dir)
    except Exception:
    pass
    self.client.connect(on_open)

    def serve(self, chash):
    """Serve a given hash.
    If the content is available in the cache dir, get it from there.
    If it's not, fetch it from the server, cache it, and serve.
    """
    cache_path = os.path.join(self.cache_dir, chash)
    if os.path.exists(cache_path):
    with open(cache_path, 'r') as inp:
    content = inp.read()

    # Make sure the hash matches
    calc_hash = hashlib.sha256(content).hexdigest()
    if calc_hash != chash:
    raise ValueError("CACHED: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash))

    logging.info('serving hash %s from cache...', chash)

    # Serve!
    self.client.serve_content(chash, content)

    else:
    def callback(content):
    # Make sure the hash matches
    calc_hash = hashlib.sha256(content).hexdigest()
    if calc_hash != chash:
    raise ValueError("NET: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash))

    logging.info('writing hash %s to cache...', chash)

    # Write it out to the cache
    with open(cache_path, 'w') as out:
    out.write(content)

    # Fetch the content from the server (p2p network) and serve it
    self.client.get_and_serve_content(chash, callback)

    def basic_test():
    # enable logging
    logging.basicConfig()

    client = EphemeralWS()

    def on_open(*a, **k):
    # Fetch, then start serving the "test123" doc
    client.get_and_serve_content('ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae')

    # Serve (without fetching) the "ABCdef" doc
    client.serve_content('057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', 'ABCdef')

    # Fetch and serve the intro doc
    client.get_and_serve_content('2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838')

    client.connect(on_open)

    # start mainloop

    def caching_test():
    # enable logging
    logging.basicConfig(format="[%(asctime)s] %(message)s")
    logging.getLogger().setLevel(logging.INFO)

    client = CachingEphemeralServer()
    HASHES = [
    'ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae', # "test123"
    '057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', # "ABCdef"
    '2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838', # the intro doc for the service
    ]

    def on_open(*a, **k):
    for chash in HASHES:
    client.serve(chash)

    # Connect
    client.connect(on_open)

    if __name__ == '__main__':
    caching_test()