Skip to content

Instantly share code, notes, and snippets.

@aaarghhh
Created January 5, 2020 10:13
Show Gist options
  • Select an option

  • Save aaarghhh/2d2cdcd5e47d84e848b70fe4dd772cd6 to your computer and use it in GitHub Desktop.

Select an option

Save aaarghhh/2d2cdcd5e47d84e848b70fe4dd772cd6 to your computer and use it in GitHub Desktop.
obswebsocket + APSScheduler POC
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from obswebsocket import obsws, requests
from pytz import utc
import os
import atexit
def createOBSWS():
host = "127.0.0.1"
port = "4444"
password = "password"
websocket = obsws(host, port, password)
return websocket
def setText(websocket=None,textid=None,scene=None,text=None):
#print("setText")
try:
system = os.name
if (websocket is None):
websocket = createOBSWS()
websocket.connect()
if (system == "Linux" or system == "posix"):
websocket.call(requests.SetTextFreetype2Properties(source=textid, scene_name=scene, text=text))
else:
websocket.call(
requests.SetTextGDIPlusProperties(textid, scene, None, None, None, None, None, None, None, None,
None, None, None, None, text, None, None, None))
time.sleep(.2)
websocket.disconnect()
except:
pass
jobstores = {
'default': MemoryJobStore()
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 30},
'processpool': ProcessPoolExecutor(max_workers=2)
}
job_defaults = {
'coalesce': False,
'max_instances': 20,
'misfire_grace_time': 1
}
def setScene(websocket=None,sceneName=None):
try:
if (websocket is None):
websocket = createOBSWS()
websocket.connect()
websocket.call(requests.SetCurrentScene(sceneName))
### Added some delay before the disconnect
time.sleep(.2)
websocket.disconnect()
except:
pass
def testjob():
setScene(websocket=None, sceneName="TESTSCENE")
# print("2. GRUPPO")
setText(None, "TESTSOBJ1", "TESTSCENE", "Hello")
# print("2. COUNTER")
setText(None, "TESTSOBJ2", "TESTSCENE", "World!")
def main():
pass
if __name__ == "__main__":
print("START")
schedulerjob = BackgroundScheduler(jobstores=jobstores, job_defaults=job_defaults, timezone=utc)
schedulerjob.add_job(testjob, 'interval', seconds=10)
schedulerjob.start()
atexit.register(lambda: schedulerjob.shutdown())
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment