"""Cast a YouTube video to a Chromecast and optionally control it by keyboard."""
import argparse
import logging
import sys
import termios
import threading
import tty
from urllib.parse import parse_qs
from urllib.parse import urlparse

import pychromecast
from pychromecast.controllers.youtube import YouTubeController

# Hostnames whose URLs are parsed as regular YouTube links.
_YOUTUBE_HOSTS = (
    'www.youtube.com',
    'youtube.com',
    'm.youtube.com',
    'music.youtube.com',
)
# Path prefixes of the form /<prefix>/<video id>.
_ID_PATH_PREFIXES = ('/embed/', '/v/', '/shorts/')

_KEY_CTRL_C = 3
_KEY_SPACE = 32
# Last three key codes read (ESC [ <letter>) -> seek offset in seconds.
_ARROW_SEEK_SECONDS = {
    (27, 91, 67): 5.0,    # right
    (27, 91, 68): -5.0,   # left
    (27, 91, 65): 60.0,   # up
    (27, 91, 66): -60.0,  # down
}


class ThreadJob(threading.Thread):
    """Thread that invokes ``callback`` every ``interval`` seconds.

    The loop ends as soon as the callback returns a falsy value or
    ``stop()`` is called.
    """

    def __init__(self, callback, interval):
        super().__init__()
        self.callback = callback
        self.event = threading.Event()
        self.interval = interval
        self._stopped = False

    def run(self):
        # Event.wait() returns True once the event is set, which ends the loop.
        while not self.event.wait(self.interval):
            keep_going = self.callback()
            if not keep_going or self._stopped:
                break

    def stop(self):
        """Ask the thread to finish without waiting out the current interval."""
        self._stopped = True
        # Wake the pending wait() so no further callback is run after stop().
        self.event.set()


def cast_run(videoId, host=None, name=None):
    """Start playing ``videoId`` on a Chromecast.

    The device is selected by ``host`` (passed as-is to
    ``pychromecast._get_chromecast_from_host``) if given, otherwise by
    friendly ``name``, otherwise the first discovered device is used.

    Returns a ``(cast, youtube_controller, job)`` tuple where ``job`` is a
    started ``ThreadJob`` that quits the app once the media status reports
    an idle reason.

    Raises:
        LookupError: no device matches ``name`` or none was discovered.
    """
    if host is not None:
        cast = pychromecast._get_chromecast_from_host(host)
    else:
        chromecasts = pychromecast.get_chromecasts()
        if name is not None:
            cast = next(
                (cc for cc in chromecasts if cc.device.friendly_name == name),
                None)
            if cast is None:
                raise LookupError(
                    'no chromecast named {!r} found'.format(name))
        else:
            if not chromecasts:
                raise LookupError('no chromecast found')
            cast = chromecasts[0]

    cast.wait()
    yt = YouTubeController()
    cast.register_handler(yt)
    yt.play_video(videoId)

    def cleanup():
        # Returning False ends the polling job.
        if cast.media_controller.status.idle_reason is not None:
            cast.quit_app()
            return False
        return True

    job = ThreadJob(cleanup, 1.0)
    job.start()
    return cast, yt, job


class ReadChar():
    """Context manager that reads one character from stdin in raw mode.

    ``__enter__`` returns the character; the previous terminal settings are
    restored on exit.
    """

    def __enter__(self):
        self.fd = sys.stdin.fileno()
        self.old_settings = termios.tcgetattr(self.fd)
        try:
            tty.setraw(self.fd)
            return sys.stdin.read(1)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so restore the
            # terminal here instead of leaving it in raw mode.
            termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)


def set_seek(cast, by):
    """Seek the current media by ``by`` seconds (negative seeks backwards).

    Does nothing when the media status reports that seeking is unsupported.
    """
    if cast.media_controller.status.supports_seek is False:
        return
    cast.media_controller.update_status()
    status = cast.media_controller.status
    position = status.current_time + by
    if by >= 0:
        duration = status.duration
        # Skip the end clamp when no duration is reported: comparing a
        # number with None would raise TypeError.
        if duration is not None and position > duration:
            # Land ``by`` seconds before the end, but never before the start
            # (``by`` may exceed the duration of a short video).
            position = max(0, duration - by)
    elif position < 0:
        position = 0
    cast.media_controller.seek(position)


def set_volume(cast, by):
    """Change the device volume by ``by``, clamped to the range [0.0, 1.0]."""
    volume = cast.status.volume_level + by
    cast.set_volume(min(1.0, max(0.0, volume)))


def toggle_play(cast):
    """Pause when playing, otherwise play; no-op if pausing is unsupported."""
    controller = cast.media_controller
    if controller.status.supports_pause is False:
        return
    if controller.status.player_state == "PLAYING":
        controller.pause()
    else:
        controller.play()


def parse_video_id(value):
    """Extract the video id from a YouTube URL, or return None.

    https://stackoverflow.com/a/7936523
    Examples:
    - http://youtu.be/SA2iWivDJiE
    - http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
    - http://www.youtube.com/embed/SA2iWivDJiE
    - http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
    """
    query = urlparse(value)
    if query.hostname == 'youtu.be':
        return query.path[1:] or None
    if query.hostname in _YOUTUBE_HOSTS:
        if query.path == '/watch':
            # A watch URL without a ``v`` parameter has no id.
            ids = parse_qs(query.query).get('v')
            return ids[0] if ids else None
        if query.path.startswith(_ID_PATH_PREFIXES):
            return query.path.split('/')[2] or None
    return None


def main(args):
    """Cast ``args.url`` and, with ``args.control``, run the keyboard loop.

    Keys: ``q`` / Ctrl-C quit, ``0`` / ``9`` volume up / down, space toggles
    play/pause, left / right seek 5 s, down / up seek 60 s.
    """
    video_id = parse_video_id(args.url)
    if video_id is None:
        sys.exit('could not find a video id in url: {}'.format(args.url))

    if args.address is not None:
        cast, _yt, job = cast_run(
            video_id,
            host=(args.address, args.port, '1-2', 'dev12', "Some TV"))
    elif args.name is not None:
        cast, _yt, job = cast_run(video_id, name=args.name)
    else:
        cast, _yt, job = cast_run(video_id)

    print('casting ...')
    if not args.control:
        return
    print('use keys to control')

    recent_codes = []
    while True:
        with ReadChar() as char:
            pass

        # An empty read means stdin hit EOF; treat it like a quit request.
        if not char or ord(char) == _KEY_CTRL_C or char == "q":
            job.stop()
            cast.quit_app()
            sys.exit()

        code = ord(char)
        # Keep only the last three codes to recognise arrow-key sequences.
        recent_codes = (recent_codes + [code])[-3:]

        if char == "0":
            set_volume(cast, 0.1)
        elif char == "9":
            set_volume(cast, -0.1)
        elif code == _KEY_SPACE:
            toggle_play(cast)
        else:
            seek_by = _ARROW_SEEK_SECONDS.get(tuple(recent_codes))
            if seek_by is not None:
                set_seek(cast, seek_by)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('url', help='youtube url')
    parser.add_argument('--control', help='control by keyboard',
                        action='store_true')
    parser.add_argument('--name', help='friendly chromecast name')
    parser.add_argument('--address', help='ip address of chromecast')
    # type=int so a port given on the command line matches the int default.
    parser.add_argument('--port', help='port of chromecast', type=int,
                        default=8009)
    args = parser.parse_args()
    main(args)