Skip to content

Instantly share code, notes, and snippets.

@Omar-AE
Forked from amirasaran/BaseThreading
Created January 21, 2021 14:00
Show Gist options
  • Select an option

  • Save Omar-AE/018065049891a98257c61a3f68e6c7ce to your computer and use it in GitHub Desktop.

Select an option

Save Omar-AE/018065049891a98257c61a3f68e6c7ce to your computer and use it in GitHub Desktop.

Revisions

  1. @amirasaran amirasaran created this gist Oct 27, 2016.
    40 changes: 40 additions & 0 deletions BaseThreading
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    import time
    import threading


    class BaseThread(threading.Thread):
    def __init__(self, callback=None, callback_args=None, *args, **kwargs):
    target = kwargs.pop('target')
    super(BaseThread, self).__init__(target=self.target_with_callback, *args, **kwargs)
    self.callback = callback
    self.method = target
    self.callback_args = callback_args

    def target_with_callback(self):
    self.method()
    if self.callback is not None:
    self.callback(*self.callback_args)


    def my_thread_job():
    # do any things here
    print "thread start successfully and sleep for 5 seconds"
    time.sleep(5)
    print "thread ended successfully!"


    def cb(param1, param2):
    # this is run after your thread end
    print "callback function called"
    print "{} {}".format(param1, param2)


    # example using BaseThread with callback
    thread = BaseThread(
    name='test',
    target=my_thread_job,
    callback=cb,
    callback_args=("hello", "world")
    )

    thread.start()