@loopyd
Last active October 9, 2025 07:51

Revisions

  1. loopyd revised this gist Nov 26, 2023. 1 changed file with 256 additions and 184 deletions.
    440 changes: 256 additions & 184 deletions civitai_scraper.py
    @@ -1,5 +1,5 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.2 deitydurg
    civitai-scraper.py v1.0.3 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    @@ -21,15 +21,63 @@
    from pathlib import Path
    import asyncio
    from argparse import ArgumentParser
    from typing import Any, Dict, List
    from typing import Any, Dict, List, Optional
    from pydantic import BaseModel
    from aiolimiter import AsyncLimiter
    import aiohttp
    import aiofiles
    import logging
    import brotli
    import dotenv
    from colorama import Fore, Back, Style, init

    def logbuilder(logger, log_file=None, log_level=logging.INFO, colorize=False):
    def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
    if cls not in instances:
    instances[cls] = cls(*args, **kwargs)
    return instances[cls]
    return wrapper

    @singleton
    class AppConfig(BaseModel):
    class Config:
    arbitrary_types_allowed = True
    protected_namespaces = ()

    api_max_retries: int = 3
    api_retry_delay: int = 5
    api_retry_period: int = 180
    api_retry_limit: int = 100
    threads: int = 5
    creator_limit: int = 100
    start_page: int = -1
    save_interval: int = 60
    no_skip: bool = False
    log_level: str = "info"
    log_file: str = f"{Path(__file__).parent.absolute()}{os.sep}civitai-scraper.log"
    colorize: bool = False
    db_path: str = f"{Path(__file__).parent.absolute()}{os.sep}civitai-db.bin"
    AppConfig = AppConfig()

    @singleton
    class AppData(BaseModel):
    class Config:
    arbitrary_types_allowed = True
    protected_namespaces = ()
    creators: List[Dict] = []
    models: List[Dict] = []
    logger: Optional[logging.Logger] = None
    limiter: Optional[AsyncLimiter] = None
    creator_queue: Optional[asyncio.Queue] = None
    model_queue: Optional[asyncio.Queue] = None
    page_queue: Optional[asyncio.Queue] = None
    save_queue: Optional[asyncio.Queue] = None
    controller_complete: Optional[asyncio.Event] = None
    save_padlock: Optional[asyncio.Event] = None
    AppData = AppData()
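
The v1.0.3 revision replaces loose globals with singletons: a small decorator caches the first instance of a class, and pydantic models hold the configuration and shared runtime state (the gist then rebinds the class names to instances, e.g. AppConfig = AppConfig()). A minimal sketch of the same pattern, with illustrative names:

    from pydantic import BaseModel

    def singleton(cls):
        """Cache the first instance of cls and return it for every later call."""
        instances = {}
        def wrapper(*args, **kwargs):
            if cls not in instances:
                instances[cls] = cls(*args, **kwargs)
            return instances[cls]
        return wrapper

    @singleton
    class DemoConfig(BaseModel):
        threads: int = 5
        db_path: str = "demo-db.bin"

    a = DemoConfig()
    b = DemoConfig()
    assert a is b              # both names refer to the one cached instance
    a.threads = 10
    print(b.threads)           # 10 -- configuration is shared process-wide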

    def log_builder(logger, log_file=None, log_level=logging.INFO, colorize=False):
    """
    Configures the provided logger with specified settings.
    @@ -98,7 +146,7 @@ def format(self, record):
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    logger.setLevel(logging.DEBUG)
    logger.setLevel(log_level.upper())

    def log(loglevel: int, message: str) -> None:
    """
    @@ -113,15 +161,15 @@ def log(loglevel: int, message: str) -> None:
    None
    """
    log_func = {
    'info': logging.info,
    'warning': logging.warning,
    'error': logging.error,
    'critical': logging.critical,
    'debug': logging.debug
    }.get(logging.getLevelName(loglevel).lower(), logging.info)
    'INFO': AppData.logger.info,
    'WARNING': AppData.logger.warning,
    'ERROR': AppData.logger.error,
    'CRITICAL': AppData.logger.critical,
    'DEBUG': AppData.logger.debug
    }.get(str(logging.getLevelName(loglevel)).upper(), logging.info)
    log_func(msg=message, stacklevel=2)
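
The log() helper now dispatches to the shared logger's bound methods and passes stacklevel=2 (Python 3.8+), so the emitted record is attributed to log()'s caller rather than to log() itself. A self-contained sketch of that behaviour, with illustrative names:

    import logging, sys

    logging.basicConfig(
        stream=sys.stdout,
        level=logging.DEBUG,
        format="[%(levelname)s] [%(funcName)s] %(message)s",
    )
    logger = logging.getLogger(__name__)

    def log(loglevel: int, message: str) -> None:
        # Map the numeric level to the matching bound logger method, defaulting to info.
        log_func = {
            "INFO": logger.info,
            "WARNING": logger.warning,
            "ERROR": logger.error,
            "CRITICAL": logger.critical,
            "DEBUG": logger.debug,
        }.get(str(logging.getLevelName(loglevel)).upper(), logger.info)
        # stacklevel=2 attributes the record to log()'s caller,
        # so %(funcName)s shows do_work instead of log.
        log_func(msg=message, stacklevel=2)

    def do_work():
        log(logging.INFO, "hello from do_work")

    do_work()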

    async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int = 3, delay: int = 5) -> str:
    async def api_call(url: str, method: str = 'GET', data: Any = None) -> str:
    """
    api_call: Make an API call to the given URL.
    @@ -139,48 +187,84 @@ async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int
    """

    attempt = 0
    while attempt < retries:
    while attempt < AppConfig.api_retry_limit:
    try:
    log(logging.DEBUG, f"{method} {url}")
    async with limiter:
    async with AppData.limiter:
    async with aiohttp.ClientSession() as session:
    async with session.request(method, url, data=data) as response:
    response.raise_for_status()
    return await response.text()
    except aiohttp.ClientError as e:
    log(logging.WARNING, f"Error on attempt {attempt + 1}/{retries} ({e})")
    await asyncio.sleep(delay)
    log(logging.WARNING, f"Error on attempt {attempt + 1}/{AppConfig.api_retry_limit} ({e})")
    await asyncio.sleep(AppConfig.api_retry_delay)
    attempt += 1

    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None
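
api_call() drops its per-call retries/delay parameters and instead reads retry settings from the config singleton while funnelling every request through a shared aiolimiter.AsyncLimiter. A reduced sketch of the same retry-under-rate-limit loop; the URL and the limits are placeholders, and note that AsyncLimiter takes (max_rate, time_period), in that order:

    import asyncio
    import aiohttp
    from aiolimiter import AsyncLimiter

    # Allow at most 100 acquisitions per 180-second window (illustrative numbers).
    limiter = AsyncLimiter(max_rate=100, time_period=180)

    async def api_call(url: str, method: str = "GET", retries: int = 3, delay: int = 5):
        for attempt in range(retries):
            try:
                async with limiter:                      # wait for a rate-limit slot
                    async with aiohttp.ClientSession() as session:
                        async with session.request(method, url) as response:
                            response.raise_for_status()
                            return await response.text()
            except aiohttp.ClientError as exc:
                print(f"attempt {attempt + 1}/{retries} failed: {exc}")
                await asyncio.sleep(delay)               # back off before retrying
        return None

    async def main():
        body = await api_call("https://civitai.com/api/v1/creators?page=1")
        print(body is not None)

    asyncio.run(main())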

    async def controller(page_queue: asyncio.Queue, creator_queue: asyncio.Queue, model_queue: asyncio.Queue, save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, page_limit: int, start_page: int) -> None:
    async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    save_db_json: Save the given creators and models to the given JSON file using Brotli compression.
    Args:
    creators (list of dict): The creators object to save.
    models (list of dict): The models object to save.
    file_path (str): The path to the json file.
    Returns:
    None
    """
    json_data = {'creators': creators, 'models': models}
    minified_json = json.dumps(json_data, ensure_ascii=False, separators=(',', ':'))
    compressed_data = brotli.compress(minified_json.encode('utf-8'))

    async with aiofiles.open(file_path, 'wb') as file:
    await file.write(compressed_data)
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")

    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    """
    load_db_json: Load the JSON database file at the given path using Brotli decompression.
    Args:
    file_path (str): The path to the json file.
    Returns:
    list of dict: The creators object.
    list of dict: The models object.
    """
    if not os.path.exists(file_path):
    return [], []

    async with aiofiles.open(file_path, 'rb') as file:
    compressed_data = await file.read()
    data = json.loads(brotli.decompress(compressed_data).decode('utf-8'))

    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']
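
save_db_json()/load_db_json() move ahead of the workers and switch the on-disk format from plain JSON to minified JSON compressed with Brotli, written and read through aiofiles. A small round-trip sketch of that storage format (the file name is illustrative):

    import asyncio, json, os
    import aiofiles
    import brotli

    async def save_db(payload: dict, path: str) -> None:
        # Minify first, then compress; the separators strip the default whitespace.
        raw = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
        async with aiofiles.open(path, "wb") as fh:
            await fh.write(brotli.compress(raw))

    async def load_db(path: str) -> dict:
        if not os.path.exists(path):
            return {"creators": [], "models": []}
        async with aiofiles.open(path, "rb") as fh:
            blob = await fh.read()
        return json.loads(brotli.decompress(blob).decode("utf-8"))

    async def main():
        await save_db({"creators": [{"creator": "demo"}], "models": []}, "demo-db.bin")
        print(await load_db("demo-db.bin"))

    asyncio.run(main())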

    async def controller() -> None:
    """
    controller: The main controller for the CivitAI scraping system.
    Args:
    page_queue (asyncio.Queue): The queue to emit the page numbers to.
    creator_queue (asyncio.Queue): The queue to emit the creator objects to.
    model_queue (asyncio.Queue): The queue to emit the model objects to.
    save_queue (asyncio.Queue): The queue to emit the save events to.
    page_limit (int): The maximum number of pages to emit.
    start_page (int): The page of creators to start scraping from.
    None (uses AppConfig and AppData singletons)
    Returns:
    None
    """
    page = start_page
    while page < start_page + page_limit:
    await page_queue.put(page)
    page = AppConfig.start_page
    while page < AppConfig.start_page + AppConfig.creator_limit:
    await AppData.page_queue.put(page)
    log(logging.DEBUG, f"Emitted page: {page}")
    page += 1

    retries = 0
    while True:
    condition = page_queue.empty() == False or creator_queue.empty() == False or model_queue.empty() == False or save_queue.empty() == False
    condition = AppData.page_queue.empty() == False or AppData.creator_queue.empty() == False or AppData.model_queue.empty() == False or AppData.save_queue.empty() == False
    if condition == False:
    retries += 1
    if retries >= 5:
    @@ -195,29 +279,25 @@ async def controller(page_queue: asyncio.Queue, creator_queue: asyncio.Queue, mo
    await asyncio.sleep(1.0)
    continue

    controller_complete.set()
    AppData.controller_complete.set()

    async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, noskip: bool = False) -> None:
    async def creator_emitter() -> None:
    """
    creator_emitter: Emits creator objects to the given queue
    Args:
    creator_queue (asyncio.Queue): The queue to emit the creator objects to.
    creators_json (list of dict): The creators object to append the creators to.
    page (int, optional): The page on the user's models listing to scrape from.
    noskip (bool, optional): If True, do not skip creators that are already in the database. Defaults to False.
    None (uses AppConfig and AppData singletons)
    Returns:
    None
    """

    retries = 0
    while controller_complete.is_set() == False or page_queue.empty() == False:
    while AppData.controller_complete.is_set() == False or AppData.page_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    page = page_queue.get_nowait()
    page = AppData.page_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    @@ -229,54 +309,52 @@ async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict
    continue

    log(logging.DEBUG, f"Consumed page: {page}")
    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = await api_call(method='GET', url=f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
    log(logging.INFO, f"No creators found on page: {page}")
    page_queue.task_done()
    AppData.page_queue.task_done()
    break

    for creator in creators.get('items', []):
    creator_username = creator['username']
    creator_link = creator['link']
    if len(creators_json) > 0:
    in_creators_json = any(c['creator'] == creator_username for c in creators_json)
    if len(AppData.creators) > 0:
    in_creators = any(c['creator'] == creator_username for c in AppData.creators)
    else:
    in_creators_json = False
    if in_creators_json and noskip == False:
    in_creators = False
    if in_creators and AppConfig.no_skip == False:
    log(logging.DEBUG, f"Skipping creator: {creator_username}")
    continue

    creator_json = {'creator': creator_username, 'link': creator_link, 'page': page}
    await creator_queue.put(creator_json)
    await save_padlock.wait()
    creators_json = list_append(creators_json, creator_json, 'creator', True)
    creator_object = {'creator': creator_username, 'link': creator_link, 'page': page}
    await AppData.creator_queue.put(creator_object)
    await AppData.save_padlock.wait()
    AppData.creators = list_append(AppData.creators, creator_object, 'creator', True)
    log(logging.DEBUG, f"Emitted creator: {creator_username}")

    page_queue.task_done()
    AppData.page_queue.task_done()

    log(logging.DEBUG, "Creator emitter thread exited.")
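
Each worker polls its queue with get_nowait(), counts consecutive asyncio.QueueEmpty misses, idles for a second between them, and exits once the misses pass a threshold and the controller has signalled completion. A stripped-down sketch of that consume-or-idle loop, with hypothetical names:

    import asyncio, random

    async def worker(queue: asyncio.Queue, done: asyncio.Event, max_idle: int = 5) -> None:
        retries = 0
        while not done.is_set() or not queue.empty():
            try:
                await asyncio.sleep(random.uniform(0.1, 0.3))   # jitter between polls
                item = queue.get_nowait()
                retries = 0
            except asyncio.QueueEmpty:
                retries += 1
                if retries >= max_idle:                         # give up after repeated misses
                    break
                await asyncio.sleep(1.0)
                continue
            print(f"processed {item}")
            queue.task_done()

    async def main():
        queue: asyncio.Queue = asyncio.Queue(5)
        done = asyncio.Event()
        for i in range(3):
            await queue.put(i)
        done.set()                                              # no more work will arrive
        await worker(queue, done)

    asyncio.run(main())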

    async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    async def model_emitter() -> None:
    """
    model_emitter: Consume creator objects from the given queue and scrape their models.
    Args:
    creator_queue (asyncio.Queue): The queue to consume the creator objects from.
    model_queue (asyncio.Queue): The queue to emit the model objects to.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    None (uses AppConfig and AppData singletons)
    Returns:
    None
    """

    retries = 0
    while controller_complete.is_set() == False or creator_queue.empty() == False:
    while AppData.controller_complete.is_set() == False or AppData.creator_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    creator_json = creator_queue.get_nowait()
    creator_object = AppData.creator_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    @@ -289,24 +367,24 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue

    page = 1
    while True:
    page_models = await api_call(f"{creator_json['link']}&page={page}")
    page_models = await api_call(method='GET', url=f"{creator_object['link']}&page={page}")
    page_models = json.loads(page_models)

    if not page_models or len(page_models['items']) == 0:
    log(logging.DEBUG, f"No models found for creator: {creator_json['creator']} on page: {page}")
    log(logging.DEBUG, f"No models found for creator: {creator_object['creator']} on page: {page}")
    break

    models_added = 0
    for model_type in ["LORA", "Checkpoint", "Controlnet"]:
    filtered_models = [m["modelVersions"] for m in page_models['items'] if m['type'] == model_type]
    if len(filtered_models) == 0:
    log(logging.DEBUG, f"No models found for {creator_json['creator']} with type: {model_type}")
    log(logging.DEBUG, f"No models found for {creator_object['creator']} with type: {model_type}")
    continue
    models_to_add = [
    {
    'filename': f"{creator_json['creator']}-{model_file['name']}",
    'filename': f"{creator_object['creator']}-{model_file['name']}",
    'url': model_file['downloadUrl'],
    'creator': creator_json['creator'],
    'creator': creator_object['creator'],
    'type': model_type,
    'page': page
    }
    @@ -318,38 +396,35 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    for model in models_to_add:
    models_added += 1
    log(logging.DEBUG, f"Emitted model: {model['filename']}")
    await model_queue.put(model)
    await AppData.model_queue.put(model)

    if models_added == 0:
    log(logging.DEBUG, f"No models emitted for creator: {creator_json['creator']}")
    log(logging.DEBUG, f"No models emitted for creator: {creator_object['creator']}")
    break

    page += 1

    creator_queue.task_done()
    AppData.creator_queue.task_done()

    log(logging.DEBUG, "Model emitter thread exited.")

    async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], controller_complete: asyncio.Event, save_padlock: asyncio.Event ) -> None:
    async def model_consumer() -> None:
    """
    model_consumer: Consume model objects from the given queue and add them to the given json string.
    Args:
    model_queue (asyncio.Queue): The queue to consume the model objects from.
    models_json (list of dicts): The models object to append the models to.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    None (uses AppConfig and AppData singletons)
    Returns:
    str: The json string with the models appended.
    """
    retries = 0
    while controller_complete.is_set() == False or model_queue.empty() == False:
    while AppData.controller_complete.is_set() == False or AppData.model_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    model_json = model_queue.get_nowait()
    model_object = AppData.model_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    @@ -360,46 +435,43 @@ async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], co
    log(logging.DEBUG, "Model consumer thread is idling.")
    continue

    model_filename = model_json['filename']
    await save_padlock.wait()
    models_json = list_append(models_json, model_json, 'filename', True)
    model_filename = model_object['filename']
    await AppData.save_padlock.wait()
    AppData.models = list_append(AppData.models, model_object, 'filename', True)
    log(logging.INFO, f"Processed model: {model_filename}")
    model_queue.task_done()

    AppData.model_queue.task_done()
    log(logging.DEBUG, "Model consumer thread exited.")

    async def save_db_emitter(creators_json: List[Dict], models_json: List[Dict], file_path: str, save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, save_interval: int = 60) -> None:
    async def save_db_emitter() -> None:
    """
    save_db_emitter: Emit save events to the given queue.
    Args:
    creators_json (list of dict): The creators object to save.
    models_json (list of dict): The models object to save.
    file_path (str): The path to the database file.
    save_queue (asyncio.Queue): The queue to emit the save events to.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    save_padlock (asyncio.Event): The event to set when the save event is emitted. Await this event before saving anything to the in-memory database.
    save_interval (int, optional): The number of seconds to wait between save events. Defaults to 60.
    None (uses AppConfig and AppData singletons)
    Returns:
    None
    """
    retries = 0
    while controller_complete.is_set() == False:
    while AppData.controller_complete.is_set() == False:
    retries += 1
    if retries >= save_interval:
    if retries >= AppConfig.save_interval:
    retries = 0
    save_padlock.clear()
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    AppData.save_padlock.clear()
    data = {'creators': AppData.creators, 'models': AppData.models, 'file_path': AppConfig.db_path}
    await AppData.save_queue.put(data)
    log(logging.DEBUG, f"Emitted save event")
    save_padlock.set()
    AppData.save_padlock.set()
    await asyncio.sleep(1.0)

    log(logging.DEBUG, f"Emitted final save event")
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    data = {'creators': AppData.creators, 'models': AppData.models, 'file_path': AppConfig.db_path}
    await AppData.save_queue.put(data)
    log(logging.DEBUG, "Save emitter thread exited.")
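
save_db_emitter() ticks once per second while the controller runs; every save_interval ticks it clears the save_padlock event, queues a snapshot of the in-memory lists, and sets the event again, while the other workers await the padlock before appending. A compact sketch of that gate with simplified names; unlike the gist, which queues the live lists and relies on the padlock to keep appends from landing mid-snapshot, the sketch copies the list so the queued snapshot cannot change afterwards:

    import asyncio

    async def writer(padlock: asyncio.Event, records: list) -> None:
        for i in range(5):
            await padlock.wait()             # block while a snapshot is in progress
            records.append(i)
            await asyncio.sleep(0.05)

    async def saver(padlock: asyncio.Event, records: list, snapshots: asyncio.Queue) -> None:
        for _ in range(3):
            await asyncio.sleep(0.1)
            padlock.clear()                  # pause writers before snapshotting
            await snapshots.put(list(records))
            padlock.set()                    # writers may resume

    async def main():
        padlock = asyncio.Event()
        padlock.set()                        # open until the first save begins
        records: list = []
        snapshots: asyncio.Queue = asyncio.Queue()
        await asyncio.gather(writer(padlock, records), saver(padlock, records, snapshots))
        while not snapshots.empty():
            print(snapshots.get_nowait())

    asyncio.run(main())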

    async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_interval: int = 60) -> None:
    async def save_db_consumer() -> None:
    """
    save_db_consumer: Consume save events from the given queue.
    @@ -408,14 +480,14 @@ async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: async
    save_queue (asyncio.Queue): The queue to consume the save events from.
    """
    retries = 0
    while controller_complete.is_set() == False or save_queue.empty() == False:
    while AppData.controller_complete.is_set() == False or AppData.save_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    data = save_queue.get_nowait()
    data = AppData.save_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    if retries >= save_interval + 1:
    if retries >= AppConfig.save_interval + 1:
    log(logging.DEBUG, "Save consumer thread exiting.")
    break
    await asyncio.sleep(1.0)
    @@ -426,50 +498,10 @@ async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: async
    models_json = data['models']
    file_path = data['file_path']
    await save_db_json(creators_json, models_json, file_path)
    save_queue.task_done()
    AppData.save_queue.task_done()

    log(logging.DEBUG, "Save consumer thread exited.")

    async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    save_db_json: Save the given creators and models to the given JSON file using Brotli compression.
    Args:
    creators (list of dict): The creators object to save.
    models (list of dict): The models object to save.
    file_path (str): The path to the json file.
    Returns:
    None
    """
    json_data = {'creators': creators, 'models': models}
    minified_json = json.dumps(json_data, ensure_ascii=False, separators=(',', ':'))
    compressed_data = brotli.compress(minified_json.encode('utf-8'))

    async with aiofiles.open(file_path, 'wb') as file:
    await file.write(compressed_data)
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")

    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    """
    load_db_json: Load the JSON database file at the given path using Brotli decompression.
    Args:
    file_path (str): The path to the json file.
    Returns:
    list of dict: The creators object.
    list of dict: The models object.
    """
    if not os.path.exists(file_path):
    return [], []

    async with aiofiles.open(file_path, 'rb') as file:
    compressed_data = await file.read()
    data = json.loads(brotli.decompress(compressed_data).decode('utf-8'))

    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']
    def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
    """
    last_key: Get the last value of the given key in the given json data.
    @@ -515,14 +547,28 @@ def list_append(data: List[Dict], new_item: Dict, unique_key: str = None, update

    return data
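
last_key() and list_append() are the resume and de-duplication helpers; their bodies are not shown in this diff, so the following is only a plausible reconstruction from the signatures and docstrings, not the gist's actual implementation:

    from typing import Any, Dict, List, Optional

    def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
        """Return the value of `key` from the last dict that has it, else the default."""
        for item in reversed(data):
            if key in item:
                return item[key]
        return default

    def list_append(data: List[Dict], new_item: Dict, unique_key: Optional[str] = None,
                    update: bool = False) -> List[Dict]:
        """Append new_item, optionally treating unique_key as a primary key."""
        if unique_key is not None:
            for i, item in enumerate(data):
                if item.get(unique_key) == new_item.get(unique_key):
                    if update:
                        data[i] = new_item      # replace the existing entry in place
                    return data                 # already present, nothing to append
        data.append(new_item)
        return data

    rows = [{"creator": "a", "page": 1}]
    rows = list_append(rows, {"creator": "a", "page": 2}, "creator", True)
    print(last_key(rows, "page"))               # 2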

    async def main() -> None:
    async def init_app() -> None:
    """
    main: The main function.
    Returns:
    None
    init_app: Load the configuration from the .env file, if it exists, and then from the command line arguments. Command line arguments take precedence.
    """

    # Pass-through the environment variables from the .env file to the AppConfig singleton.
    dotenv.load_dotenv()
    AppConfig.api_max_retries = int(os.getenv("RETRIES", AppConfig.api_max_retries))
    AppConfig.api_retry_delay = int(os.getenv("RETRY_DELAY", AppConfig.api_retry_delay))
    AppConfig.api_retry_period = int(os.getenv("API_PERIOD", AppConfig.api_retry_period))
    AppConfig.api_retry_limit = int(os.getenv("API_RETRY_LIMIT", AppConfig.api_retry_limit))
    AppConfig.threads = int(os.getenv("THREADS", AppConfig.threads))
    AppConfig.creator_limit = int(os.getenv("CREATOR_LIMIT", AppConfig.creator_limit))
    AppConfig.start_page = int(os.getenv("START_PAGE", AppConfig.start_page))
    AppConfig.save_interval = int(os.getenv("SAVE_INTERVAL", AppConfig.save_interval))
    AppConfig.no_skip = bool(os.getenv("NO_SKIP", AppConfig.no_skip))
    AppConfig.log_level = os.getenv("LOG_LEVEL", AppConfig.log_level)
    AppConfig.log_file = os.getenv("LOG_FILE", AppConfig.log_file)
    AppConfig.colorize = bool(os.getenv("COLORIZE", AppConfig.colorize))
    AppConfig.db_path = os.getenv("DB", AppConfig.db_path)
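
init_app() loads a .env file with python-dotenv and maps environment variables onto the config singleton before the CLI arguments get a chance to override them. A minimal sketch of that step with illustrative variable names; note that the bool(os.getenv(...)) cast used above treats any non-empty string, including "0" or "false", as True, so the sketch parses booleans explicitly:

    import os
    import dotenv

    def env_bool(name: str, default: bool) -> bool:
        """Parse a boolean env var explicitly; bool("false") would be True."""
        value = os.getenv(name)
        if value is None:
            return default
        return value.strip().lower() in ("1", "true", "yes", "on")

    dotenv.load_dotenv()                                   # read .env into os.environ
    threads = int(os.getenv("THREADS", 5))                 # numeric override with default
    no_skip = env_bool("NO_SKIP", False)                   # explicit boolean parsing
    db_path = os.getenv("DB", "civitai-db.bin")            # plain string override
    print(threads, no_skip, db_path)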

    # Parse the command line arguments with argparse.
    parser = ArgumentParser(
    prog="civitai_scraper.py",
    description="Scrape CivitAI for models and creators.",
    @@ -531,81 +577,107 @@ async def main() -> None:
    epilog="Tool created by: deitydurg | If any questions, ask on Discord for assistance.")

    logging_group = parser.add_argument_group("Logging")
    logging_group.add_argument('-x', '--log-level', type=str, default="info", dest="loglevel", choices=["info", "warning", "error", "critical", "debug"], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civitai-scraper.log", dest="logfile", help="The path to the log file where logs will be saved.")
    logging_group.add_argument('-x', '--log-level', type=str, dest="loglevel", nargs='?', default=None, choices=["info", "warning", "error", "critical", "debug", None], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, dest="logfile", nargs='?', default=None, help="The path to the log file where logs will be saved.")
    logging_group.add_argument('-z', '--no-color', action="store_false", dest="colorize", help="If specified, do not colorize the log output.")
    ratelimit_group = parser.add_argument_group("Rate Limits & Performance")
    ratelimit_group.add_argument('-p', '--api-period', type=int, default=180, dest="apiperiod", help="The period of time to limit API calls (in seconds). WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, default=100, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-t', '--threads', type=int, default=5, dest="threads", help="The maximum number of concurrent/asynchronous threads to run. This can help out with entering retries for making too many requests at once, but will slow down the tool. If you are seeing retry messages often, try lowering this value from its default of 5.")
    ratelimit_group.add_argument('-p', '--api-period', type=int, nargs='?', default=None, dest="apiperiod", help="The period of time to limit API calls (in seconds). WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, nargs='?', default=None, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-t', '--threads', type=int, nargs='?', default=None, dest="threads", help="The maximum number of concurrent/asynchronous threads to run. This can help out with entering retries for making too many requests at once, but will slow down the tool. If you are seeing retry messages often, try lowering this value from its default of 5.")
    ratelimit_group.add_argument('-r', '--retry-delay', type=int, nargs='?', default=None, dest="retrydelay", help="The number of seconds to wait between retries. This can help out with crashes due to too many requests retrying.")
    ratelimit_group.add_argument('-k', '--retry-limit', type=int, nargs='?', default=None, dest="retrylimit", help="The number of times to retry a request before giving up. This can help out with crashes due to too many requests retrying. Set this value high to ensure reliability.")
    scraping_group = parser.add_argument_group("Scraping Options")
    scraping_group.add_argument('-c', '--creator-limit', type=int, default=100, dest="creatorlimit", help="The maximum number of creators to scrape.")
    scraping_group.add_argument('-s', '--start-page', type=int, default=-1, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will start from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    scraping_group.add_argument('-c', '--creator-limit', type=int, nargs='?', default=None, dest="creatorlimit", help="The maximum number of creators to scrape.")
    scraping_group.add_argument('-s', '--start-page', type=int, nargs='?', default=None, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will start from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", default=None, help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civitai-db.bin", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    database_group.add_argument('-j', '--json', type=str, nargs='?', default=None, dest="db",help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, nargs='?', default=None, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.3", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

    logger = logging.root
    logbuilder(logger, argv.logfile, argv.loglevel, argv.colorize)
    argv.creatorlimit = max(1, argv.creatorlimit)
    log(logging.DEBUG, f"Scraping up to {argv.creatorlimit} creators from Civitai")
    global limiter
    limiter = AsyncLimiter(argv.apiperiod, argv.apilimit)
    log(logging.DEBUG, f"Set API rate limit to {argv.apilimit} calls per {argv.apiperiod} second(s).")

    logging.getLogger().setLevel(argv.loglevel.upper())
    logging.getLogger().addHandler(logging.FileHandler(argv.logfile))

    no_skip = argv.noskip
    if no_skip == False:
    # Pass-through the command line arguments to the AppConfig singleton.
    AppConfig.api_max_retries = AppConfig.api_max_retries if argv.retrylimit == None else argv.retrylimit
    AppConfig.api_retry_delay = AppConfig.api_retry_delay if argv.retrydelay == None else argv.retrydelay
    AppConfig.api_retry_period = AppConfig.api_retry_period if argv.apiperiod == None else argv.apiperiod
    AppConfig.api_retry_limit = AppConfig.api_retry_limit if argv.apilimit == None else argv.apilimit
    AppConfig.threads = AppConfig.threads if argv.threads == None else argv.threads
    AppConfig.creator_limit = AppConfig.creator_limit if argv.creatorlimit == None else argv.creatorlimit
    AppConfig.start_page = AppConfig.start_page if argv.startpage == None else argv.startpage
    AppConfig.save_interval = AppConfig.save_interval if argv.saveinterval == None else argv.saveinterval
    AppConfig.no_skip = AppConfig.no_skip if argv.noskip == None else argv.noskip
    AppConfig.log_level = AppConfig.log_level if argv.loglevel == None else argv.loglevel
    AppConfig.log_file = AppConfig.log_file if argv.logfile == None else argv.logfile
    AppConfig.colorize = AppConfig.colorize if argv.colorize == None else argv.colorize
    AppConfig.db_path = AppConfig.db_path if argv.db == None else argv.db

    # Configure the logging system with the new settings.
    AppData.logger = logging.root
    log_builder(AppData.logger, log_file=AppConfig.log_file, log_level=AppConfig.log_level, colorize=AppConfig.colorize)

    # Adjust the AppConfig singleton to ensure that the values are valid.
    AppConfig.creator_limit = max(1, AppConfig.creator_limit)
    AppData.limiter = AsyncLimiter(AppConfig.api_retry_period, AppConfig.api_retry_limit)

    # Load the database from disk.
    AppData.creators, AppData.models = await load_db_json(AppConfig.db_path)
    AppConfig.start_page = AppConfig.start_page if AppConfig.start_page != -1 else (last_key(AppData.creators, 'page', 1) + 1 if len(AppData.creators) > 0 and last_key(AppData.creators, 'page', 1) == 1 else last_key(AppData.creators, 'page', 1) + 1)

    # Log the configuration.
    if AppConfig.no_skip == False:
    log(logging.DEBUG, "Skipping creators that are already in the database.")
    else:
    log(logging.DEBUG, "Not skipping creators that are already in the database.")

    log(logging.DEBUG, f"Starting from page: {AppConfig.start_page }")
    pass

    async def main() -> None:
    """
    main: The main function.
    Returns:
    None
    """
    await init_app()

    ###############################
    # Start of the main program...#
    ###############################

    creators_json, models_json = await load_db_json(argv.db)

    startpage = argv.startpage if argv.startpage != -1 else (last_key(creators_json, 'page', 1) + 1 if len(creators_json) > 0 and last_key(creators_json, 'page', 1) == 1 else last_key(creators_json, 'page', 1) + 1)
    log(logging.DEBUG, f"Starting from page: {startpage}")

    controller_complete = asyncio.Event()
    save_padlock = asyncio.Event()
    creator_queue = asyncio.Queue(argv.threads)
    model_queue = asyncio.Queue(argv.threads)
    page_queue = asyncio.Queue(argv.threads)
    save_queue = asyncio.Queue(1)
    AppData.controller_complete = asyncio.Event()
    AppData.save_padlock = asyncio.Event()
    AppData.creator_queue = asyncio.Queue(AppConfig.threads)
    AppData.model_queue = asyncio.Queue(AppConfig.threads)
    AppData.page_queue = asyncio.Queue(AppConfig.threads)
    AppData.save_queue = asyncio.Queue(1)

    save_padlock.set()
    AppData.save_padlock.set()

    tasks = [
    *[
    asyncio.create_task(controller(page_queue, creator_queue, model_queue, save_queue, controller_complete, save_padlock, argv.creatorlimit, startpage))
    asyncio.create_task(controller())
    ],
    *[
    asyncio.create_task(creator_emitter(creator_queue, creators_json, page_queue, controller_complete, save_padlock, no_skip))
    for _ in range(argv.threads)
    asyncio.create_task(creator_emitter())
    for _ in range(AppConfig.threads)
    ],
    *[
    asyncio.create_task(model_emitter(creator_queue, model_queue, controller_complete))
    for _ in range(argv.threads)
    asyncio.create_task(model_emitter())
    for _ in range(AppConfig.threads)
    ],
    *[
    asyncio.create_task(model_consumer(model_queue, models_json, controller_complete, save_padlock))
    for _ in range(argv.threads)
    asyncio.create_task(model_consumer())
    for _ in range(AppConfig.threads)
    ],
    *[
    asyncio.create_task(save_db_emitter(creators_json, models_json, argv.db, save_queue, controller_complete, save_padlock, argv.saveinterval))
    asyncio.create_task(save_db_emitter())
    ],
    *[
    asyncio.create_task(save_db_consumer(save_queue, controller_complete, argv.saveinterval))
    asyncio.create_task(save_db_consumer())
    ]
    ]
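
main() now builds the whole worker pool from the singletons: one controller task, AppConfig.threads copies of each emitter/consumer, one save emitter and one save consumer, all awaited with asyncio.gather(). A reduced two-stage sketch of the same fan-out, with hypothetical names:

    import asyncio

    async def producer(pages: asyncio.Queue, done: asyncio.Event, total: int) -> None:
        for page in range(1, total + 1):
            await pages.put(page)           # blocks while the bounded queue is full
        done.set()                          # signal consumers that emission finished

    async def consumer(pages: asyncio.Queue, done: asyncio.Event, name: str) -> None:
        while not done.is_set() or not pages.empty():
            try:
                page = pages.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)
                continue
            print(f"{name} handled page {page}")
            pages.task_done()

    async def main():
        threads = 3
        pages: asyncio.Queue = asyncio.Queue(threads)
        done = asyncio.Event()
        tasks = [
            asyncio.create_task(producer(pages, done, 10)),
            *[asyncio.create_task(consumer(pages, done, f"worker-{i}")) for i in range(threads)],
        ]
        await asyncio.gather(*tasks)

    asyncio.run(main())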

  2. loopyd revised this gist Nov 25, 2023. 1 changed file with 107 additions and 41 deletions.
    148 changes: 107 additions & 41 deletions civitai_scraper.py
    @@ -1,5 +1,5 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.1 deitydurg
    civitai-scraper.py v1.0.2 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    @@ -26,15 +26,79 @@
    import aiohttp
    import aiofiles
    import logging
    import brotli
    from colorama import Fore, Back, Style, init

    logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] [%(asctime)s.%(msecs)03d] %(message)s',
    datefmt='%Y/%m/%d %H:%M:%S',
    handlers=[
    logging.StreamHandler(stream=sys.stdout)
    ]
    )
    def logbuilder(logger, log_file=None, log_level=logging.INFO, colorize=False):
    """
    Configures the provided logger with specified settings.
    Args:
    logger: The logging object to configure.
    log_file (str, optional): The path to the log file. No file logging if None.
    log_level (int, optional): The logging level to use.
    colorize (bool): If True, colorizes the log output.
    """
    class ColorizedFormatter(logging.Formatter):
    def format(self, record):
    levelname = record.levelname
    if colorize:
    if levelname == "DEBUG":
    levelname_color = Fore.WHITE + Style.BRIGHT
    func_color = Fore.LIGHTBLACK_EX + Style.BRIGHT
    time_color = Fore.LIGHTBLACK_EX
    seperator_color = Fore.WHITE
    elif levelname == "INFO":
    levelname_color = Fore.LIGHTBLUE_EX + Style.BRIGHT
    func_color = Fore.BLUE + Style.BRIGHT
    time_color = Fore.BLUE
    seperator_color = Fore.BLUE + Style.BRIGHT
    elif levelname == "WARNING":
    levelname_color = Fore.LIGHTYELLOW_EX + Style.BRIGHT
    func_color = Fore.YELLOW + Style.BRIGHT
    time_color = Fore.YELLOW
    seperator_color = Fore.YELLOW + Style.BRIGHT
    elif levelname == "ERROR":
    levelname_color = Fore.LIGHTRED_EX + Style.BRIGHT
    func_color = Fore.RED + Style.BRIGHT
    time_color = Fore.RED
    seperator_color = Fore.RED + Style.BRIGHT
    elif levelname == "CRITICAL":
    levelname_color = Fore.WHITE + Style.BRIGHT + Back.RED
    func_color = Fore.RED + Style.BRIGHT
    time_color = Fore.RED
    seperator_color = Fore.RED + Style.BRIGHT

    record.levelname = f"{seperator_color}[{Style.RESET_ALL}{levelname_color}{record.levelname}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
    record.funcName = f"{seperator_color}[{Style.RESET_ALL}{func_color}{record.funcName}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
    record.msecs = f"{seperator_color}[{Style.RESET_ALL}{time_color}{self.formatTime(record, self.datefmt)}.{int(record.msecs):03d}{Style.RESET_ALL}{seperator_color}]{Style.RESET_ALL}"
    else:
    record.levelname = f"[{record.levelname}]"
    record.funcName = f"[{record.funcName}]"
    record.msecs = f"[{self.formatTime(record, self.datefmt)}.{int(record.msecs):03d}]"

    return super().format(record)

    if colorize:
    init(autoreset=True)

    formatter = ColorizedFormatter(fmt="%(levelname)s %(msecs)s %(funcName)s: %(message)s",
    datefmt="%Y/%m/%d %H:%M:%S")

    logger.handlers.clear()

    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    if log_file:
    file_handler = logging.FileHandler(log_file)
    file_formatter = logging.Formatter("[%(levelname)s] [%(msecs)s] [%(funcName)s]: %(message)s",
    "%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    logger.setLevel(logging.DEBUG)
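
This earlier revision swaps logging.basicConfig for logbuilder(), which installs a Formatter subclass that wraps the level name, function name and timestamp in colorama colour codes when colorize is enabled, plus an optional plain-text file handler. A much smaller sketch of the same idea, colouring only two levels:

    import logging, sys
    from colorama import Fore, Style, init

    class ColorFormatter(logging.Formatter):
        COLORS = {
            "INFO": Fore.LIGHTBLUE_EX + Style.BRIGHT,
            "ERROR": Fore.LIGHTRED_EX + Style.BRIGHT,
        }

        def format(self, record: logging.LogRecord) -> str:
            color = self.COLORS.get(record.levelname, "")
            record.levelname = f"{color}[{record.levelname}]{Style.RESET_ALL}"
            return super().format(record)

    init(autoreset=True)                       # colorama: reset colours after each write
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(ColorFormatter("%(levelname)s %(funcName)s: %(message)s"))
    logger = logging.getLogger("demo")
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.propagate = False                   # keep records off the root logger
    logger.setLevel(logging.DEBUG)

    logger.info("colourized info line")
    logger.error("colourized error line")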

    def log(loglevel: int, message: str) -> None:
    """
    @@ -48,16 +112,14 @@ def log(loglevel: int, message: str) -> None:
    None
    """
    func = sys._getframe(1).f_code.co_name
    message = f"[{func}]: {message}"
    log_func = {
    'info': logging.info,
    'warning': logging.warning,
    'error': logging.error,
    'critical': logging.critical,
    'debug': logging.debug
    }.get(logging.getLevelName(loglevel).lower(), logging.info)
    log_func(message)
    log_func(msg=message, stacklevel=2)

    async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int = 3, delay: int = 5) -> str:
    """
    @@ -368,45 +430,46 @@ async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: async

    log(logging.DEBUG, "Save consumer thread exited.")

    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    load_db_json: Load the JSON database file at the given path.
    save_db_json: Save the given creators and models to the given JSON file using Brotli compression.
    Args:
    creators (list of dict): The creators object to save.
    models (list of dict): The models object to save.
    file_path (str): The path to the json file.
    Returns:
    list of dict: The creators object.
    list of dict: The models object.
    None
    """
    if not os.path.exists(file_path):
    return [], []
    async with aiofiles.open(file_path, 'r', encoding="utf-8") as file:
    data = json.loads(await file.read())
    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']
    json_data = {'creators': creators, 'models': models}
    minified_json = json.dumps(json_data, ensure_ascii=False, separators=(',', ':'))
    compressed_data = brotli.compress(minified_json.encode('utf-8'))

    async with aiofiles.open(file_path, 'wb') as file:
    await file.write(compressed_data)
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")

    async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    """
    save_db_json: Save the given creators and models to the given JSON file.
    load_db_json: Load the JSON database file at the given path using Brotli decompression.
    Args:
    creators (list of dict): The creators object to save.
    models (list of dict): The models object to save.
    file_path (str): The path to the json file.
    Returns:
    None
    list of dict: The creators object.
    list of dict: The models object.
    """
    json_data = {'creators': creators, 'models': models}
    async with aiofiles.open(file_path, 'w', encoding="utf-8") as file:
    await file.write(json.dumps(json_data, ensure_ascii=False, indent=4))
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")
    if not os.path.exists(file_path):
    return [], []

    async with aiofiles.open(file_path, 'rb') as file:
    compressed_data = await file.read()
    data = json.loads(brotli.decompress(compressed_data).decode('utf-8'))

    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']
    def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
    """
    last_key: Get the last value of the given key in the given json data.
    @@ -469,7 +532,8 @@ async def main() -> None:

    logging_group = parser.add_argument_group("Logging")
    logging_group.add_argument('-x', '--log-level', type=str, default="info", dest="loglevel", choices=["info", "warning", "error", "critical", "debug"], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-scraper.log", dest="logfile", help="The path to the log file where logs will be saved.")
    logging_group.add_argument('-y', '--log-file', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civitai-scraper.log", dest="logfile", help="The path to the log file where logs will be saved.")
    logging_group.add_argument('-z', '--no-color', action="store_false", dest="colorize", help="If specified, do not colorize the log output.")
    ratelimit_group = parser.add_argument_group("Rate Limits & Performance")
    ratelimit_group.add_argument('-p', '--api-period', type=int, default=180, dest="apiperiod", help="The period of time to limit API calls (in seconds). WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, default=100, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too low may result in a ban from CivitAI API, or you being temporarily ratelimited.")
    @@ -479,13 +543,15 @@ async def main() -> None:
    scraping_group.add_argument('-s', '--start-page', type=int, default=-1, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will start from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civitai-db.bin", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

    logger = logging.root
    logbuilder(logger, argv.logfile, argv.loglevel, argv.colorize)
    argv.creatorlimit = max(1, argv.creatorlimit)
    log(logging.DEBUG, f"Scraping up to {argv.creatorlimit} creators from Civitai")
    global limiter
  3. loopyd revised this gist Nov 25, 2023. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions civitai_scraper.py
    @@ -231,14 +231,14 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    page_models = json.loads(page_models)

    if not page_models or len(page_models['items']) == 0:
    log(logging.INFO, f"No models found for creator: {creator_json['creator']}")
    log(logging.DEBUG, f"No models found for creator: {creator_json['creator']} on page: {page}")
    break

    models_added = 0
    for model_type in ["LORA", "Checkpoint", "Controlnet"]:
    filtered_models = [m["modelVersions"] for m in page_models['items'] if m['type'] == model_type]
    if len(filtered_models) == 0:
    log(logging.INFO, f"No models found for {creator_json['creator']} with type: {model_type}")
    log(logging.DEBUG, f"No models found for {creator_json['creator']} with type: {model_type}")
    continue
    models_to_add = [
    {
  4. loopyd revised this gist Nov 25, 2023. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion civitai_scraper.py
    @@ -166,7 +166,7 @@ async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict
    log(logging.DEBUG, "Creator emitter thread is idling.")
    continue

    log(logging.INFO, f"Consumed page: {page}")
    log(logging.DEBUG, f"Consumed page: {page}")
    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
  5. loopyd revised this gist Nov 25, 2023. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions civitai_scraper.py
    @@ -1,5 +1,5 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.0 deitydurg
    civitai-scraper.py v1.0.1 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    @@ -482,7 +482,7 @@ async def main() -> None:
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.0", help="Show the version of this tool.")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

  6. loopyd revised this gist Nov 25, 2023. 1 changed file with 102 additions and 37 deletions.
    139 changes: 102 additions & 37 deletions civitai_scraper.py
    @@ -1,5 +1,5 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.1 deitydurg
    civitai-scraper.py v1.0.0 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    @@ -17,6 +17,7 @@
    """
    import json
    import os, sys
    import random
    from pathlib import Path
    import asyncio
    from argparse import ArgumentParser
    @@ -92,13 +93,15 @@ async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int
    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None

    async def controller(page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, page_limit: int, start_page: int) -> None:
    async def controller(page_queue: asyncio.Queue, creator_queue: asyncio.Queue, model_queue: asyncio.Queue, save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, page_limit: int, start_page: int) -> None:
    """
    controller: The main controller for the CivitAI scraping system.
    Args:
    page_queue (asyncio.Queue): The queue to emit the page numbers to.
    creator_queue (asyncio.Queue): The queue to emit the creator objects to.
    model_queue (asyncio.Queue): The queue to emit the model objects to.
    save_queue (asyncio.Queue): The queue to emit the save events to.
    page_limit (int): The maximum number of pages to emit.
    start_page (int): The page of creators to start scraping from.
    @@ -108,20 +111,30 @@ async def controller(page_queue: asyncio.Queue, controller_complete: asyncio.Eve
    None
    """
    page = start_page
    while page <= start_page + page_limit:
    while page < start_page + page_limit:
    await page_queue.put(page)
    log(logging.DEBUG, f"Emitted page: {page}")
    page += 1

    while not page_queue.empty():
    await asyncio.sleep(1)

    while not save_padlock.is_set():
    await asyncio.sleep(1)

    retries = 0
    while True:
    condition = page_queue.empty() == False or creator_queue.empty() == False or model_queue.empty() == False or save_queue.empty() == False
    if condition == False:
    retries += 1
    if retries >= 5:
    log(logging.DEBUG, "Controller thread exiting.")
    break
    log(logging.DEBUG, "Controller thread is idling.")
    await asyncio.sleep(1.0)
    continue
    else:
    retries = 0
    log(logging.DEBUG, "Controller thread detects work.")
    await asyncio.sleep(1.0)
    continue

    controller_complete.set()
    log(logging.DEBUG, "Controller thread exited.")


    async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, noskip: bool = False) -> None:
    """
    creator_emitter: Emits creator objects to the given queue
    @@ -138,14 +151,28 @@ async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict
    None
    """

    while controller_complete.is_set() == False or not page_queue.empty():
    page = await page_queue.get()
    retries = 0
    while controller_complete.is_set() == False or page_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    page = page_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    if retries >= 60:
    log(logging.DEBUG, "Creator emitter thread exiting.")
    break
    await asyncio.sleep(1.0)
    log(logging.DEBUG, "Creator emitter thread is idling.")
    continue

    log(logging.INFO, f"Consumed page: {page}")
    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
    log(logging.INFO, f"No creators found on page: {page}")
    return
    page_queue.task_done()
    break

    for creator in creators.get('items', []):
    creator_username = creator['username']
    @@ -182,8 +209,21 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    None
    """
    while controller_complete.is_set() == False or not creator_queue.empty():
    creator_json = await creator_queue.get()

    retries = 0
    while controller_complete.is_set() == False or creator_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    creator_json = creator_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    if retries >= 60:
    log(logging.DEBUG, "Model emitter thread exiting.")
    break
    await asyncio.sleep(1.0)
    log(logging.DEBUG, "Model emitter thread is idling.")
    continue

    page = 1
    while True:
    @@ -243,8 +283,21 @@ async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], co
    str: The json string with the models appended.
    """
    while controller_complete.is_set() == False or not model_queue.empty():
    model_json = await model_queue.get()
    retries = 0
    while controller_complete.is_set() == False or model_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    model_json = model_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    if retries >= 60:
    log(logging.DEBUG, "Model consumer thread exiting.")
    break
    await asyncio.sleep(1.0)
    log(logging.DEBUG, "Model consumer thread is idling.")
    continue

    model_filename = model_json['filename']
    await save_padlock.wait()
    models_json = list_append(models_json, model_json, 'filename', True)
    @@ -267,29 +320,46 @@ async def save_db_emitter(creators_json: List[Dict], models_json: List[Dict], fi
    save_padlock (asyncio.Event): The event to set when the save event is emitted. Await this event before saving anything to the in-memory database.
    save_interval (int, optional): The number of seconds to wait between save events. Defaults to 60.
    """
    while controller_complete.is_set() == False or not save_queue.empty():
    await asyncio.sleep(save_interval)
    save_padlock.clear()
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    log(logging.DEBUG, f"Emitted save event")
    save_padlock.set()
    retries = 0
    while controller_complete.is_set() == False:
    retries += 1
    if retries >= save_interval:
    retries = 0
    save_padlock.clear()
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    log(logging.DEBUG, f"Emitted save event")
    save_padlock.set()
    await asyncio.sleep(1.0)

    log(logging.DEBUG, f"Emitted final save event")
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    log(logging.DEBUG, "Save emitter thread exited.")

    async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_interval: int = 60) -> None:
    """
    save_db_consumer: Consume save events from the given queue.
    Args:
    save_queue (asyncio.Queue): The queue to consume the save events from.
    """
    while controller_complete.is_set() == False or not save_queue.empty():
    data = await save_queue.get()
    retries = 0
    while controller_complete.is_set() == False or save_queue.empty() == False:
    try:
    await asyncio.sleep(random.uniform(0.1, 0.3))
    data = save_queue.get_nowait()
    retries = 0
    except asyncio.QueueEmpty:
    retries += 1
    if retries >= save_interval + 1:
    log(logging.DEBUG, "Save consumer thread exiting.")
    break
    await asyncio.sleep(1.0)
    log(logging.DEBUG, "Save consumer thread is idling.")
    continue

    creators_json = data['creators']
    models_json = data['models']
    file_path = data['file_path']
    @@ -412,7 +482,7 @@ async def main() -> None:
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.0", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

    @@ -451,7 +521,7 @@ async def main() -> None:

    tasks = [
    *[
    asyncio.create_task(controller(page_queue, controller_complete, save_padlock, argv.creatorlimit, startpage))
    asyncio.create_task(controller(page_queue, creator_queue, model_queue, save_queue, controller_complete, save_padlock, argv.creatorlimit, startpage))
    ],
    *[
    asyncio.create_task(creator_emitter(creator_queue, creators_json, page_queue, controller_complete, save_padlock, no_skip))
    @@ -469,17 +539,12 @@ async def main() -> None:
    asyncio.create_task(save_db_emitter(creators_json, models_json, argv.db, save_queue, controller_complete, save_padlock, argv.saveinterval))
    ],
    *[
    asyncio.create_task(save_db_consumer(save_queue, controller_complete))
    asyncio.create_task(save_db_consumer(save_queue, controller_complete, argv.saveinterval))
    ]
    ]

    await asyncio.gather(*tasks)

    await page_queue.join()
    await creator_queue.join()
    await model_queue.join()
    await save_queue.join()


    for task in tasks:
    task.cancel()
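
The change running through the diff above replaces blocking queue reads with polled get_nowait() calls and an idle counter, so each worker can exit on its own once the controller finishes and its queue stays empty. A minimal sketch of that pattern, using a hypothetical drain() coroutine rather than the gist's actual emitters and consumers:

    import asyncio
    import random

    async def drain(queue: asyncio.Queue, done: asyncio.Event, max_idle: int = 60) -> None:
        idle = 0
        while not done.is_set() or not queue.empty():
            try:
                await asyncio.sleep(random.uniform(0.1, 0.3))
                item = queue.get_nowait()
                idle = 0
            except asyncio.QueueEmpty:
                idle += 1
                if idle >= max_idle:
                    break  # stop after too many consecutive empty polls
                await asyncio.sleep(1.0)
                continue
            print(f"processed {item}")
            queue.task_done()

    async def demo() -> None:
        queue, done = asyncio.Queue(), asyncio.Event()
        for i in range(3):
            queue.put_nowait(i)
        done.set()
        await drain(queue, done)

    asyncio.run(demo())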

  7. loopyd revised this gist Nov 25, 2023. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions civitai_scraper.py
    @@ -1,5 +1,5 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.0 deitydurg
    civitai-scraper.py v1.0.1 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    @@ -412,7 +412,7 @@ async def main() -> None:
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.0", help="Show the version of this tool.")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

  8. loopyd revised this gist Nov 25, 2023. 1 changed file with 153 additions and 43 deletions.
    196 changes: 153 additions & 43 deletions civitai_scraper.py
    @@ -92,7 +92,37 @@ async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int
    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None

    async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page: int = 1, noskip: bool = False) -> None:
    async def controller(page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, page_limit: int, start_page: int) -> None:
    """
    controller: The main controller for the CivitAI scraping system.
    Args:
    page_queue (asyncio.Queue): The queue to emit the page numbers to.
    controller_complete (asyncio.Event): The event set once every page has been emitted and drained.
    save_padlock (asyncio.Event): The event that must be set (no save in flight) before the run is marked complete.
    page_limit (int): The maximum number of pages to emit.
    start_page (int): The page of creators to start scraping from.
    Returns:
    None
    """
    page = start_page
    while page <= start_page + page_limit:
    await page_queue.put(page)
    log(logging.DEBUG, f"Emitted page: {page}")
    page += 1

    while not page_queue.empty():
    await asyncio.sleep(1)

    while not save_padlock.is_set():
    await asyncio.sleep(1)

    controller_complete.set()
    log(logging.DEBUG, "Controller thread exited.")

    async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, noskip: bool = False) -> None:
    """
    creator_emitter: Emits creator objects to the given queue
    @@ -107,44 +137,52 @@ async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict
    None
    """

    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
    log(logging.INFO, "No creators found on page: {page}")
    return

    while controller_complete.is_set() == False or not page_queue.empty():
    page = await page_queue.get()
    log(logging.INFO, f"Consumed page: {page}")
    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
    log(logging.INFO, f"No creators found on page: {page}")
    return

    for creator in creators.get('items', []):
    creator_username = creator['username']
    creator_link = creator['link']
    if len(creators_json) > 0:
    in_creators_json = any(c['creator'] == creator_username for c in creators_json)
    else:
    in_creators_json = False
    if in_creators_json and noskip == False:
    log(logging.DEBUG, f"Skipping creator: {creator_username}")
    continue
    for creator in creators.get('items', []):
    creator_username = creator['username']
    creator_link = creator['link']
    if len(creators_json) > 0:
    in_creators_json = any(c['creator'] == creator_username for c in creators_json)
    else:
    in_creators_json = False
    if in_creators_json and noskip == False:
    log(logging.DEBUG, f"Skipping creator: {creator_username}")
    continue

    creator_json = {'creator': creator_username, 'link': creator_link, 'page': page}
    await creator_queue.put(creator_json)
    creators_json = list_append(creators_json, creator_json, 'creator', True)
    log(logging.INFO, f"Emitted creator: {creator_username}")
    creator_json = {'creator': creator_username, 'link': creator_link, 'page': page}
    await creator_queue.put(creator_json)
    await save_padlock.wait()
    creators_json = list_append(creators_json, creator_json, 'creator', True)
    log(logging.DEBUG, f"Emitted creator: {creator_username}")

    page_queue.task_done()

    log(logging.DEBUG, "Creator emitter thread exited.")

    async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue, generator_complete: asyncio.Event) -> None:
    async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    """
    model_emitter: Consume creator objects from the given queue and scrape their models.
    Args:
    creator_queue (asyncio.Queue): The queue to consume the creator objects from.
    model_queue (asyncio.Queue): The queue to emit the model objects to.
    generator_complete (asyncio.Event): The event to set when the generator is complete.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    Returns:
    None
    """
    while generator_complete.is_set() == False or not creator_queue.empty():
    while controller_complete.is_set() == False or not creator_queue.empty():
    creator_json = await creator_queue.get()

    page = 1
    @@ -160,7 +198,7 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    for model_type in ["LORA", "Checkpoint", "Controlnet"]:
    filtered_models = [m["modelVersions"] for m in page_models['items'] if m['type'] == model_type]
    if len(filtered_models) == 0:
    log(logging.DEBUG, f"No models found for {creator_json['creator']} with type: {model_type}")
    log(logging.INFO, f"No models found for {creator_json['creator']} with type: {model_type}")
    continue
    models_to_add = [
    {
    @@ -177,7 +215,7 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    ]
    for model in models_to_add:
    models_added += 1
    log(logging.INFO, f"Emitted model: {model['filename']}")
    log(logging.DEBUG, f"Emitted model: {model['filename']}")
    await model_queue.put(model)

    if models_added == 0:
    @@ -187,28 +225,78 @@ async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue
    page += 1

    creator_queue.task_done()

    log(logging.DEBUG, "Model emitter thread exited.")

    async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], generator_complete: asyncio.Event) -> None:
    async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], controller_complete: asyncio.Event, save_padlock: asyncio.Event ) -> None:
    """
    model_consumer: Consume model objects from the given queue and add them to the given json string.
    Args:
    model_queue (asyncio.Queue): The queue to consume the model objects from.
    models_json (list of dicts): The models object to append the models to.
    generator_complete (asyncio.Event): The event to set when the generator is complete.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    Returns:
    str: The json string with the models appended.
    """
    while generator_complete.is_set() == False or not model_queue.empty():
    while controller_complete.is_set() == False or not model_queue.empty():
    model_json = await model_queue.get()
    model_filename = model_json['filename']
    await save_padlock.wait()
    models_json = list_append(models_json, model_json, 'filename', True)
    log(logging.INFO, f"Processed model: {model_filename}")
    model_queue.task_done()

    log(logging.DEBUG, "Model consumer thread exited.")

    async def save_db_emitter(creators_json: List[Dict], models_json: List[Dict], file_path: str, save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, save_interval: int = 60) -> None:
    """
    save_db_emitter: Emit save events to the given queue.
    Args:
    creators_json (list of dict): The creators object to save.
    models_json (list of dict): The models object to save.
    file_path (str): The path to the database file.
    save_queue (asyncio.Queue): The queue to emit the save events to.
    controller_complete (asyncio.Event): The event to set when the generator is complete.
    save_padlock (asyncio.Event): The event to set when the save event is emitted. Await this event before saving anything to the in-memory database.
    save_interval (int, optional): The number of seconds to wait between save events. Defaults to 60.
    """
    while controller_complete.is_set() == False or not save_queue.empty():
    await asyncio.sleep(save_interval)
    save_padlock.clear()
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    log(logging.DEBUG, f"Emitted save event")
    save_padlock.set()

    log(logging.DEBUG, f"Emitted final save event")
    data = {'creators': creators_json, 'models': models_json, 'file_path': file_path}
    await save_queue.put(data)
    log(logging.DEBUG, "Save emitter thread exited.")

    async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    """
    save_db_consumer: Consume save events from the given queue.
    Args:
    save_queue (asyncio.Queue): The queue to consume the save events from.
    """
    while controller_complete.is_set() == False or not save_queue.empty():
    data = await save_queue.get()
    creators_json = data['creators']
    models_json = data['models']
    file_path = data['file_path']
    await save_db_json(creators_json, models_json, file_path)
    save_queue.task_done()

    log(logging.DEBUG, "Save consumer thread exited.")

    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    """
    @@ -322,6 +410,7 @@ async def main() -> None:
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.0", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    @@ -351,27 +440,48 @@ async def main() -> None:
    startpage = argv.startpage if argv.startpage != -1 else (last_key(creators_json, 'page', 1) + 1 if len(creators_json) > 0 and last_key(creators_json, 'page', 1) == 1 else last_key(creators_json, 'page', 1) + 1)
    log(logging.DEBUG, f"Starting from page: {startpage}")

    generator_complete = asyncio.Event()
    controller_complete = asyncio.Event()
    save_padlock = asyncio.Event()
    creator_queue = asyncio.Queue(argv.threads)
    model_queue = asyncio.Queue(argv.threads)
    creator_emitters = [
    asyncio.create_task(creator_emitter(creator_queue, creators_json, page, no_skip))
    for page in range(startpage, (argv.creatorlimit + startpage))
    page_queue = asyncio.Queue(argv.threads)
    save_queue = asyncio.Queue(1)

    save_padlock.set()

    tasks = [
    *[
    asyncio.create_task(controller(page_queue, controller_complete, save_padlock, argv.creatorlimit, startpage))
    ],
    *[
    asyncio.create_task(creator_emitter(creator_queue, creators_json, page_queue, controller_complete, save_padlock, no_skip))
    for _ in range(argv.threads)
    ],
    *[
    asyncio.create_task(model_emitter(creator_queue, model_queue, controller_complete))
    for _ in range(argv.threads)
    ],
    *[
    asyncio.create_task(model_consumer(model_queue, models_json, controller_complete, save_padlock))
    for _ in range(argv.threads)
    ],
    *[
    asyncio.create_task(save_db_emitter(creators_json, models_json, argv.db, save_queue, controller_complete, save_padlock, argv.saveinterval))
    ],
    *[
    asyncio.create_task(save_db_consumer(save_queue, controller_complete))
    ]
    model_emitters = asyncio.create_task(model_emitter(creator_queue, model_queue, generator_complete))
    model_consumers = asyncio.create_task(model_consumer(model_queue, models_json, generator_complete))
    ]

    await asyncio.gather(*tasks)

    await asyncio.gather(*creator_emitters, return_exceptions=True)
    await page_queue.join()
    await creator_queue.join()

    generator_complete = True
    await model_queue.join()
    await save_queue.join()

    model_emitters.cancel()
    model_consumers.cancel()

    log(logging.INFO, f"Saving results to {argv.db}...")
    await save_db_json(creators_json, models_json, argv.db)
    for task in tasks:
    task.cancel()

    log(logging.INFO, "Scraping process completed.")

  9. loopyd created this gist Nov 24, 2023.
    379 changes: 379 additions & 0 deletions civitai_scraper.py
    @@ -0,0 +1,379 @@
    """ =======================================================================================
    civitai-scraper.py v1.0.0 deitydurg
    =======================================================================================
    This script is used to scrape CivitAI (https://civitai.com) for models and creators.
    It will save the results to a json file, which can be used to bulk-download the models.
    This script is not affiliated with CivitAI in any way, and is provided as-is with some
    updates when I have time. Therefore you should use it at your own risk.
    ---------------------------------------------------------------------------------------
    Questions? Comments? Need to scream at me for writing this? OK!
    Feel free to present them to the Discord username deitydurg, or comment on this Gist.
    I will address them at my earliest convenience.
    For help with the script, run it with the -h or --help options.
    =======================================================================================
    """
    import json
    import os, sys
    from pathlib import Path
    import asyncio
    from argparse import ArgumentParser
    from typing import Any, Dict, List
    from aiolimiter import AsyncLimiter
    import aiohttp
    import aiofiles
    import logging

    logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] [%(asctime)s.%(msecs)03d] %(message)s',
    datefmt='%Y/%m/%d %H:%M:%S',
    handlers=[
    logging.StreamHandler(stream=sys.stdout)
    ]
    )

    def log(loglevel: int, message: str) -> None:
    """
    log: Log a message to the logging system.
    Args:
    message (int): The message to log.
    Returns:
    None
    """
    func = sys._getframe(1).f_code.co_name
    message = f"[{func}]: {message}"
    log_func = {
    'info': logging.info,
    'warning': logging.warning,
    'error': logging.error,
    'critical': logging.critical,
    'debug': logging.debug
    }.get(logging.getLevelName(loglevel).lower(), logging.info)
    log_func(message)

    async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int = 3, delay: int = 5) -> str:
    """
    api_call: Make an API call to the given URL.
    Args:
    url (str): The URL to make the request to.
    method (str, optional): The HTTP method to use. Defaults to 'GET'.
    data (any, optional): The data to send with the request. Defaults to None.
    retries (int, optional): The number of times to retry the request. Defaults to 3.
    delay (int, optional): The number of seconds to wait between retries. Defaults to 5.
    Returns:
    str: The response text.
    """

    attempt = 0
    while attempt < retries:
    try:
    log(logging.DEBUG, f"{method} {url}")
    async with limiter:
    async with aiohttp.ClientSession() as session:
    async with session.request(method, url, data=data) as response:
    response.raise_for_status()
    return await response.text()
    except aiohttp.ClientError as e:
    log(logging.WARNING, f"Error on attempt {attempt + 1}/{retries} ({e})")
    await asyncio.sleep(delay)
    attempt += 1

    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None
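
# Illustrative only, not part of the original file: a typical call awaits the helper with a
# Civitai endpoint and parses the returned text (api_call yields None after exhausted retries):
#     text = await api_call("https://civitai.com/api/v1/creators?page=1")
#     creators = json.loads(text) if text else {"items": []}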

    async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page: int = 1, noskip: bool = False) -> None:
    """
    creator_emitter: Emits creator objects to the given queue
    Args:
    creator_queue (asyncio.Queue): The queue to emit the creator objects to.
    creators_json (list of dict): The creators object to append the creators to.
    page (int, optional): The page on the user's models listing to scrape from.
    noskip (bool, optional): If True, do not skip creators that are already in the database. Defaults to False.
    Returns:
    None
    """

    creators = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
    creators = json.loads(creators)
    if not creators or len(creators['items']) == 0:
    log(logging.INFO, "No creators found on page: {page}")
    return

    for creator in creators.get('items', []):
    creator_username = creator['username']
    creator_link = creator['link']
    if len(creators_json) > 0:
    in_creators_json = any(c['creator'] == creator_username for c in creators_json)
    else:
    in_creators_json = False
    if in_creators_json and noskip == False:
    log(logging.DEBUG, f"Skipping creator: {creator_username}")
    continue

    creator_json = {'creator': creator_username, 'link': creator_link, 'page': page}
    await creator_queue.put(creator_json)
    creators_json = list_append(creators_json, creator_json, 'creator', True)
    log(logging.INFO, f"Emitted creator: {creator_username}")

    async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue, generator_complete: asyncio.Event) -> None:
    """
    model_emitter: Consume creator objects from the given queue and scrape their models.
    Args:
    creator_queue (asyncio.Queue): The queue to consume the creator objects from.
    model_queue (asyncio.Queue): The queue to emit the model objects to.
    generator_complete (asyncio.Event): The event to set when the generator is complete.
    Returns:
    None
    """
    while generator_complete.is_set() == False or not creator_queue.empty():
    creator_json = await creator_queue.get()

    page = 1
    while True:
    page_models = await api_call(f"{creator_json['link']}&page={page}")
    page_models = json.loads(page_models)

    if not page_models or len(page_models['items']) == 0:
    log(logging.INFO, f"No models found for creator: {creator_json['creator']}")
    break

    models_added = 0
    for model_type in ["LORA", "Checkpoint", "Controlnet"]:
    filtered_models = [m["modelVersions"] for m in page_models['items'] if m['type'] == model_type]
    if len(filtered_models) == 0:
    log(logging.DEBUG, f"No models found for {creator_json['creator']} with type: {model_type}")
    continue
    models_to_add = [
    {
    'filename': f"{creator_json['creator']}-{model_file['name']}",
    'url': model_file['downloadUrl'],
    'creator': creator_json['creator'],
    'type': model_type,
    'page': page
    }
    for model_versions in filtered_models
    for model_version in model_versions
    for model_file in model_version['files']
    if 'name' in model_file and 'downloadUrl' in model_file and model_file['pickleScanResult'] == 'Success' and model_file['virusScanResult'] == 'Success'
    ]
    for model in models_to_add:
    models_added += 1
    log(logging.INFO, f"Emitted model: {model['filename']}")
    await model_queue.put(model)

    if models_added == 0:
    log(logging.DEBUG, f"No models emitted for creator: {creator_json['creator']}")
    break

    page += 1

    creator_queue.task_done()

    async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], generator_complete: asyncio.Event) -> None:
    """
    model_consumer: Consume model objects from the given queue and add them to the given json string.
    Args:
    model_queue (asyncio.Queue): The queue to consume the model objects from.
    models_json (list of dicts): The models object to append the models to.
    generator_complete (asyncio.Event): The event to set when the generator is complete.
    Returns:
    str: The json string with the models appended.
    """
    while generator_complete.is_set() == False or not model_queue.empty():
    model_json = await model_queue.get()
    model_filename = model_json['filename']
    models_json = list_append(models_json, model_json, 'filename', True)
    log(logging.INFO, f"Processed model: {model_filename}")
    model_queue.task_done()

    async def load_db_json(file_path: str) -> (List[Dict], List[Dict]):
    """
    load_db_json: Load the JSON database file at the given path.
    Args:
    file_path (str): The path to the json file.
    Returns:
    list of dict: The creators object.
    list of dict: The models object.
    """
    if not os.path.exists(file_path):
    return [], []
    async with aiofiles.open(file_path, 'r', encoding="utf-8") as file:
    data = json.loads(await file.read())
    log(logging.DEBUG, f"Loaded {len(data['creators'])} creators and {len(data['models'])} models from: {file_path}")
    return data['creators'], data['models']

    async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    save_db_json: Save the given creators and models to the given JSON file.
    Args:
    creators (list of dict): The creators object to save.
    models (list of dict): The models object to save.
    file_path (str): The path to the json file.
    Returns:
    None
    """
    json_data = {'creators': creators, 'models': models}
    async with aiofiles.open(file_path, 'w', encoding="utf-8") as file:
    await file.write(json.dumps(json_data, ensure_ascii=False, indent=4))
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")

    def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
    """
    last_key: Get the last value of the given key in the given json data.
    Args:
    data (dist of dict): The json data to search.
    key (str): The key to search for.
    default (str, optional): The default value to return if the key is not found. Defaults to None.
    Returns:
    str: The value of the key.
    """
    return data[-1].get(key, default) if data else default
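
# Illustrative only, not part of the original file:
#     last_key([{"page": 3}, {"page": 7}], "page", 1)  ->  7
#     last_key([], "page", 1)                          ->  1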

    def list_append(data: List[Dict], new_item: Dict, unique_key: str = None, update_if_exists: bool = False) -> List[Dict]:
    """
    list_append: Append or optionally update the given item to the given list.
    Args:
    data (list of dict): The data to append to or update.
    new_item (dict): The item to append or with which to update an existing item.
    unique_key (str, optional): The key to check for uniqueness. Defaults to None.
    update_if_exists (bool, optional): If True, update an existing item based on the unique_key.
    Defaults to False.
    Returns:
    list of dict: The data with the new item appended or existing item updated.
    """
    if unique_key and update_if_exists:
    for index, item in enumerate(data):
    if item.get(unique_key) == new_item.get(unique_key):
    data[index] = new_item
    break
    else:
    data.append(new_item)
    else:
    if new_item not in data:
    data.append(new_item)

    return data
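
# Illustrative only, not part of the original file: with a unique_key and update_if_exists=True,
# an entry sharing that key is replaced in place rather than duplicated:
#     rows = list_append([], {"creator": "a", "page": 1}, "creator", True)
#     rows = list_append(rows, {"creator": "a", "page": 2}, "creator", True)  # still one entry, page == 2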

    async def main() -> None:
    """
    main: The main function.
    Returns:
    None
    """
    parser = ArgumentParser(
    prog="civitai_scraper.py",
    description="Scrape CivitAI for models and creators.",
    allow_abbrev=True,
    add_help=False,
    epilog="Tool created by: deitydurg | If any questions, ask on Discord for assistance.")

    logging_group = parser.add_argument_group("Logging")
    logging_group.add_argument('-x', '--log-level', type=str, default="info", dest="loglevel", choices=["info", "warning", "error", "critical", "debug"], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-scraper.log", dest="logfile", help="The path to the log file where logs will be saved.")
    ratelimit_group = parser.add_argument_group("Rate Limits & Performance")
    ratelimit_group.add_argument('-p', '--api-period', type=int, default=180, dest="apiperiod", help="The period of time to limit API calls (in seconds). WARNING: Setting this value too low may result in a ban from the Civitai API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, default=100, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too low may result in a ban from the Civitai API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-t', '--threads', type=int, default=5, dest="threads", help="The maximum number of concurrent/asynchronous threads to run. This can help out with entering retries for making too many requests at once, but will slow down the tool. If you are seeing retry messages often, try lowering this value from its default of 5.")
    scraping_group = parser.add_argument_group("Scraping Options")
    scraping_group.add_argument('-c', '--creator-limit', type=int, default=100, dest="creatorlimit", help="The maximum number of creators to scrape.")
    scraping_group.add_argument('-s', '--start-page', type=int, default=-1, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will start from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.0", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()

    argv.creatorlimit = max(1, argv.creatorlimit)
    log(logging.DEBUG, f"Scraping up to {argv.creatorlimit} creators from Civitai")
    global limiter
    limiter = AsyncLimiter(argv.apiperiod, argv.apilimit)
    log(logging.DEBUG, f"Set API rate limit to {argv.apilimit} calls per {argv.apiperiod} second(s).")

    logging.getLogger().setLevel(argv.loglevel.upper())
    logging.getLogger().addHandler(logging.FileHandler(argv.logfile))

    no_skip = argv.noskip
    if no_skip == False:
    log(logging.DEBUG, "Skipping creators that are already in the database.")
    else:
    log(logging.DEBUG, "Not skipping creators that are already in the database.")

    ###############################
    # Start of the main program...#
    ###############################

    creators_json, models_json = await load_db_json(argv.db)

    startpage = argv.startpage if argv.startpage != -1 else (last_key(creators_json, 'page', 1) + 1 if len(creators_json) > 0 and last_key(creators_json, 'page', 1) == 1 else last_key(creators_json, 'page', 1) + 1)
    log(logging.DEBUG, f"Starting from page: {startpage}")

    generator_complete = asyncio.Event()
    creator_queue = asyncio.Queue(argv.threads)
    model_queue = asyncio.Queue(argv.threads)
    creator_emitters = [
    asyncio.create_task(creator_emitter(creator_queue, creators_json, page, no_skip))
    for page in range(startpage, (argv.creatorlimit + startpage))
    ]
    model_emitters = asyncio.create_task(model_emitter(creator_queue, model_queue, generator_complete))
    model_consumers = asyncio.create_task(model_consumer(model_queue, models_json, generator_complete))

    await asyncio.gather(*creator_emitters, return_exceptions=True)
    await creator_queue.join()

    generator_complete = True
    await model_queue.join()

    model_emitters.cancel()
    model_consumers.cancel()

    log(logging.INFO, f"Saving results to {argv.db}...")
    await save_db_json(creators_json, models_json, argv.db)

    log(logging.INFO, "Scraping process completed.")

    if __name__ == "__main__":
    asyncio.run(main())
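
A minimal invocation sketch, using only flags defined by the argument parser above (the path and numbers are illustrative):

    python civitai_scraper.py --json ./civitai-db.json --creator-limit 50 --threads 3 --log-level debug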