Skip to content

Instantly share code, notes, and snippets.

@RicterZ
Last active January 14, 2023 09:40
Show Gist options
  • Select an option

  • Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.

Select an option

Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.

Revisions

  1. RicterZ revised this gist Jan 14, 2023. 1 changed file with 32 additions and 12 deletions.
    44 changes: 32 additions & 12 deletions bilibili-tv.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,5 @@
    # Bilibili TV Controller
    #
    #
    # Requirements
    # - qrencode_ascii
    # - zio
    @@ -123,6 +123,14 @@ def bv_decode(code):
    return (r - add) ^ xor


    def restore():
    pass


    def save(obj):
    pass


    class BilibiliTV():

    io = None
    @@ -262,6 +270,7 @@ def dispatch(self):

    if command == 'Command.OnProgress':
    self.position = arguments[0]['position']
    self.playing = True

    elif command == 'Command.SpeedChanged':
    self.speed = arguments[0]['currSpeed']
    @@ -286,6 +295,7 @@ def dispatch(self):
    elif state == 4:
    logger.debug('Video playing resumed')
    elif state == 7:
    self.playing = False
    logger.debug('Video playing stopped')

    elif command == 'Command.OnQnSwitch':
    @@ -295,7 +305,7 @@ def dispatch(self):

    elif command == 'Command.OnEpisodeSwitch':
    self.info = arguments[0]
    logger.debug('Playing video {} (av{})'.format(self.info['title'],
    logger.debug('Playing video {} (av{})'.format(self.info['title'],
    self.info['playItem']['aid']))
    elif command == 'Command.Error':
    logger.warning('Error: {}'.format(arguments[0]['errorCode']))
    @@ -323,7 +333,7 @@ def discover(self):
    resp = requests.get(link).text

    # bilibili
    if 'ZVT9255BGA' in resp:
    if 'Bilibili Inc.' in resp:
    name = re.findall('<friendlyName>(.*)</friendlyName>', resp)
    name = 'Unknown' if not name else name[0]

    @@ -346,10 +356,6 @@ def setup(self, addr=None):
    return

    self.target = addr
    if self.settle:
    logger.info('Already settle')
    return

    logger.info('Initialize connection ...')
    self.io = zio((self.target), print_write=False, print_read=False)
    self.io.write(SETUP_MSG)
    @@ -425,7 +431,7 @@ def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0):
    count = 0
    while True:
    if self.info is not None:
    logger.info('Playing video: {}(av{})'.format(self.info['title'],
    logger.info('Playing video: {}(av{})'.format(self.info['title'],
    self.info['playItem']['aid']))

    text = ''
    @@ -490,7 +496,7 @@ def set_speed(self, speed):
    def get_speed(self):
    return self.speed

    @check_settle()
    @check_settle(playing=False)
    def toggle_danmaku(self):
    self.danmaku = not self.danmaku
    logger.info('Toggle danmaku to {} ...'.format(self.danmaku))
    @@ -523,7 +529,7 @@ def parse_command(self, command):
    self.dispatcher_thread.not_stop = False
    raise SystemExit

    elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover', 'login'):
    elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover'):
    if command == 'danmaku':
    command = 'toggle_danmaku'

    @@ -560,6 +566,12 @@ def parse_command(self, command):
    elif command == 'debug':
    logger.setLevel(logging.DEBUG)

    elif command == 'login':
    if len(args) > 0:
    self.login(args[0])
    else:
    self.login()

    def interactive(self):
    while True:
    try:
    @@ -570,7 +582,11 @@ def interactive(self):

    self.parse_command(command)

    def login(self):
    def login(self, access_key=None):

    if access_key is not None:
    self.access_key = access_key
    return

    def get_sign(data):
    s = ''
    @@ -608,17 +624,21 @@ def get_sign(data):
    if ret['code'] == 0:
    self.access_key = ret['data']['access_token']
    logger.info('Login successfully with user {}'.format(ret['data']['mid']))
    logger.info('Access key: {}'.format(self.access_key))
    break

    time.sleep(1)



    if __name__ == '__main__':
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(ColorFormatter())
    logger.addHandler(handler)

    b = BilibiliTV()
    b.setup(('192.168.1.3', 9958))
    # b.discover()
    b.setup(('192.168.31.198', 9958))
    b.login(access_key='xxxxxxxx')
    b.interactive()
  2. RicterZ created this gist Dec 17, 2022.
    624 changes: 624 additions & 0 deletions bilibili-tv.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,624 @@
    # Bilibili TV Controller
    #
    # Requirements
    # - qrencode_ascii
    # - zio

    import threading
    import json
    import time
    import logging
    import sys
    import requests
    import re
    import socket
    import struct
    import random
    import os
    import hashlib
    import qrencode_ascii

    from urllib.parse import urlparse
    from functools import wraps
    from enum import Enum
    from zio import zio, b8, b32


    logger = logging.getLogger()

    SETUP_MSG = '''SETUP /projection NVA/1.0\r
    Session: deadbeef-1337-1337-1337-deadbeef1337\r
    Connection: Keep-Alive\r
    \r
    '''

    class ColorFormatter(logging.Formatter):
    grey = "\x1b[38;20m"
    blue = "\x1b[34;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    format = "[%(levelname)s]: %(message)s"

    FORMATS = {
    logging.DEBUG: grey + format + reset,
    logging.INFO: blue + format + reset,
    logging.WARNING: yellow + format + reset,
    logging.ERROR: red + format + reset,
    logging.CRITICAL: bold_red + format + reset
    }

    def format(self, record):
    log_fmt = self.FORMATS.get(record.levelno)
    formatter = logging.Formatter(log_fmt)
    return formatter.format(record)


    class Flag(Enum):
    # ping command from TV every 1 second
    PING = b'\xe4'

    # flag of commands sent from client
    COMMAND = b'\xe0'

    # flag of commands sent from TV
    RESPONSE = b'\xc0'


    def check_settle(playing=True):
    def _func(func):
    def _wrapper(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
    if not args[0].settle:
    logger.info('TV not settle, please run setup first')
    return

    if playing and not args[0].playing:
    logger.info('Video not playing, cannot change states')
    return

    return func(*args, **kwargs)
    return wrapper
    return _wrapper(func)

    return _func


    def get_epid_and_ssid(aid):
    url = 'https://www.bilibili.com/video/av{}/'.format(aid)
    headers = requests.head(url, allow_redirects=False).headers

    if not 'location' in headers:
    return 0, 0

    epid = headers['location'].split('/')[-1]
    if not epid.startswith('ep'):
    return 0, 0

    resp = requests.get(headers['location']).text
    ssid = re.findall('https://www.bilibili.com/bangumi/play/ss(\d+)/', resp)
    if ssid:
    return epid[2:], ssid[0]

    return 0, 0


    def bv_decode(code):
    table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    tr = {}
    for i in range(58):
    tr[table[i]] = i

    s = [11, 10, 3, 8, 4, 6]
    xor = 177451812
    add = 8728348608

    r = 0
    for i in range(6):
    r += tr[code[s[i]]] * 58 ** i

    return (r - add) ^ xor


    class BilibiliTV():

    io = None
    target = None
    discovered = []
    danmaku = False

    # is video playing
    playing = False

    # video position
    position = 0

    # video info
    info = None

    # video speed
    speed = 1
    supported_speeds = None

    # video quality
    quality = None
    supported_qualities = None

    # video volume
    volume = 0

    # setup
    settle = False

    dispatcher_thread = None

    command_response = None

    access_key = None

    def __init__(self, addr=None):
    self.target = addr
    self.command_response = {}

    def ping(self):
    msg = Flag.RESPONSE.value
    msg += b'\x00'
    msg += b32(0x1337)

    self.io.write(msg)

    def make_command(self, command, args=None):
    # message flag
    while True:
    serial = random.randint(0, 0xffffffff)
    if serial not in self.command_response:
    break

    logger.debug('Make command {} with serial {} ...'.format(command, serial))
    msg = Flag.COMMAND.value

    # message parts count
    if args is None:
    args = ()
    else:
    # for now, only 1 argument been used
    args = [json.dumps(args)]

    msg += b8(len(args) + 2)

    # message serial number
    msg += b32(serial)
    msg += b'\x01'

    # command string
    msg += b'\x07Command'
    msg += b8(len(command))
    msg += command.encode()

    # arguments
    if not isinstance(args, (list, tuple)):
    args = (args, )

    for arg in args:
    if not isinstance(arg, bytes):
    arg = str(arg).encode()

    msg += b32(len(arg))
    msg += arg

    return serial, msg

    def dispatch(self):
    while getattr(self.dispatcher_thread, 'not_stop', True):
    flag = self.io.read(1)

    if flag not in Flag._value2member_map_:
    logger.error('Unknown message flag: {}'.format(repr(flag)))
    raise Exception('Unknown message flag')

    parts = int.from_bytes(self.io.read(1), 'big')
    serial = int.from_bytes(self.io.read(4), 'big')
    logger.debug('Get {} message with serial {}'.format(Flag._value2member_map_[flag].name, serial))

    if flag == Flag.PING.value:
    self.settle = True

    if parts == 0:
    self.command_response[serial] = None
    continue

    # one more byte in commands from client
    if flag == Flag.COMMAND.value:
    _ = self.io.read(1)

    result = []
    for i in range(parts):
    # skip Command XXXX which length is only 1 byte
    c = 1 if i <= 1 and flag != Flag.RESPONSE.value else 4
    length = int.from_bytes(self.io.read(c), 'big')
    data = self.io.read(length)
    result.append(data)

    self.command_response[serial] = result
    if len(result) == 1:
    ret = json.loads(result[0])
    if 'volume' in ret:
    self.volume = ret['volume']
    else:
    logger.info(ret)
    continue

    command = '{}.{}'.format(result[0].decode(), result[1].decode())
    logger.debug('Received {}'.format(command))

    arguments = []
    if len(result) >= 2:
    for arg in result[2:]:
    arguments.append(json.loads(arg))
    logger.debug('argument: {}'.format(arg.decode()))

    if command == 'Command.OnProgress':
    self.position = arguments[0]['position']

    elif command == 'Command.SpeedChanged':
    self.speed = arguments[0]['currSpeed']
    self.supported_speeds = arguments[0]['supportSpeedList']
    logger.debug('Current speed: {}, supported speeds: {}'.format(
    arguments[0]['currSpeed'],
    ' / '.join(map(str, arguments[0]['supportSpeedList'])))
    )

    elif command == 'Command.OnDanmakuSwitch':
    self.danmaku = arguments[0]['open']
    logger.debug('Danmaku switch is {}'.format(arguments[0]['open']))

    elif command == 'Command.PLAY_SUCCESS':
    self.playing = True
    logger.debug('Start playing video command successfully')

    elif command == 'Command.OnPlayState':
    state = arguments[0]['playState']
    if state == 5:
    logger.debug('Video playing paused')
    elif state == 4:
    logger.debug('Video playing resumed')
    elif state == 7:
    logger.debug('Video playing stopped')

    elif command == 'Command.OnQnSwitch':
    text = []
    self.quality = arguments[0]['curQn']
    self.supported_qualities = arguments[0]['supportQnList']

    elif command == 'Command.OnEpisodeSwitch':
    self.info = arguments[0]
    logger.debug('Playing video {} (av{})'.format(self.info['title'],
    self.info['playItem']['aid']))
    elif command == 'Command.Error':
    logger.warning('Error: {}'.format(arguments[0]['errorCode']))

    else:
    logger.debug('Unimplemented command {}'.format(command))

    def discover(self):
    logger.info('Starting discovering Bilibili TV ...')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('239.255.255.250', 1900))
    mreq = struct.pack("4sl", socket.inet_aton('239.255.255.250'), socket.INADDR_ANY)

    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    count = 0
    while True:
    data = sock.recv(1024).decode()
    for line in data.split('\n'):
    line = line.strip()
    if line.lower().startswith('location'):
    link = line.split(':', 1)[1].strip()

    resp = requests.get(link).text

    # bilibili
    if 'ZVT9255BGA' in resp:
    name = re.findall('<friendlyName>(.*)</friendlyName>', resp)
    name = 'Unknown' if not name else name[0]

    udn = re.findall('<UDN>uuid:(.*)</UDN>', resp)[0]
    if udn not in self.discovered:
    self.discovered.append(udn)
    addr = (urlparse(link).hostname, urlparse(link).port)
    logger.info('Found Bilibili TV: {}:{} ({})'.format(*addr, name))
    break

    time.sleep(1)
    count += 1
    if count == 5:
    logger.info('Discover finished')
    break

    def setup(self, addr=None):
    if self.target is None and addr is None:
    logger.error('Target address not be set')
    return

    self.target = addr
    if self.settle:
    logger.info('Already settle')
    return

    logger.info('Initialize connection ...')
    self.io = zio((self.target), print_write=False, print_read=False)
    self.io.write(SETUP_MSG)
    self.io.read_until('\r\n\r\n')
    self.dispatcher_thread = threading.Thread(target=self.dispatch)
    self.dispatcher_thread.start()

    c = 0
    while not self.settle:
    self.ping()
    if c >= 3:
    logger.error('setup failed for no response')
    return

    c += 1
    time.sleep(1)

    self.get_volume()
    logger.info('Setup TV connection successfully')

    @check_settle(playing=False)
    def set_volume(self, volume):
    logger.info('Set the volume to {}'.format(volume))
    if not volume.isdigit():
    return

    self.volume = int(volume)
    _, msg = self.make_command('SetVolume', {'volume': self.volume})
    self.io.write(msg)

    @check_settle(playing=False)
    def get_volume(self):
    serial, msg = self.make_command('GetVolume')
    self.io.write(msg)

    while serial in self.command_response:
    return self.volume

    @check_settle(playing=False)
    def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0):
    self.speed = 1
    self.quality = None
    self.supported_speeds = None
    self.supported_qualities = None
    self.info = None

    if epid == 0 and season_id == 0:
    epid, season_id = get_epid_and_ssid(aid)

    content_type = 0 if not epid else 1
    data = {
    'biz_id': biz_id,
    'roomId': room_id,
    'oid': aid,
    'aid': aid,
    'epId': epid,
    'cid': cid,
    'seasonId': season_id,
    'type': 0,
    'contentType': content_type,
    'autoNext': 'true',
    'userDesireQn': '112',
    'accessKey': self.access_key
    }

    if self.access_key is None:
    logger.warning('access_key is not be set, cannot play high quality videos, try login')

    logger.info('Sending play video command ...')
    serial, msg = self.make_command('Play', data)
    self.io.write(msg)

    count = 0
    while True:
    if self.info is not None:
    logger.info('Playing video: {}(av{})'.format(self.info['title'],
    self.info['playItem']['aid']))

    text = ''
    current = ''
    for k in self.info['qnDesc']['supportQnList']:
    if self.info['qnDesc']['curQn'] == k['quality']:
    current = k['description']
    text += '{}({}) / '.format(k['description'], k['quality'])

    logger.info('Current quality: {}'.format(current))
    logger.info('Supported quality: {}'.format(text[:-2]))
    break
    if count == 3:
    logger.error('Play video failed, please check the video ID')
    break
    time.sleep(1)
    count += 1

    @check_settle()
    def stop(self):
    logger.info('Stop playing video ...')
    self.playing = False
    _, msg = self.make_command('Stop')
    self.io.write(msg)
    self.dispatcher_thread.not_stop = False

    @check_settle()
    def pause(self):
    logger.info('Pause video playing ...')
    _, msg = self.make_command('Pause')
    self.io.write(msg)

    @check_settle()
    def resume(self):
    logger.info('Resume video playing ...')
    _, msg = self.make_command('Resume')
    self.io.write(msg)

    @check_settle()
    def seek(self, t):
    logger.info('Seek to {} sec ...'.format(t))
    _, msg = self.make_command('Seek', {'seekTs': t})
    self.io.write(msg)

    def set_seek(self, position):
    self.seek(position)

    def get_seek(self):
    return self.position

    @check_settle()
    def set_speed(self, speed):
    logger.info('Set speed to {} ...'.format(speed))
    if float(speed) not in self.supported_speeds:
    logger.warning('Unsupported speed specified')
    return

    _, msg = self.make_command('SwitchSpeed', {'speed': speed})
    self.io.write(msg)

    @check_settle()
    def get_speed(self):
    return self.speed

    @check_settle()
    def toggle_danmaku(self):
    self.danmaku = not self.danmaku
    logger.info('Toggle danmaku to {} ...'.format(self.danmaku))
    _, msg = self.make_command('SwitchDanmaku', {'open': self.danmaku})
    self.io.write(msg)

    @check_settle()
    def set_quality(self, quality):
    if not self.playing:
    return

    if int(quality) not in map(lambda k: k['quality'], self.supported_qualities):
    logger.warning('Unsupported quality specified')
    return

    logger.info('Set quality to {} ...'.format(quality))
    _, msg = self.make_command('SwitchQn', {'qn': quality})
    self.io.write(msg)

    @check_settle()
    def get_quality(self):
    return self.quality

    def parse_command(self, command):
    args = command[1:]
    command = command[0].lower()

    if command in ('quit', 'exit'):
    if self.dispatcher_thread:
    self.dispatcher_thread.not_stop = False
    raise SystemExit

    elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover', 'login'):
    if command == 'danmaku':
    command = 'toggle_danmaku'

    self.__getattribute__(command)()

    elif command in ('quality', 'speed', 'volume', 'seek'):
    if len(args) == 0:
    ret = self.__getattribute__('get_{}'.format(command))()
    if ret:
    print(ret)
    else:
    self.__getattribute__('set_{}'.format(command))(*args)

    elif command == 'play':
    if not args:
    print('Usage: play [avXXXXXXX]')
    return

    if args[0].lower().startswith('bv'):
    av = bv_decode(args[0])
    elif args[0].startswith('av'):
    av = args[0][2:]
    else:
    av = args[0]

    self.play(aid=av)
    elif command == 'setup':
    if len(args) < 2:
    print('Usage: setup [host] [port]')
    return

    self.setup((args[0], int(args[1])))

    elif command == 'debug':
    logger.setLevel(logging.DEBUG)

    def interactive(self):
    while True:
    try:
    command = input('📺 >>> ').split(' ')
    except (EOFError, KeyboardInterrupt):
    self.dispatcher_thread.not_stop = False
    break

    self.parse_command(command)

    def login(self):

    def get_sign(data):
    s = ''
    for k, v in data.items():
    s += '{}={}&'.format(k, v)
    s = s[:-1]
    s += '59b43e04ad6965f34319062b478f83dd'
    return hashlib.md5(s.encode()).hexdigest()

    base = 'https://passport.bilibili.com/x/passport-tv-login/qrcode/'
    url = base + 'auth_code'
    data = {
    'appkey': '4409e2ce8ffd12b8',
    'local_id': 0,
    'ts': 0,
    }
    data['sign'] = get_sign(data)
    ret = requests.post(url, data=data).json()

    qrcode = qrencode_ascii.encode(ret['data']['url'])
    logger.info('Please scan qrcode and scan via Bilibili app')
    print(qrcode)

    url = base + '/poll'
    data = {
    'appkey': '4409e2ce8ffd12b8',
    'auth_code': ret['data']['auth_code'],
    'local_id': 0,
    'ts': 0,
    }
    data['sign'] = get_sign(data)

    while True:
    ret = requests.post(url, data=data).json()
    if ret['code'] == 0:
    self.access_key = ret['data']['access_token']
    logger.info('Login successfully with user {}'.format(ret['data']['mid']))
    break

    time.sleep(1)


    if __name__ == '__main__':
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(ColorFormatter())
    logger.addHandler(handler)

    b = BilibiliTV()
    b.setup(('192.168.1.3', 9958))
    b.interactive()