Created
January 5, 2020 10:13
-
-
Save aaarghhh/2d2cdcd5e47d84e848b70fe4dd772cd6 to your computer and use it in GitHub Desktop.
obswebsocket + APSScheduler POC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore | |
| from apscheduler.jobstores.memory import MemoryJobStore | |
| from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor | |
| from obswebsocket import obsws, requests | |
| from pytz import utc | |
| import os | |
| import atexit | |
| def createOBSWS(): | |
| host = "127.0.0.1" | |
| port = "4444" | |
| password = "password" | |
| websocket = obsws(host, port, password) | |
| return websocket | |
| def setText(websocket=None,textid=None,scene=None,text=None): | |
| #print("setText") | |
| try: | |
| system = os.name | |
| if (websocket is None): | |
| websocket = createOBSWS() | |
| websocket.connect() | |
| if (system == "Linux" or system == "posix"): | |
| websocket.call(requests.SetTextFreetype2Properties(source=textid, scene_name=scene, text=text)) | |
| else: | |
| websocket.call( | |
| requests.SetTextGDIPlusProperties(textid, scene, None, None, None, None, None, None, None, None, | |
| None, None, None, None, text, None, None, None)) | |
| time.sleep(.2) | |
| websocket.disconnect() | |
| except: | |
| pass | |
| jobstores = { | |
| 'default': MemoryJobStore() | |
| } | |
| executors = { | |
| 'default': {'type': 'threadpool', 'max_workers': 30}, | |
| 'processpool': ProcessPoolExecutor(max_workers=2) | |
| } | |
| job_defaults = { | |
| 'coalesce': False, | |
| 'max_instances': 20, | |
| 'misfire_grace_time': 1 | |
| } | |
| def setScene(websocket=None,sceneName=None): | |
| try: | |
| if (websocket is None): | |
| websocket = createOBSWS() | |
| websocket.connect() | |
| websocket.call(requests.SetCurrentScene(sceneName)) | |
| ### Added some delay before the disconnect | |
| time.sleep(.2) | |
| websocket.disconnect() | |
| except: | |
| pass | |
| def testjob(): | |
| setScene(websocket=None, sceneName="TESTSCENE") | |
| # print("2. GRUPPO") | |
| setText(None, "TESTSOBJ1", "TESTSCENE", "Hello") | |
| # print("2. COUNTER") | |
| setText(None, "TESTSOBJ2", "TESTSCENE", "World!") | |
| def main(): | |
| pass | |
| if __name__ == "__main__": | |
| print("START") | |
| schedulerjob = BackgroundScheduler(jobstores=jobstores, job_defaults=job_defaults, timezone=utc) | |
| schedulerjob.add_job(testjob, 'interval', seconds=10) | |
| schedulerjob.start() | |
| atexit.register(lambda: schedulerjob.shutdown()) | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment