Skip to content

Instantly share code, notes, and snippets.

@Andrew-Chen-Wang
Last active December 3, 2024 15:30
Show Gist options
  • Save Andrew-Chen-Wang/67c68b2392001d486551e1e6660b538f to your computer and use it in GitHub Desktop.

Revisions

  1. Andrew-Chen-Wang revised this gist Dec 3, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion openai.py
    Original file line number Diff line number Diff line change
    @@ -70,7 +70,7 @@ def parse_time_left(time_str: str) -> float:
    "days": r"(\d+)d",
    "hours": r"(\d+)h",
    "minutes": r"(\d+)m(?![s])", # Negative lookahead to exclude 'ms'
    "seconds": r"(\d+)s(?![m])", # Negative lookahead to exclude 'ms'
    "seconds": r"(\d+\.?\d*)s(?![m])", # Negative lookahead and handles decimals
    "milliseconds": r"(\d+)ms",
    }

  2. Andrew-Chen-Wang revised this gist Feb 24, 2024. 1 changed file with 33 additions and 9 deletions.
    42 changes: 33 additions & 9 deletions openai.py
    Original file line number Diff line number Diff line change
    @@ -97,31 +97,56 @@ def parse_time_left(time_str: str) -> float:


    # We need to set all our values first
    has_executed_once: ContextVar[bool] = ContextVar("HasExecutedOnce", default=False)
    first_execution_lock = asyncio.Lock()


    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
    async def call_openai(
    async_func: Callable[[Any, Any], Awaitable[Any]], params: dict
    async_func: Callable[[Any, Any], Awaitable[Any]], params: dict, event: asyncio.Event
    ) -> LegacyAPIResponse:
    response: LegacyAPIResponse

    if not has_executed_once.get():
    response: LegacyAPIResponse | None = None

    async def run():
    try:
    return await async_func(**params)
    except openai.BadRequestError as e:
    if e.code == "context_length_exceeded":
    if params["model"] == "gpt-4-0125-preview":
    raise e
    params["model"] = "gpt-4-0125-preview"
    return await async_func(**params)
    raise e

    if not event.is_set():
    should_run_without_lock = True
    async with first_execution_lock:
    response = await async_func(**params)
    has_executed_once.set(True)
    # Only run the OpenAI call for the first to lock
    if not event.is_set():
    response = await run()
    should_run_without_lock = False
    event.set()
    if should_run_without_lock:
    response = await run()
    else:
    async with openai_lock:
    if rateLimitRemainingRequests.get() < 3:
    print(
    f"Rate limit reached for requests with {rateLimitRemainingRequests.get()} "
    f"requests remaining. Sleeping for {rateLimitResetRequests.get()}s."
    )
    await asyncio.sleep(0.1 + rateLimitResetRequests.get())
    rateLimitRemainingRequests.set(rateLimitRequestsMax.get())
    if rateLimitRemainingTokens.get() < 12000:
    print(
    f"Rate limit reached for tokens with {rateLimitRemainingTokens.get()} tokens "
    f"remaining. Sleeping for {rateLimitResetTokens.get()}s."
    )
    await asyncio.sleep(0.1 + rateLimitResetTokens.get())
    rateLimitRemainingTokens.set(rateLimitTokensMax.get())

    response = await async_func(**params)
    response = await run()

    response = cast(LegacyAPIResponse, response)
    async with openai_lock:
    rateLimitTokensMax.set(int(response.headers["x-ratelimit-limit-tokens"]))
    rateLimitRequestsMax.set(int(response.headers["x-ratelimit-limit-requests"]))
    @@ -132,4 +157,3 @@ async def call_openai(
    rateLimitResetRequests.set(parse_time_left(response.headers["x-ratelimit-reset-requests"]))
    rateLimitResetTokens.set(parse_time_left(response.headers["x-ratelimit-reset-tokens"]))
    return response

  3. Andrew-Chen-Wang revised this gist Feb 24, 2024. 1 changed file with 8 additions and 0 deletions.
    8 changes: 8 additions & 0 deletions openai.py
    Original file line number Diff line number Diff line change
    @@ -11,6 +11,14 @@
    client.chat.completions.with_raw_response.create
    To get the typical ChatCompletion response object, simply run:
    ```python
    from openai.types.chat import ChatCompletion
    r = await call_openai(client.chat.completions.with_raw_response.create, params)
    r.parse(to=ChatCompletion)
    ```
    Warning:
    This is meant for a single threaded application. If you have multiple servers,
    you'll want to replace this locking mechanism with either:
  4. Andrew-Chen-Wang revised this gist Feb 24, 2024. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions openai.py
    Original file line number Diff line number Diff line change
    @@ -14,8 +14,9 @@
    Warning:
    This is meant for a single threaded application. If you have multiple servers,
    you'll want to replace this locking mechanism with either:
    1. A dedicated server that handles all your OpenAI calls e.g. API Gateway (recommended)
    2. A distributed lock stored somewhere like Redis
    1. To continue using this script, a dedicated server that handles all your OpenAI calls
    e.g. API Gateway (recommended approach)
    2. A distributed lock stored somewhere like Redis (must rewrite the locking mechanism here)
    """
    import asyncio
    import re
  5. Andrew-Chen-Wang revised this gist Feb 24, 2024. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions openai.py
    Original file line number Diff line number Diff line change
    @@ -10,6 +10,12 @@
    You must call with with_raw_response to get rate limit headers like:
    client.chat.completions.with_raw_response.create
    Warning:
    This is meant for a single threaded application. If you have multiple servers,
    you'll want to replace this locking mechanism with either:
    1. A dedicated server that handles all your OpenAI calls e.g. API Gateway (recommended)
    2. A distributed lock stored somewhere like Redis
    """
    import asyncio
    import re
  6. Andrew-Chen-Wang created this gist Feb 24, 2024.
    120 changes: 120 additions & 0 deletions openai.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,120 @@
    """
    Created by: @Andrew-Chen-Wang
    Parallel calling OpenAI with rate limit handling.
    Required packages:
    - openai
    - tenacity (optional if you remove the decorators. Useful for 500 errors)
    Usage:
    You must call with with_raw_response to get rate limit headers like:
    client.chat.completions.with_raw_response.create
    """
    import asyncio
    import re
    from contextvars import ContextVar
    from typing import Any, Awaitable, Callable, TypedDict

    import openai
    from openai._legacy_response import LegacyAPIResponse
    from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    )


    # Rate limiting
    # Module-level counters mirroring OpenAI's x-ratelimit-* response headers.
    # Defaults are placeholders used before the first response has been seen;
    # presumably they match the author's account tier — TODO confirm.
    # NOTE(review): ContextVar values set inside one asyncio task are not
    # visible to sibling tasks (each task runs in a copied context), so these
    # may behave as per-task rather than global state — verify against usage.
    rateLimitRequestsMax: ContextVar[int] = ContextVar("RateLimitRequestsMax", default=5000)
    rateLimitTokensMax: ContextVar[int] = ContextVar("RateLimitTokensMax", default=160000)
    rateLimitRemainingRequests: ContextVar[int] = ContextVar("RateLimitRemainingRequests", default=5000)
    rateLimitRemainingTokens: ContextVar[int] = ContextVar("RateLimitRemainingTokens", default=160000)
    # in seconds
    rateLimitResetRequests: ContextVar[float] = ContextVar("RateLimitResetRequests", default=0)
    # in seconds
    rateLimitResetTokens: ContextVar[float] = ContextVar("RateLimitResetTokens", default=0)

    # Ensuring only one modification happens for OpenAI results at a time
    # (guards both the pre-call threshold checks and the post-call header
    # bookkeeping in call_openai).
    openai_lock = asyncio.Lock()


    def parse_time_left(time_str: str) -> float:
        """Parse an OpenAI rate-limit reset duration string into seconds.

        Handles any combination of day/hour/minute/second/millisecond
        components, e.g. the formats seen in the ``x-ratelimit-reset-*``
        headers::

            1s   1.5s   12ms   6m12s   13m0s1ms   1d2h3m4s   1d4s3ms

        Args:
            time_str: Duration string composed of ``d``, ``h``, ``m``, ``s``
                and/or ``ms`` components.

        Returns:
            Total duration in seconds as a float. Components that are absent
            contribute nothing; a string with no recognized component
            yields 0.0.
        """
        # Regex per unit. Lookaheads keep "m"/"s" from swallowing the "ms"
        # unit: "12ms" must parse as milliseconds, not minutes or seconds.
        patterns = {
            "days": r"(\d+)d",
            "hours": r"(\d+)h",
            "minutes": r"(\d+)m(?!s)",  # negative lookahead excludes 'ms'
            # Seconds may be fractional (e.g. "1.5s"): without the optional
            # decimal part, "1.5s" would match as "5s" and be badly wrong.
            "seconds": r"(\d+(?:\.\d+)?)s(?!m)",
            "milliseconds": r"(\d+)ms",
        }

        total_seconds = 0.0

        # Each unit is searched independently, so component order in the
        # input does not matter.
        for unit, pattern in patterns.items():
            match = re.search(pattern, time_str)
            if not match:
                continue
            # float() (not int()) so fractional seconds parse correctly.
            value = float(match.group(1))
            if unit == "days":
                total_seconds += value * 86400  # 1 day = 86400 seconds
            elif unit == "hours":
                total_seconds += value * 3600  # 1 hour = 3600 seconds
            elif unit == "minutes":
                total_seconds += value * 60  # 1 minute = 60 seconds
            elif unit == "seconds":
                total_seconds += value
            elif unit == "milliseconds":
                total_seconds += value / 1000  # 1 ms = 0.001 seconds

        return total_seconds


    # We need to set all our values first
    # Gate so that the very first OpenAI call runs alone: it seeds the
    # rate-limit ContextVars from real response headers before any other
    # caller consults them.
    # NOTE(review): a ContextVar set inside one asyncio task is not seen by
    # sibling tasks (each task copies the context at creation), so with
    # asyncio.gather every task may take the "first execution" path and
    # serialize on first_execution_lock — confirm whether that is intended.
    has_executed_once: ContextVar[bool] = ContextVar("HasExecutedOnce", default=False)
    first_execution_lock = asyncio.Lock()


    # tenacity: up to 6 attempts total, jittered exponential backoff of
    # 1-60s between attempts (useful for transient 5xx errors).
    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
    async def call_openai(
        async_func: Callable[[Any, Any], Awaitable[Any]], params: dict
    ) -> LegacyAPIResponse:
        """Call an OpenAI ``with_raw_response`` endpoint with client-side
        rate limiting.

        Args:
            async_func: An async OpenAI client method that returns raw
                responses (e.g. ``client.chat.completions.with_raw_response
                .create``) so rate-limit headers are available.
            params: Keyword arguments forwarded to ``async_func``.

        Returns:
            The raw ``LegacyAPIResponse``; call ``.parse(...)`` on it to get
            the typed response object.

        Flow: the first call (per context — see NOTE below) runs alone under
        ``first_execution_lock`` to seed the rate-limit ContextVars; later
        calls serialize on ``openai_lock``, sleeping until the advertised
        reset time when remaining requests/tokens run low. After every call
        the ContextVars are refreshed from the response's x-ratelimit-*
        headers.
        """
        response: LegacyAPIResponse

        if not has_executed_once.get():
            # First execution path: make the call with no threshold checks,
            # since the ContextVars still hold placeholder defaults.
            async with first_execution_lock:
                response = await async_func(**params)
                # NOTE(review): this set() is only visible within the current
                # task's context; concurrent sibling tasks may not observe it
                # and will also take this branch — confirm intended behavior.
                has_executed_once.set(True)
        else:
            # Steady-state path: hold openai_lock across the check-sleep-call
            # sequence so only one request is in flight at a time.
            async with openai_lock:
                # Fewer than 3 requests left: wait out the reset window
                # (+0.1s slack), then assume the budget is fully restored.
                if rateLimitRemainingRequests.get() < 3:
                    await asyncio.sleep(0.1 + rateLimitResetRequests.get())
                    rateLimitRemainingRequests.set(rateLimitRequestsMax.get())
                # Token threshold of 12000 is presumably a worst-case size
                # for one request — TODO confirm against the prompts used.
                if rateLimitRemainingTokens.get() < 12000:
                    await asyncio.sleep(0.1 + rateLimitResetTokens.get())
                    rateLimitRemainingTokens.set(rateLimitTokensMax.get())

                response = await async_func(**params)

        # Refresh all rate-limit state from the authoritative response
        # headers (both paths), guarded so updates are not interleaved.
        async with openai_lock:
            rateLimitTokensMax.set(int(response.headers["x-ratelimit-limit-tokens"]))
            rateLimitRequestsMax.set(int(response.headers["x-ratelimit-limit-requests"]))

            rateLimitRemainingRequests.set(int(response.headers["x-ratelimit-remaining-requests"]))
            rateLimitRemainingTokens.set(int(response.headers["x-ratelimit-remaining-tokens"]))

            rateLimitResetRequests.set(parse_time_left(response.headers["x-ratelimit-reset-requests"]))
            rateLimitResetTokens.set(parse_time_left(response.headers["x-ratelimit-reset-tokens"]))
        return response