{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Python 上下文管理器 (`with` 语句) 与 `contextlib` 模块教程\n", "\n", "欢迎来到 Python 上下文管理器和 `contextlib` 模块的教程!`with` 语句是 Python 中一种优雅且强大的资源管理方式,它可以确保某些操作(如文件关闭、锁的释放)即使在发生异常时也能正确执行。本教程将深入探讨上下文管理器的工作原理、如何自定义上下文管理器,以及如何使用 `contextlib` 模块中的工具来更轻松地创建它们。\n", "\n", "**为什么使用 `with` 语句和上下文管理器?**\n", "\n", "1. **资源管理**:自动获取和释放资源(如文件、网络连接、数据库会话、锁等),确保即使发生错误,清理代码(通常在 `finally` 块中)也能被执行。\n", "2. **代码简洁性**:避免了显式的 `try...finally` 结构,使代码更易读。\n", "3. **封装设置和拆卸逻辑**:将资源的准备和清理逻辑封装在上下文管理器内部。\n", "\n", "**本教程将涵盖:**\n", "\n", "1. **回顾 `with` 语句的基本用法**\n", "2. **上下文管理协议 (Context Management Protocol)**:`__enter__` 和 `__exit__`\n", "3. **创建自定义上下文管理器 (基于类)**\n", "4. **`contextlib` 模块简介**\n", " * `@contextlib.contextmanager` 装饰器:使用生成器创建上下文管理器\n", " * `contextlib.closing`:确保对象的 `close()` 方法被调用\n", " * `contextlib.suppress`:临时抑制指定的异常\n", " * `contextlib.ExitStack` 和 `AsyncExitStack`:动态管理多个上下文管理器\n", "5. **异步上下文管理器 (Async Context Managers)**:`__aenter__` 和 `__aexit__` (简要回顾)\n", "6. **实际应用场景**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. 回顾 `with` 语句的基本用法\n", "\n", "你最常看到的 `with` 语句可能是用于文件操作:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "file_path = \"example_with_statement.txt\"\n", "\n", "# 使用 with 语句打开文件\n", "try:\n", " with open(file_path, \"w\") as f:\n", " print(f\"File object type inside with: {type(f)}\")\n", " f.write(\"Hello from the with statement!\\n\")\n", " print(\"Wrote to file.\")\n", " # 文件 f 会在退出 with 块时自动关闭,即使发生异常\n", " # raise ValueError(\"Simulating an error inside with\") # 取消注释测试异常处理\n", " print(f\"File '{file_path}' closed automatically: {f.closed}\")\n", "except ValueError as e:\n", " print(f\"Caught error: {e}\")\n", " # 即使发生错误,文件仍然会被关闭\n", " # 注意:此时 f 可能未定义,取决于异常发生的位置\n", " # print(f\"File closed after error: {f.closed}\") # 这行可能会报错\n", " # 更好的做法是在 finally 中检查,或者信任 with 语句\n", " pass \n", "\n", "# 清理文件\n", "import os\n", "if os.path.exists(file_path):\n", " os.remove(file_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "这里的 `open()` 函数返回的文件对象就是一个**上下文管理器 (context manager)**。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 上下文管理协议 (Context Management Protocol)\n", "\n", "任何实现了以下两个特殊方法的对象都可以作为上下文管理器:\n", "\n", "* **`__enter__(self)`**:\n", " * 在进入 `with` 语句块之前被调用。\n", " * 它的返回值(如果有的话)会赋给 `with ... as variable:` 中的 `variable`。\n", " * 如果不需要在 `as` 子句中接收值,`__enter__` 可以不返回任何东西(即返回 `None`)。\n", "\n", "* **`__exit__(self, exc_type, exc_val, exc_tb)`**:\n", " * 在退出 `with` 语句块时被调用,无论块内代码是正常完成还是发生异常。\n", " * 参数:\n", " * `exc_type`: 异常的类型(如果块内没有异常,则为 `None`)。\n", " * `exc_val`: 异常的实例(如果块内没有异常,则为 `None`)。\n", " * `exc_tb`: 回溯 (traceback) 对象(如果块内没有异常,则为 `None`)。\n", " * **返回值**:\n", " * 如果 `__exit__` 方法返回 `True`,则表示它已经处理了异常,异常会被“抑制”,不会向外传播。\n", " * 如果 `__exit__` 方法返回 `False` (或 `None`,这是默认行为),则发生的任何异常都会在 `__exit__` 方法执行完毕后重新引发,由外部代码处理。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. 创建自定义上下文管理器 (基于类)\n", "\n", "我们可以通过定义一个包含 `__enter__` 和 `__exit__` 方法的类来创建自己的上下文管理器。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "class Timer:\n", " \"\"\"一个简单的计时器上下文管理器\"\"\"\n", " def __init__(self, name=\"Default Timer\"):\n", " self.name = name\n", " self.start_time = None\n", " print(f\"Timer '{self.name}': Initialized.\")\n", "\n", " def __enter__(self):\n", " print(f\"Timer '{self.name}': __enter__ - Starting timer.\")\n", " self.start_time = time.perf_counter()\n", " return self # 返回自身,以便在 with 块内可以访问 Timer 实例的属性 (可选)\n", "\n", " def __exit__(self, exc_type, exc_val, exc_tb):\n", " end_time = time.perf_counter()\n", " elapsed_time = end_time - self.start_time\n", " print(f\"Timer '{self.name}': __exit__ - Elapsed time: {elapsed_time:.4f} seconds.\")\n", " if exc_type:\n", " print(f\" Timer '{self.name}': Exception occurred: {exc_type.__name__} - {exc_val}\")\n", " # return False # 默认行为,不抑制异常\n", "\n", "print(\"--- Testing Timer context manager (normal execution) ---\")\n", "with Timer(\"MyOperation\") as t:\n", " print(f\" Inside with block for timer '{t.name}'. Accessing start_time (approx): {t.start_time}\")\n", " time.sleep(0.5)\n", " print(\" Work inside with block done.\")\n", "\n", "print(\"\\n--- Testing Timer context manager (with exception) ---\")\n", "try:\n", " with Timer(\"RiskyOperation\") as risky_t:\n", " print(f\" Inside with block for timer '{risky_t.name}'.\")\n", " time.sleep(0.2)\n", " raise KeyError(\"Simulated key error!\")\n", "except KeyError as e:\n", " print(f\"Caught expected KeyError outside 'with': {e}\")\n", "\n", "# 示例:抑制异常\n", "class SuppressErrorExample:\n", " def __enter__(self):\n", " print(\"SuppressErrorExample: Entering\")\n", " return self\n", " def __exit__(self, exc_type, exc_val, exc_tb):\n", " print(\"SuppressErrorExample: Exiting\")\n", " if isinstance(exc_val, TypeError):\n", " print(f\" Suppressing TypeError: {exc_val}\")\n", " return True # 抑制 TypeError\n", " return False # 其他异常不抑制\n", "\n", "print(\"\\n--- Testing SuppressErrorExample (suppressing TypeError) ---\")\n", "with SuppressErrorExample():\n", " print(\" Trying to cause a TypeError...\")\n", " _ = \"text\" + 5 # This will raise TypeError\n", "print(\"After with block (TypeError was suppressed)\")\n", "\n", "print(\"\\n--- Testing SuppressErrorExample (not suppressing ValueError) ---\")\n", "try:\n", " with SuppressErrorExample():\n", " print(\" Trying to cause a ValueError...\")\n", " raise ValueError(\"This error won't be suppressed\")\n", "except ValueError as e:\n", " print(f\"Caught ValueError outside 'with' as expected: {e}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. `contextlib` 模块简介\n", "\n", "`contextlib` 模块提供了一些实用工具,用于处理上下文管理器和 `with` 语句。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.1 `@contextlib.contextmanager` 装饰器\n", "\n", "这个装饰器允许你使用一个简单的**生成器函数**来创建上下文管理器,而无需编写一个完整的类。\n", "\n", "**要求:**\n", "* 被装饰的生成器函数必须**只 `yield` 一次**。\n", "* `yield` 之前的部分代码相当于 `__enter__` 方法的逻辑。\n", "* `yield` 语句产生的值会赋给 `with ... as variable:` 中的 `variable`。\n", "* `yield` 之后的部分代码(通常在 `try...finally` 块中)相当于 `__exit__` 方法的逻辑,确保即使发生异常也能执行清理。\n", "* 生成器内部发生的异常会正常传播,除非在 `finally` 块中被捕获或处理。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import contextlib\n", "\n", "@contextlib.contextmanager\n", "def managed_file(filename, mode):\n", " print(f\"@contextmanager: Acquiring file '{filename}' in mode '{mode}'\")\n", " f = None\n", " try:\n", " f = open(filename, mode)\n", " yield f # 这是 __enter__ 返回的值,也是生成器暂停的地方\n", " print(f\"@contextmanager: Inside try block after yield (normal exit of 'with' body)\")\n", " except Exception as e:\n", " print(f\"@contextmanager: Exception caught inside generator: {e}\")\n", " raise # 重新引发异常,除非你想抑制它\n", " finally:\n", " if f:\n", " print(f\"@contextmanager: Releasing file '{filename}' (in finally)\")\n", " f.close()\n", "\n", "file_path_cm = \"example_cm.txt\"\n", "\n", "print(\"--- Testing @contextmanager (normal execution) ---\")\n", "with managed_file(file_path_cm, \"w\") as mf:\n", " print(f\" Inside with: File object is {mf}\")\n", " mf.write(\"Hello from @contextmanager!\")\n", " print(f\" Inside with: File closed status: {mf.closed}\") # 应该是 False\n", "print(f\"After with: Is file closed? {mf.closed}\") # 应该是 True\n", "\n", "print(\"\\n--- Testing @contextmanager (with exception) ---\")\n", "try:\n", " with managed_file(file_path_cm, \"r\") as mf_read: # 应该能读到刚才写的内容\n", " print(f\" Inside with: Reading content: {mf_read.read().strip()}\")\n", " raise ZeroDivisionError(\"Simulated error for @contextmanager\")\n", "except ZeroDivisionError as e:\n", " print(f\"Caught expected ZeroDivisionError outside 'with': {e}\")\n", "\n", "# 清理\n", "if os.path.exists(file_path_cm):\n", " os.remove(file_path_cm)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.2 `contextlib.closing(thing)`\n", "\n", "如果一个对象 `thing` 提供了 `close()` 方法但没有实现上下文管理协议(即没有 `__enter__` 和 `__exit__`),`closing(thing)` 会返回一个上下文管理器,它在退出时调用 `thing.close()`。\n", "\n", "这对于处理一些旧式的、只提供 `close()` 方法的资源很有用。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class OldResource:\n", " def __init__(self, name):\n", " self.name = name\n", " self.is_closed = False\n", " print(f\"OldResource '{self.name}': Created.\")\n", " \n", " def use(self):\n", " if self.is_closed:\n", " raise ValueError(\"Resource is closed\")\n", " print(f\" OldResource '{self.name}': Being used.\")\n", " \n", " def close(self):\n", " print(f\"OldResource '{self.name}': close() called.\")\n", " self.is_closed = True\n", "\n", "print(\"--- Testing contextlib.closing --- \")\n", "resource = OldResource(\"LegacyDB\")\n", "\n", "with contextlib.closing(resource) as r_closed:\n", " # r_closed 就是 resource 本身\n", " print(f\" Inside with: resource is r_closed: {resource is r_closed}\")\n", " r_closed.use()\n", "print(f\"After with: resource.is_closed = {resource.is_closed}\") # 应该是 True\n", "\n", "print(\"\\n--- Testing contextlib.closing with an error --- \")\n", "resource2 = OldResource(\"LegacySocket\")\n", "try:\n", " with contextlib.closing(resource2) as r2_closed:\n", " r2_closed.use()\n", " raise RuntimeError(\"Error while using legacy resource\")\n", "except RuntimeError as e:\n", " print(f\"Caught expected RuntimeError: {e}\")\n", "print(f\"After with (error): resource2.is_closed = {resource2.is_closed}\") # 仍然应该是 True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.3 `contextlib.suppress(*exceptions)`\n", "\n", "返回一个上下文管理器,用于临时抑制指定的异常类型。在 `with` 块中,如果发生了列出的异常类型,它们会被捕获并静默处理,程序会正常继续执行 `with` 块之后的代码。\n", "\n", "**注意**:应谨慎使用,确保你确实想要忽略这些异常,而不是隐藏了重要的问题。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"--- Testing contextlib.suppress --- \")\n", "\n", "print(\"Attempting to delete a non-existent file (will suppress FileNotFoundError):\")\n", "non_existent_file = \"no_such_file.txt\"\n", "with contextlib.suppress(FileNotFoundError, PermissionError):\n", " os.remove(non_existent_file)\n", " print(f\" Code inside 'with' after os.remove attempt.\") # 这行会执行\n", "print(\"After 'with' block, FileNotFoundError was suppressed.\")\n", "\n", "print(\"\\nAttempting an operation that causes TypeError (will not be suppressed):\")\n", "try:\n", " with contextlib.suppress(FileNotFoundError):\n", " result = \"text\" + 10 # Raises TypeError\n", " print(\" This line (inside with) will not be reached if TypeError occurs.\")\n", "except TypeError as e:\n", " print(f\"Caught TypeError outside 'with' as expected: {e}\")\n", "print(\"After 'with' block for TypeError.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.4 `contextlib.ExitStack` 和 `AsyncExitStack`\n", "\n", "`ExitStack` 是一个上下文管理器,它允许你以编程方式注册多个上下文管理器或清理函数,并在 `ExitStack` 本身退出时,以注册的相反顺序调用它们的 `__exit__` 方法或清理函数。\n", "这对于动态管理一组不确定数量的资源非常有用。\n", "\n", "`AsyncExitStack` 是其异步版本,用于 `async with`。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class ResourceForStack:\n", " def __init__(self, name):\n", " self.name = name\n", " def __enter__(self):\n", " print(f\"ResourceForStack '{self.name}': Entering\")\n", " return self\n", " def __exit__(self, exc_type, exc_val, exc_tb):\n", " print(f\"ResourceForStack '{self.name}': Exiting\")\n", " def use(self):\n", " print(f\" Using '{self.name}'\")\n", "\n", "def cleanup_function(resource_name):\n", " print(f\"Cleanup function called for '{resource_name}'\")\n", "\n", "print(\"--- Testing contextlib.ExitStack --- \")\n", "resources_to_manage = [\n", " ResourceForStack(\"ResA\"),\n", " ResourceForStack(\"ResB\")\n", "]\n", "\n", "with contextlib.ExitStack() as stack:\n", " print(\"Inside ExitStack 'with' block.\")\n", " \n", " # 动态进入上下文管理器\n", " for res_obj in resources_to_manage:\n", " r = stack.enter_context(res_obj)\n", " r.use()\n", " \n", " # 注册一个简单的清理回调函数\n", " # stack.callback(cleanup_function, \"DynamicResource1\")\n", " # stack.callback(cleanup_function, \"DynamicResource2\")\n", " # 或者使用 push (可以 unregister)\n", " stack.push(lambda: cleanup_function(\"PushedCleanup1\"))\n", " \n", " print(\"Simulating work within ExitStack...\")\n", " # 如果这里发生异常,所有已注册的 __exit__ 和回调仍会以相反顺序调用\n", " # raise ValueError(\"Error inside ExitStack\") \n", "\n", "print(\"After ExitStack 'with' block. Resources should be released in reverse order of entry/push.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. 异步上下文管理器 (Async Context Managers) - 简要回顾\n", "\n", "对于异步编程 (`async/await`),上下文管理器需要使用异步版本的方法:\n", "\n", "* **`__aenter__(self)`**: 必须是一个 `async def` 方法,并且应该 `await` 异步操作。它返回的值(通常是 `awaitable` 解析后的值或 `self`)赋给 `async with ... as var` 中的 `var`。\n", "* **`__aexit__(self, exc_type, exc_val, exc_tb)`**: 也必须是一个 `async def` 方法。\n", "\n", "`contextlib` 也提供了异步版本:\n", "* `@contextlib.asynccontextmanager`\n", "* `contextlib.AsyncExitStack`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "\n", "@contextlib.asynccontextmanager\n", "async def async_managed_resource(name):\n", " print(f\"AsyncCM '{name}': Acquiring resource (async)...\", flush=True)\n", " await asyncio.sleep(0.1) # 模拟异步获取\n", " try:\n", " yield name # 值赋给 as 后面的变量\n", " print(f\"AsyncCM '{name}': Inside try after yield (normal async with body exit).\", flush=True)\n", " finally:\n", " print(f\"AsyncCM '{name}': Releasing resource (async, in finally)...\", flush=True)\n", " await asyncio.sleep(0.1) # 模拟异步释放\n", "\n", "async def use_async_cm():\n", " print(\"--- Testing @asynccontextmanager ---\")\n", " async with async_managed_resource(\"AsyncRes1\") as res_name:\n", " print(f\" Inside async with: Got resource name '{res_name}'\", flush=True)\n", " await asyncio.sleep(0.2)\n", " print(\" Inside async with: Work done.\", flush=True)\n", " print(\"After async with block.\", flush=True)\n", "\n", "# 为了在Jupyter中运行asyncio代码\n", "if __name__ == '__main__': # 确保只在直接运行时执行,而不是导入时\n", " # asyncio.run(use_async_cm()) # 标准运行方式\n", " # 在Jupyter中,如果已经有事件循环,可能需要特殊处理。\n", " # 通常,如果IPython版本够新,可以直接await顶层协程。\n", " # 为了简单演示,这里不直接运行,或者需要nest_asyncio。\n", " print(\"Async context manager example defined. Run in an async context.\")\n", " # 如果想在notebook cell中直接运行, 你可以这样做:\n", " # import nest_asyncio\n", " # nest_asyncio.apply()\n", " # asyncio.run(use_async_cm())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6. 实际应用场景\n", "\n", "上下文管理器在多种场景下都非常有用:\n", "\n", "* **文件操作**:`open()` (最常见的例子)。\n", "* **锁和同步原语**:`threading.Lock`, `multiprocessing.Lock` 等都支持 `with` 语句,确保锁被正确释放。\n", " ```python\n", " # import threading\n", " # my_lock = threading.Lock()\n", " # with my_lock:\n", " # # 访问共享资源\n", " # pass\n", " ```\n", "* **数据库连接和事务**:确保连接关闭和事务提交/回滚。\n", " ```python\n", " # import sqlite3\n", " # with sqlite3.connect(\"mydb.db\") as conn: # 连接对象本身就是上下文管理器\n", " # cursor = conn.cursor()\n", " # # ... 执行SQL ...\n", " # conn.commit() # 事务在正常退出时提交,异常时回滚\n", " ```\n", "* **网络连接**:确保套接字关闭。\n", "* **临时改变状态**:例如,临时改变 `decimal` 模块的精度,或临时改变当前工作目录。\n", " ```python\n", " # import decimal\n", " # @contextlib.contextmanager\n", " # def local_decimal_precision(prec=28):\n", " # ctx = decimal.getcontext()\n", " # original_prec = ctx.prec\n", " # ctx.prec = prec\n", " # try:\n", " # yield\n", " # finally:\n", " # ctx.prec = original_prec\n", " # with local_decimal_precision(50):\n", " # # 这里的 decimal 运算使用 50 位精度\n", " # pass\n", " ```\n", "* **测试装置 (Fixtures)**:在测试中设置和拆卸测试环境。\n", "* **性能计时和剖析**:如本教程中的 `Timer` 示例。\n", "\n", "## 总结\n", "\n", "上下文管理器和 `with` 语句是 Python 中进行健壮资源管理和封装设置/拆卸逻辑的关键工具。通过实现上下文管理协议或使用 `contextlib` 模块,你可以编写出更清晰、更可靠的代码。\n", "\n", "`contextlib` 模块,特别是 `@contextmanager` 装饰器,极大地简化了自定义上下文管理器的创建过程,使其更加易于使用和理解。" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.0" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 5 }