Skip to content

Instantly share code, notes, and snippets.

@aaarghhh
Created January 5, 2020 10:13
Show Gist options
  • Select an option

  • Save aaarghhh/2d2cdcd5e47d84e848b70fe4dd772cd6 to your computer and use it in GitHub Desktop.

Select an option

Save aaarghhh/2d2cdcd5e47d84e848b70fe4dd772cd6 to your computer and use it in GitHub Desktop.

Revisions

  1. aaarghhh created this gist Jan 5, 2020.
    80 changes: 80 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,80 @@
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
    from obswebsocket import obsws, requests
    from pytz import utc
    import os
    import atexit

    def createOBSWS():
    host = "127.0.0.1"
    port = "4444"
    password = "password"
    websocket = obsws(host, port, password)
    return websocket

    def setText(websocket=None,textid=None,scene=None,text=None):
    #print("setText")
    try:
    system = os.name
    if (websocket is None):
    websocket = createOBSWS()
    websocket.connect()
    if (system == "Linux" or system == "posix"):
    websocket.call(requests.SetTextFreetype2Properties(source=textid, scene_name=scene, text=text))
    else:
    websocket.call(
    requests.SetTextGDIPlusProperties(textid, scene, None, None, None, None, None, None, None, None,
    None, None, None, None, text, None, None, None))
    time.sleep(.2)
    websocket.disconnect()
    except:
    pass

    jobstores = {
    'default': MemoryJobStore()
    }

    executors = {
    'default': {'type': 'threadpool', 'max_workers': 30},
    'processpool': ProcessPoolExecutor(max_workers=2)
    }

    job_defaults = {
    'coalesce': False,
    'max_instances': 20,
    'misfire_grace_time': 1
    }


    def setScene(websocket=None,sceneName=None):
    try:
    if (websocket is None):
    websocket = createOBSWS()
    websocket.connect()
    websocket.call(requests.SetCurrentScene(sceneName))
    ### Added some delay before the disconnect
    time.sleep(.2)
    websocket.disconnect()
    except:
    pass


    def testjob():
    setScene(websocket=None, sceneName="TESTSCENE")
    # print("2. GRUPPO")
    setText(None, "TESTSOBJ1", "TESTSCENE", "Hello")
    # print("2. COUNTER")
    setText(None, "TESTSOBJ2", "TESTSCENE", "World!")

    def main():
    pass

    if __name__ == "__main__":
    print("START")
    schedulerjob = BackgroundScheduler(jobstores=jobstores, job_defaults=job_defaults, timezone=utc)
    schedulerjob.add_job(testjob, 'interval', seconds=10)
    schedulerjob.start()
    atexit.register(lambda: schedulerjob.shutdown())
    main()