#!/usr/bin/python2
"""Small Phoenix-channel websocket client for the EphemeralP2P service.

The module is layered:

* ``Heartbeatable`` / ``PhoenixState`` -- transport-independent bookkeeping
  of topic callbacks and periodic "heartbeat" functions.
* ``WSClient`` / ``WSPhoenix`` -- the websocket transport that carries the
  JSON messages produced by ``PhoenixState``.
* ``EphemeralWS`` / ``CachingEphemeralServer`` -- fetch and serve content
  addressed by its SHA-256 hex digest, optionally backed by an on-disk cache.
"""
import collections
import errno
import hashlib
import json
import logging
import os
import time

try:
    import websocket
except ImportError:
    # Only the transport needs websocket-client.  Deferring the failure to
    # WSClient.raw_connect keeps the protocol-state classes importable.
    websocket = None

# Seconds between "phoenix" heartbeat messages; also used as the websocket
# ping interval in WSClient.raw_run_forever.
MAX_HEARTBEAT_TIME = 30

# Defaults used by EphemeralWS and CachingEphemeralServer.
DEFAULT_URL = "ws://ephemeralp2p.durazo.us/ws"
DEFAULT_CACHE_DIR = '.ephem_cache'


def _to_bytes(content):
    """Return ``content`` as bytes, encoding text as UTF-8.

    Content decoded from JSON is text; hashing and binary file writes need
    bytes on Python 3 and would do an implicit ASCII encode on Python 2.
    """
    if isinstance(content, bytes):
        return content
    return content.encode('utf-8')


class Heartbeatable(object):
    """Wrap a zero-argument callable so it runs at most once per ``rate`` seconds."""

    def __init__(self, func, rate):
        self.func = func
        self.rate = rate
        self.last_call = None  # time.time() of the last completed call

    def call(self):
        """Invoke ``func`` if it never ran or more than ``rate`` seconds passed."""
        if self.last_call is None or time.time() - self.last_call > self.rate:
            self.func()
            # Stamped after the call, so a func that raises is retried on
            # the next call() instead of being rate-limited.
            self.last_call = time.time()


class PhoenixState(object):
    """An object that tracks the state of multiple phoenix channels.

    Subclasses must provide ``raw_send(body)``, which delivers one message
    dictionary to the server.
    """

    def __init__(self):
        # map of topics (str) to their message callbacks (list)
        self.callbacks = collections.defaultdict(list)
        # map of heartbeat functions to their Heartbeatable wrappers
        self.heartbeats = {}
        self.register_heartbeat(self._phoenix_heartbeat, MAX_HEARTBEAT_TIME)

    def _generate_message(self, topic, event, payload=None):
        """Build the message dictionary for ``topic``/``event``.

        ``payload`` defaults to a fresh empty dict.
        """
        if payload is None:
            payload = {}
        return {
            'topic': topic,
            'event': event,
            'payload': payload,
        }

    def process_message(self, message):
        """Call this with a dictionary for message to process that message.

        Every callback registered for ``message['topic']`` is called with the
        message.  Raises ``KeyError`` if the message has no ``'topic'`` key.
        """
        topic = message['topic']
        # .get() avoids inserting an empty list for every unknown topic, and
        # the snapshot keeps dispatch stable if a callback (de)registers
        # callbacks on this topic while we iterate.
        for callback in list(self.callbacks.get(topic, ())):
            callback(message)

    def _phoenix_heartbeat(self):
        """Send the keep-alive message on the "phoenix" topic."""
        self.send("phoenix", "heartbeat")

    def register_callback(self, topic, callback):
        """Register a callback for a given topic."""
        self.callbacks[topic].append(callback)

    def register_heartbeat(self, func, rate):
        """Run ``func`` from ``heartbeat()``, at most once per ``rate`` seconds."""
        self.heartbeats[func] = Heartbeatable(func, rate)

    def deregister_heartbeat(self, func):
        """Stop running ``func`` from ``heartbeat()``.

        Deregistering a function that is not registered is a no-op, so the
        call is safe to repeat.
        """
        self.heartbeats.pop(func, None)

    def heartbeat(self):
        """Call this method at least once every 30 seconds to heartbeat the connection."""
        # Iterate over a snapshot: a heartbeat function may (de)register
        # heartbeats, which would break iteration over a live dict view.
        for hb in list(self.heartbeats.values()):
            hb.call()

    def join(self, topic, callback):
        """Join a topic (and register a callback)"""
        self.register_callback(topic, callback)
        self.raw_send(self._generate_message(
            topic=topic,
            event='phx_join'
        ))

    def leave(self, topic):
        """Leave a topic (and deregister all callbacks)"""
        self.raw_send(self._generate_message(
            topic=topic,
            event='phx_leave'
        ))
        # Drop the entry entirely so left topics do not accumulate as keys.
        self.callbacks.pop(topic, None)

    def send(self, topic, event, payload=None):
        """Send a message to a topic."""
        self.raw_send(self._generate_message(topic, event, payload))


class WSClient(object):
    """Websocket transport: sends JSON messages and feeds received ones back.

    Expects to be combined with a class providing ``heartbeat()`` and
    ``process_message()`` (see ``WSPhoenix``).
    """

    def __init__(self, url):
        self.url = url
        self.conn = None  # set by raw_connect()
        self.ref = 1      # next value for the 'ref' field of outgoing messages

    def __del__(self):
        # Best-effort cleanup: errors while closing during teardown are
        # deliberately ignored.
        try:
            self.conn.close()
        except Exception:
            pass

    def raw_connect(self, on_open=None):
        """Create the websocket app for ``self.url`` (does not start it).

        Raises ``ImportError`` if the websocket-client package is missing.
        """
        if websocket is None:
            raise ImportError(
                "the 'websocket-client' package is required to connect")
        self.conn = websocket.WebSocketApp(
            self.url,
            on_message=self.raw_process_message,
            on_ping=self.raw_pingpong,
            on_pong=self.raw_pingpong,
            on_open=on_open
        )

    def raw_send(self, body):
        """Serialize ``body`` as JSON and send it.

        If ``body`` has no ``'ref'`` key, one is added in place from the
        running counter.
        """
        if 'ref' not in body:
            body['ref'] = self.ref
            self.ref += 1
        bstr = json.dumps(body)
        logging.debug("SEND %r", bstr)
        self.conn.send(bstr)

    def raw_pingpong(self, *args, **kwargs):
        """Ping/pong hook: use the event as an opportunity to heartbeat."""
        self.heartbeat()

    def raw_process_message(self, ws, message):
        """Message hook: heartbeat, then decode the JSON text and dispatch it."""
        logging.debug("GOT %r", message)
        self.heartbeat()
        self.process_message(json.loads(message))

    def raw_run_forever(self):
        """Run the websocket main loop, pinging every MAX_HEARTBEAT_TIME seconds."""
        self.conn.run_forever(ping_interval=MAX_HEARTBEAT_TIME)


class WSPhoenix(PhoenixState, WSClient):
    """Phoenix channel state carried over the websocket transport."""

    def __init__(self, url):
        WSClient.__init__(self, url)
        PhoenixState.__init__(self)


class EphemeralWS(object):
    """Fetch and serve hash-addressed content over a ``WSPhoenix`` client."""

    def __init__(self, url=DEFAULT_URL):
        self.client = WSPhoenix(url)

    def get_content(self, chash, callback):
        """Get the given hash's content from EphemeralP2P (callback)

        Joins the ``want:<chash>`` topic and re-sends a content request from
        the heartbeat loop until a message carrying content arrives; then
        ``callback(content)`` is called and the topic is left.
        """
        topic = 'want:' + chash

        def ask_func():
            # Ask for the content to be sent to us
            self.client.send(topic, 'content_request', {'hash': chash})

        def want_cb(message):
            if message.get('payload', {}).get('content'):
                # Deregister the ask func
                self.client.deregister_heartbeat(ask_func)
                # Call our callback
                callback(message['payload']['content'])
                # Leave the topic
                self.client.leave(topic)

        # Join the topic
        self.client.join(topic, want_cb)
        # Register the ask func to be retried at heartbeat
        self.client.register_heartbeat(ask_func, 5)
        # Heartbeat
        self.client.heartbeat()

    def serve_content(self, chash, content):
        """Register to serve content.

        Joins the ``have:<chash>`` topic and answers every
        ``content_request`` event on it with ``content``.
        """
        topic = 'have:' + chash

        def have_cb(message):
            # .get(): a message without an 'event' key is simply not a request.
            if message.get('event') == 'content_request':
                logging.info('serving content for %s', chash)
                self.client.send(topic, 'content', {'content': content, 'hash': chash})

        self.client.join(topic, have_cb)

    def get_and_serve_content(self, chash, callback=None):
        """Fetch, then serve a hash.

        ``callback(content)``, if given, runs before serving starts; if it
        raises, the content is not served.
        """
        def got_func(content):
            logging.info('got content for hash %s (len: %d bytes)', chash, len(content))
            if callback:
                callback(content)
            self.serve_content(chash, content)

        self.get_content(chash, got_func)

    def connect(self, on_open):
        """Connect and run the websocket main loop."""
        self.client.raw_connect(on_open)
        self.client.raw_run_forever()


class CachingEphemeralServer(object):
    """Serve hashes through ``EphemeralWS``, caching fetched content on disk."""

    def __init__(self, cache_dir=DEFAULT_CACHE_DIR, url=DEFAULT_URL):
        self.client = EphemeralWS(url)
        self.cache_dir = cache_dir

    def connect(self, on_open):
        """Ensure the cache directory exists, then connect the client."""
        try:
            os.mkdir(self.cache_dir)
        except OSError as err:
            # An existing directory is the normal case.  Anything else is
            # reported but not fatal: serving from the network may still be
            # attempted.
            if err.errno != errno.EEXIST:
                logging.warning('could not create cache dir %s: %s',
                                self.cache_dir, err)
        self.client.connect(on_open)

    def serve(self, chash):
        """Serve a given hash.

        If the content is available in the cache dir, get it from there.
        If it's not, fetch it from the server, cache it, and serve.

        Raises ``ValueError`` when cached content does not hash to ``chash``;
        the fetch callback raises the same error for fetched content.
        """
        cache_path = os.path.join(self.cache_dir, chash)
        if os.path.exists(cache_path):
            # Binary mode: hash exactly the bytes that are on disk.
            with open(cache_path, 'rb') as inp:
                raw = inp.read()
            # Make sure the hash matches
            calc_hash = hashlib.sha256(raw).hexdigest()
            if calc_hash != chash:
                raise ValueError("CACHED: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash))
            logging.info('serving hash %s from cache...', chash)
            # Serve!  Decoded to text because it is sent inside JSON.
            self.client.serve_content(chash, raw.decode('utf-8'))
        else:
            def callback(content):
                # Content arrives as text decoded from JSON; hash and store
                # its UTF-8 bytes.
                data = _to_bytes(content)
                # Make sure the hash matches
                calc_hash = hashlib.sha256(data).hexdigest()
                if calc_hash != chash:
                    raise ValueError("NET: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash))
                logging.info('writing hash %s to cache...', chash)
                # Write it out to the cache
                with open(cache_path, 'wb') as out:
                    out.write(data)

            # Fetch the content from the server (p2p network) and serve it
            self.client.get_and_serve_content(chash, callback)


def basic_test():
    """Manual smoke test: fetch and serve a few known hashes without caching."""
    # enable logging
    logging.basicConfig()
    client = EphemeralWS()

    def on_open(*a, **k):
        # Fetch, then start serving the "test123" doc
        client.get_and_serve_content('ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae')
        # Serve (without fetching) the "ABCdef" doc
        client.serve_content('057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', 'ABCdef')
        # Fetch and serve the intro doc
        client.get_and_serve_content('2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838')

    client.connect(on_open)  # start mainloop


def caching_test():
    """Manual smoke test: serve a few known hashes through the on-disk cache."""
    # enable logging
    logging.basicConfig(format="[%(asctime)s] %(message)s")
    logging.getLogger().setLevel(logging.INFO)
    client = CachingEphemeralServer()
    HASHES = [
        'ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae',  # "test123"
        '057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996',  # "ABCdef"
        '2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838',  # the intro doc for the service
    ]

    def on_open(*a, **k):
        for chash in HASHES:
            client.serve(chash)

    # Connect
    client.connect(on_open)


if __name__ == '__main__':
    caching_test()