{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Python 并发与并行教程 (`threading` & `multiprocessing`)\n", "\n", "欢迎来到 Python 并发与并行教程!本教程将探讨如何在 Python 中利用多线程 (`threading`) 和多进程 (`multiprocessing`) 来提高程序的效率和响应性,特别是在处理计算密集型和 I/O 密集型任务时。\n", "\n", "**核心概念:**\n", "\n", "* **并发 (Concurrency)**:指系统能够同时处理多个任务的能力。这些任务可能不是真正同时执行,而是在宏观上看起来是同时的,通过快速切换CPU时间片来实现。**一个处理器核心就可以实现并发。**\n", " * 例如:一个Web服务器同时处理多个客户端请求,即使它只有一个CPU核心,它也可以通过快速切换服务不同的请求来实现并发。\n", "* **并行 (Parallelism)**:指系统能够真正同时执行多个任务的能力。这通常需要多个处理器核心。**并行一定是并发的,但并发不一定是并行的。**\n", " * 例如:一个视频编码程序将视频分成多个块,并在多个CPU核心上同时处理这些块。\n", "\n", "* **进程 (Process)**:操作系统分配资源(如内存空间)的基本单位。每个进程都有自己独立的内存空间,进程间通信(IPC)相对复杂且开销较大。\n", "* **线程 (Thread)**:CPU调度的基本单位,是进程内的一个执行流。一个进程可以包含多个线程,它们共享进程的内存空间,线程间通信相对简单且开销较小。\n", "\n", "**Python 中的挑战:全局解释器锁 (GIL - Global Interpreter Lock)**\n", "\n", "CPython(标准的Python解释器)有一个叫做全局解释器锁(GIL)的机制。GIL确保在任何时刻只有一个线程在执行Python字节码。这意味着即使在多核CPU上,Python的多线程程序在CPU密集型任务上通常也无法实现真正的并行,因为只有一个线程能持有GIL并执行Python代码。\n", "\n", "* 对于 **I/O 密集型任务** (大部分时间在等待外部操作,如网络、磁盘读写),`threading` 仍然非常有效,因为当一个线程等待I/O时,GIL会被释放,允许其他线程运行。\n", "* 对于 **CPU 密集型任务** (大部分时间在进行计算),`threading` 带来的性能提升有限(甚至可能因为线程切换开销而降低性能)。在这种情况下,`multiprocessing` 是更好的选择,因为它创建多个进程,每个进程有自己的Python解释器和GIL,从而可以真正利用多核CPU。\n", "\n", "**本教程将涵盖:**\n", "\n", "1. **线程 (`threading` 模块)**\n", "2. **线程同步 (锁、信号量等)**\n", "3. **进程 (`multiprocessing` 模块)**\n", "4. **进程池与线程池 (`concurrent.futures` 模块)**\n", "5. **何时选择线程 vs 进程**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. 线程 (`threading` 模块)\n", "\n", "`threading` 模块允许你创建和管理线程。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import threading\n", "import time\n", "import os # 用于获取进程ID\n", "\n", "def io_bound_task(task_name, duration):\n", " \"\"\"模拟一个I/O密集型任务\"\"\"\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Thread: {threading.get_ident()} - Task '{task_name}' starting, will 'wait' for {duration}s.\")\n", " time.sleep(duration) # 模拟I/O等待,此时GIL会被释放\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Thread: {threading.get_ident()} - Task '{task_name}' finished.\")\n", "\n", "def cpu_bound_task(task_name, count_to):\n", " \"\"\"模拟一个CPU密集型任务\"\"\"\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Thread: {threading.get_ident()} - Task '{task_name}' starting, will count to {count_to:,}.\")\n", " _ = sum(i*i for i in range(count_to)) # 纯计算\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Thread: {threading.get_ident()} - Task '{task_name}' finished.\")\n", "\n", "print(\"--- Running I/O bound tasks with threading ---\")\n", "start_time = time.time()\n", "\n", "thread1 = threading.Thread(target=io_bound_task, args=(\"IO Task 1\", 2))\n", "thread2 = threading.Thread(target=io_bound_task, args=(\"IO Task 2\", 2))\n", "\n", "thread1.start() # 启动线程\n", "thread2.start()\n", "\n", "print(f\"[{time.strftime('%X')}] Main thread: Waiting for IO tasks to complete...\")\n", "thread1.join() # 等待线程1完成\n", "thread2.join() # 等待线程2完成\n", "\n", "end_time = time.time()\n", "print(f\"I/O bound tasks (threading) took: {end_time - start_time:.2f} seconds\\n\")\n", "# 预期时间接近 2 秒,因为它们可以并发执行I/O等待\n", "\n", "print(\"--- Running CPU bound tasks with threading ---\")\n", "start_time = time.time()\n", "count = 30_000_000 # 一个较大的计数值\n", "\n", "thread_cpu1 = threading.Thread(target=cpu_bound_task, args=(\"CPU Task 1 (Thread)\", count))\n", "thread_cpu2 = threading.Thread(target=cpu_bound_task, args=(\"CPU Task 2 (Thread)\", count))\n", "\n", "thread_cpu1.start()\n", "thread_cpu2.start()\n", "\n", "print(f\"[{time.strftime('%X')}] Main thread: Waiting for CPU tasks to complete...\")\n", "thread_cpu1.join()\n", "thread_cpu2.join()\n", "\n", "end_time = time.time()\n", "print(f\"CPU bound tasks (threading) took: {end_time - start_time:.2f} seconds\\n\")\n", "# 预期时间接近 2 * (单个任务时间),因为GIL限制了并行计算" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "从输出中可以看到:\n", "* I/O 密集型任务使用线程时,总时间约等于最长任务的时间,表明它们确实并发了(一个等待时另一个在运行)。\n", "* CPU 密集型任务使用线程时,总时间约等于所有任务时间之和,表明 GIL 阻止了真正的并行计算。\n", "* 所有线程都运行在同一个进程ID (`PID`)下,但有不同的线程ID (`threading.get_ident()`)。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 线程同步\n", "\n", "由于线程共享内存,当多个线程访问和修改共享数据时,可能会发生竞态条件 (Race Conditions),导致数据不一致或其他意外行为。线程同步原语用于协调线程对共享资源的访问。\n", "\n", "常用同步原语:\n", "* **`Lock` (互斥锁)**:最基本的锁。一次只允许一个线程持有锁。如果一个线程尝试获取已被持有的锁,它会阻塞直到锁被释放。\n", "* **`RLock` (可重入锁)**:允许同一个线程多次获取锁而不会死锁。线程需要释放锁的次数与获取次数相同。\n", "* **`Semaphore` (信号量)**:维护一个计数器。`acquire()` 使计数器减一,`release()` 使计数器加一。如果计数器为零,`acquire()` 会阻塞。可以用来限制同时访问某个资源的线程数量。\n", "* **`Event`**:一个简单的线程通信机制。一个线程可以等待某个事件发生(通过 `wait()`),而另一个线程可以设置该事件(通过 `set()`)。\n", "* **`Condition` (条件变量)**:比 `Event` 更高级,允许线程等待某个复杂条件变为真。通常与一个 `Lock` 关联使用。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "shared_counter = 0\n", "counter_lock = threading.Lock()\n", "num_increments = 1_000_000\n", "\n", "def increment_counter(with_lock: bool):\n", " global shared_counter\n", " for _ in range(num_increments):\n", " if with_lock:\n", " with counter_lock: # 上下文管理器自动获取和释放锁\n", " shared_counter += 1\n", " else:\n", " shared_counter += 1 # 没有锁保护,可能导致竞态条件\n", "\n", "print(\"--- Testing counter without lock ---\")\n", "shared_counter = 0\n", "thread_nolock1 = threading.Thread(target=increment_counter, args=(False,))\n", "thread_nolock2 = threading.Thread(target=increment_counter, args=(False,))\n", "thread_nolock1.start()\n", "thread_nolock2.start()\n", "thread_nolock1.join()\n", "thread_nolock2.join()\n", "print(f\"Counter value (without lock): {shared_counter}, Expected: {2 * num_increments}\") \n", "# 结果很可能小于预期值,因为 `shared_counter += 1` 不是原子操作\n", "\n", "print(\"\\n--- Testing counter with lock ---\")\n", "shared_counter = 0\n", "thread_lock1 = threading.Thread(target=increment_counter, args=(True,))\n", "thread_lock2 = threading.Thread(target=increment_counter, args=(True,))\n", "thread_lock1.start()\n", "thread_lock2.start()\n", "thread_lock1.join()\n", "thread_lock2.join()\n", "print(f\"Counter value (with lock): {shared_counter}, Expected: {2 * num_increments}\")\n", "# 结果应该是正确的,因为锁保护了临界区" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "操作 `shared_counter += 1` 实际上包含三个步骤:\n", "1. 读取 `shared_counter` 的当前值。\n", "2. 计算新值 (当前值 + 1)。\n", "3. 将新值写回 `shared_counter`。\n", "如果没有锁,一个线程可能在另一个线程完成这三个步骤之前就插入执行,导致更新丢失。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. 进程 (`multiprocessing` 模块)\n", "\n", "`multiprocessing` 模块通过创建新进程来绕过 GIL,从而实现 CPU 密集型任务的真正并行。\n", "它的 API 与 `threading` 模块非常相似。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import multiprocessing\n", "\n", "# 注意:在Jupyter Notebook或某些环境中,multiprocessing 的代码,\n", "# 特别是定义在顶层并被子进程引用的函数,有时需要放在 if __name__ == '__main__': 块中,\n", "# 或者将函数定义在一个可以被导入的 .py 文件中,以避免序列化和子进程创建问题。\n", "# 对于简单的函数,直接在单元格定义通常也可以,但复杂的场景可能需要注意。\n", "\n", "def cpu_bound_task_mp(task_name, count_to):\n", " \"\"\"模拟一个CPU密集型任务 (给 multiprocessing 使用)\"\"\"\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Process Name: {multiprocessing.current_process().name} - Task '{task_name}' starting, will count to {count_to:,}.\")\n", " _ = sum(i*i for i in range(count_to)) # 纯计算\n", " print(f\"[{time.strftime('%X')}] PID: {os.getpid()}, Process Name: {multiprocessing.current_process().name} - Task '{task_name}' finished.\")\n", "\n", "if __name__ == '__main__': # 这一行在脚本中是必要的,在Jupyter中通常不是,但加上无害\n", " print(\"\\n--- Running CPU bound tasks with multiprocessing ---\")\n", " start_time_mp = time.time()\n", " count_mp = 30_000_000\n", "\n", " process1 = multiprocessing.Process(target=cpu_bound_task_mp, args=(\"CPU Task 1 (Process)\", count_mp))\n", " process2 = multiprocessing.Process(target=cpu_bound_task_mp, args=(\"CPU Task 2 (Process)\", count_mp))\n", "\n", " process1.start()\n", " process2.start()\n", "\n", " print(f\"[{time.strftime('%X')}] Main process: Waiting for CPU tasks (processes) to complete...\")\n", " process1.join()\n", " process2.join()\n", "\n", " end_time_mp = time.time()\n", " print(f\"CPU bound tasks (multiprocessing) took: {end_time_mp - start_time_mp:.2f} seconds\\n\")\n", " # 预期时间接近 (单个任务时间),如果你的CPU有至少2个核心,因为它们可以并行执行。\n", " # 如果是单核CPU,时间可能仍然是串行的两倍,但没有GIL的额外开销。\n", "\n", " # multiprocessing 对于I/O密集型任务也有效,但开销比线程大\n", " print(\"--- Running I/O bound tasks with multiprocessing ---\")\n", " # 为避免命名冲突,我们这里重用之前的 io_bound_task 函数\n", " # 在实际应用中,确保函数对于多进程是安全的 (例如,不依赖于全局变量的特定状态)\n", " start_time_io_mp = time.time()\n", " process_io1 = multiprocessing.Process(target=io_bound_task, args=(\"IO Task 1 (Process)\", 2))\n", " process_io2 = multiprocessing.Process(target=io_bound_task, args=(\"IO Task 2 (Process)\", 2))\n", "\n", " process_io1.start()\n", " process_io2.start()\n", " process_io1.join()\n", " process_io2.join()\n", " end_time_io_mp = time.time()\n", " print(f\"I/O bound tasks (multiprocessing) took: {end_time_io_mp - start_time_io_mp:.2f} seconds\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "观察 `multiprocessing` 的输出:\n", "* CPU 密集型任务的总时间,如果有多核,应该会显著少于线程版本的总时间,接近单个任务的执行时间。\n", "* 每个进程有自己独立的 `PID`。\n", "\n", "**进程间通信 (IPC)**:\n", "由于进程有独立的内存空间,它们不能像线程那样直接共享数据。`multiprocessing` 提供了多种 IPC 机制:\n", "* **`Queue`**:一个进程安全的队列,类似于 `queue.Queue`,但用于进程间。\n", "* **`Pipe`**:返回一对连接对象,代表管道的两端,可用于双向通信。\n", "* **共享内存 (`Value`, `Array`)**:允许在进程间共享简单的基本数据类型或数组。\n", "* **`Manager`**:提供更高级的共享数据结构(如 list, dict, Lock, RLock, Semaphore, Condition, Event, Queue 等),这些数据结构由一个服务器进程管理,其他进程通过代理访问它们。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def worker_ipc(q_in, q_out):\n", " pid = os.getpid()\n", " while True:\n", " try:\n", " item = q_in.get(timeout=1) # 等待1秒获取数据\n", " if item is None: # 终止信号\n", " print(f\"PID {pid}: Received None, exiting.\")\n", " q_out.put(f\"Worker {pid} processed termination.\")\n", " break\n", " result = item * item\n", " print(f\"PID {pid}: Processing {item}, result {result}\")\n", " q_out.put((item, result))\n", " except Exception: # 例如 queue.Empty if timeout\n", " # print(f\"PID {pid}: Queue empty or other issue, continuing...\")\n", " break # 简单处理,实际应用可能需要更健壮的逻辑\n", " \n", "if __name__ == '__main__':\n", " print(\"\\n--- Testing multiprocessing IPC with Queue ---\")\n", " input_queue = multiprocessing.Queue()\n", " output_queue = multiprocessing.Queue()\n", "\n", " num_workers = 2\n", " processes = []\n", " for i in range(num_workers):\n", " p = multiprocessing.Process(target=worker_ipc, args=(input_queue, output_queue))\n", " processes.append(p)\n", " p.start()\n", "\n", " # 发送任务数据\n", " for i in range(5):\n", " input_queue.put(i)\n", "\n", " # 发送终止信号\n", " for _ in range(num_workers):\n", " input_queue.put(None)\n", "\n", " # 收集结果 (这里需要确保所有任务和终止信号都被处理)\n", " # 一个简单的方法是等待一定数量的结果,或者等待进程结束\n", " results_collected = 0\n", " expected_results = 5 + num_workers # 5 data items + num_workers termination messages\n", " \n", " # 超时以防万一\n", " timeout_collection = time.time() + 10 # 10秒超时\n", " final_results = []\n", " while results_collected < expected_results and time.time() < timeout_collection:\n", " try:\n", " res = output_queue.get(timeout=1)\n", " print(f\"Main: Got result from output_queue: {res}\")\n", " final_results.append(res)\n", " results_collected +=1\n", " except Exception: # queue.Empty\n", " pass\n", " \n", " for p in processes:\n", " p.join(timeout=5) # 等待进程结束,设置超时\n", " if p.is_alive():\n", " print(f\"Process {p.pid} did not terminate, terminating now.\")\n", " p.terminate() # 强制终止\n", " p.join()\n", "\n", " print(\"Final collected results:\", final_results)\n", " print(\"--- Finished multiprocessing IPC with Queue ---\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. 进程池与线程池 (`concurrent.futures` 模块)\n", "\n", "管理大量的线程或进程可能很麻烦。`concurrent.futures` 模块提供了一个高级接口来异步执行可调用对象。\n", "\n", "* **`ThreadPoolExecutor`**:使用线程池来执行任务。\n", "* **`ProcessPoolExecutor`**:使用进程池来执行任务。\n", "\n", "它们都提供了 `submit(fn, *args, **kwargs)` 方法来提交任务,该方法返回一个 `Future` 对象。`Future` 对象代表异步操作的最终结果,你可以用它来检查任务是否完成、获取结果或异常。\n", "还有一个 `map(func, *iterables, timeout=None, chunksize=1)` 方法,类似于内置的 `map` 函数,但并发执行。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed\n", "\n", "def task_for_executor(task_id, duration, is_io_bound=True):\n", " pid = os.getpid()\n", " thread_id = threading.get_ident()\n", " print(f\"[{time.strftime('%X')}] PID:{pid} T:{thread_id} - Task {task_id} starting, duration {duration}s\")\n", " if is_io_bound:\n", " time.sleep(duration)\n", " else: # CPU bound simulation\n", " _ = sum(i*i for i in range(int(duration * 10_000_000))) # 调整这个乘数以匹配实际耗时\n", " result = f\"Task {task_id} (PID:{pid} T:{thread_id}) done in {duration}s\"\n", " print(f\"[{time.strftime('%X')}] {result}\")\n", " return result\n", "\n", "if __name__ == '__main__':\n", " print(\"\\n--- Testing ThreadPoolExecutor for I/O bound tasks ---\")\n", " io_tasks_data = [(\"IO_A\", 2), (\"IO_B\", 1), (\"IO_C\", 3)]\n", " start_pool_io = time.time()\n", " with ThreadPoolExecutor(max_workers=3) as executor:\n", " futures_io = [executor.submit(task_for_executor, name, dur, True) for name, dur in io_tasks_data]\n", " for future in as_completed(futures_io):\n", " try:\n", " print(f\"Main (ThreadPool): Result - {future.result()}\")\n", " except Exception as e:\n", " print(f\"Main (ThreadPool): Task generated an exception: {e}\")\n", " end_pool_io = time.time()\n", " print(f\"ThreadPoolExecutor (I/O) took: {end_pool_io - start_pool_io:.2f}s\\n\")\n", "\n", " print(\"--- Testing ProcessPoolExecutor for CPU bound tasks ---\")\n", " # 注意:CPU任务的duration参数在这里被用来控制计算量,而不是实际sleep时间\n", " # 实际效果取决于CPU核心数和计算量\n", " cpu_tasks_data = [(\"CPU_X\", 1), (\"CPU_Y\", 1.5), (\"CPU_Z\", 0.5)] \n", " # 根据你的CPU调整这里的 'duration' (计算量) 以看到并行效果\n", " # 例如,如果单个任务耗时1秒,3个任务在3核CPU上理想情况约1-1.5秒完成\n", " start_pool_cpu = time.time()\n", " with ProcessPoolExecutor(max_workers=min(3, os.cpu_count() or 1)) as executor:\n", " futures_cpu = [executor.submit(task_for_executor, name, dur, False) for name, dur in cpu_tasks_data]\n", " for future in as_completed(futures_cpu):\n", " try:\n", " print(f\"Main (ProcessPool): Result - {future.result()}\")\n", " except Exception as e:\n", " print(f\"Main (ProcessPool): Task generated an exception: {e}\")\n", " end_pool_cpu = time.time()\n", " print(f\"ProcessPoolExecutor (CPU) took: {end_pool_cpu - start_pool_cpu:.2f}s\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`as_completed(futures)` 是一个有用的函数,它返回一个迭代器,在任何 future 完成(或被取消)时产生它们。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. 何时选择线程 vs 进程\n", "\n", "| 特性 | `threading` (线程) | `multiprocessing` (进程) |\n", "|------------------|-----------------------------------------------------|-----------------------------------------------------------|\n", "| **GIL 影响** | 受 GIL 限制,CPU密集型任务无法真正并行 | 不受 GIL 限制 (每个进程有自己的解释器和GIL),CPU密集型任务可并行 |\n", "| **适用场景** | I/O 密集型任务 (网络、文件读写等) | CPU 密集型任务 (大量计算) |\n", "| **内存共享** | 共享内存空间,数据共享简单,但需注意线程安全 | 独立内存空间,数据共享需IPC (开销较大) |\n", "| **创建开销** | 较小 | 较大 (创建完整进程上下文) |\n", "| **上下文切换** | 较快 | 较慢 |\n", "| **鲁棒性** | 一个线程崩溃可能导致整个进程崩溃 | 一个子进程崩溃通常不影响主进程或其他子进程 |\n", "| **可扩展性** | 受限于单进程的资源(如内存) | 更易于扩展到多台机器 (分布式计算的基础) |\n", "\n", "**简而言之:**\n", "\n", "* 如果你的任务主要是等待外部操作(网络请求、文件读写、用户输入),**使用 `threading` 或 `asyncio`**(异步编程是另一种处理I/O并发的强大范式,本教程未详细展开)。线程开销小,上下文切换快。\n", "* 如果你的任务主要是进行大量计算,并且你想利用多核CPU,**使用 `multiprocessing`**。它能真正实现并行计算。\n", "* `concurrent.futures` 提供了一个统一的高级接口,可以方便地在线程池和进程池之间切换,通常是现代并发编程的首选。\n", "\n", "## 总结\n", "\n", "Python 提供了强大的工具来实现并发和并行。理解 GIL 的影响以及线程和进程的特性,是选择正确工具的关键。\n", "\n", "* `threading` 适合 I/O 密集型任务,通过并发执行提高响应性。\n", "* `multiprocessing` 适合 CPU 密集型任务,通过并行执行提高计算吞吐量。\n", "* 线程同步原语对于保护共享数据至关重要。\n", "* `concurrent.futures` 提供了易于使用的线程池和进程池。\n", "\n", "在实际应用中,你可能需要结合使用这些技术,或者考虑 `asyncio` 来构建更复杂的并发系统。始终要对你的代码进行性能分析,以确保你选择的并发策略确实带来了预期的性能提升。" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 5 }