from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel, QVBoxLayout, QPushButton, QWidget from PyQt5.QtCore import QObject, QRunnable, QThreadPool,pyqtSlot, pyqtSignal import time import traceback, sys class WorkerSignals(QObject): ''' Defines the signals available from a running worker thread. Supported signals are: - finished: No data - error:`tuple` (exctype, value, traceback.format_exc() ) - result: `object` data returned from processing, anything - progress: `tuple` indicating progress metadata ''' finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(tuple) class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs['progress_callback'] = self.signals.progress @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done class MainWindow(QMainWindow): def __init__(self, *args, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.counter = 0 layout = QVBoxLayout() self.l = QLabel("Start") b = QPushButton("DANGER!") b.pressed.connect(self.oh_no) layout.addWidget(self.l) layout.addWidget(b) w = QWidget() w.setLayout(layout) self.setCentralWidget(w) self.show() self.threadpool = QThreadPool() print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount()) def progress_fn(self, progress): p, m = (progress) print("%d%% done %s" % (p, m)) def execute_this_fn(self, progress_callback): for n in range(0, 5): time.sleep(1) progress_callback.emit((n*100/4, 'blabla')) return "Done." def print_output(self, s): print(s) def thread_complete(self): print("THREAD COMPLETE!") def oh_no(self): # Pass the function to execute worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) # Execute self.threadpool.start(worker) app = QApplication([]) window = MainWindow() app.exec_()