Last active
October 9, 2025 07:51
-
-
Save loopyd/581977be7040ffa5796458afb3c96bd2 to your computer and use it in GitHub Desktop.
[python] CivitAI Scraper - Data archival utility for CivitAI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ ======================================================================================= | |
| civitai-scraper.py v1.0.1 deitydurg | |
| ======================================================================================= | |
| This script is used to scrape CivitAI (https://civitai.com) for models and creators. | |
| It will save the results to a json file, which can be used to bulk-download the models. | |
| This script is not affiliated with CivitAI in any way, and is provided as-is with some | |
| updates when I have time. Therefore you should use it at your own risk. | |
| --------------------------------------------------------------------------------------- | |
| Questions? Comments? Need to scream at me for writing this? OK! | |
| Feel free to present them to the Discord username deitydurg, or comment on this Gist. | |
| I will address them at my earliest convenience. | |
| For help with the script, run it with the -h or --help options. | |
| ======================================================================================= | |
| """ | |
import asyncio
import json
import logging
import os
import sys
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import aiofiles
import aiohttp
from aiolimiter import AsyncLimiter
# Configure the root logger once at import time: every record goes to stdout,
# stamped with its level and a millisecond-resolution timestamp. The root
# level starts at DEBUG.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    datefmt='%Y/%m/%d %H:%M:%S',
    format='[%(levelname)s] [%(asctime)s.%(msecs)03d] %(message)s',
    level=logging.DEBUG,
)
def log(loglevel: int, message: str) -> None:
    """
    log: Write a message to the root logger, prefixed with the caller's name.
    Args:
        loglevel (int): A logging level constant such as logging.INFO. A level
            whose name is not debug/info/warning/error/critical is logged at
            INFO.
        message (str): The text to log.
    Returns:
        None
    """
    # Frame 1 is whoever called log(); its function name becomes the prefix.
    caller_name = sys._getframe(1).f_code.co_name
    text = f"[{caller_name}]: {message}"
    level_name = logging.getLevelName(loglevel).lower()
    if level_name == 'debug':
        logging.debug(text)
    elif level_name == 'warning':
        logging.warning(text)
    elif level_name == 'error':
        logging.error(text)
    elif level_name == 'critical':
        logging.critical(text)
    else:
        # 'info' and every unrecognised level name end up here.
        logging.info(text)
async def api_call(url: str, method: str = 'GET', data: Any = None, retries: int = 3, delay: int = 5) -> Optional[str]:
    """
    api_call: Make a rate-limited API call to the given URL, retrying on failure.
    Args:
        url (str): The URL to make the request to.
        method (str, optional): The HTTP method to use. Defaults to 'GET'.
        data (any, optional): The data to send with the request. Defaults to None.
        retries (int, optional): The number of attempts to make. Defaults to 3.
        delay (int, optional): The number of seconds to wait between attempts. Defaults to 5.
    Returns:
        str or None: The response text, or None when every attempt failed.
    """
    for attempt in range(1, retries + 1):
        try:
            log(logging.DEBUG, f"{method} {url}")
            # `limiter` is the module-level rate limiter installed by main().
            async with limiter:
                async with aiohttp.ClientSession() as session:
                    async with session.request(method, url, data=data) as response:
                        response.raise_for_status()
                        return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Timeouts are not ClientError subclasses, so catch them explicitly;
            # otherwise a slow response would bypass the retry loop entirely.
            log(logging.WARNING, f"Error on attempt {attempt}/{retries} ({e})")
            # No point sleeping once the last attempt has failed.
            if attempt < retries:
                await asyncio.sleep(delay)
    log(logging.ERROR, "Failed to retrieve data after retries.")
    return None
async def controller(page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, page_limit: int, start_page: int) -> None:
    """
    controller: The main controller for the CivitAI scraping system.
    Emits page numbers, waits for them to be picked up, then signals completion.
    Args:
        page_queue (asyncio.Queue): The queue to emit the page numbers to.
        controller_complete (asyncio.Event): Set once every page has been taken from the queue.
        save_padlock (asyncio.Event): Completion is only signalled while this event is set.
        page_limit (int): The maximum number of pages to emit.
        start_page (int): The page of creators to start scraping from.
    Returns:
        None
    """
    # Exactly `page_limit` pages: start_page .. start_page + page_limit - 1.
    for page in range(start_page, start_page + page_limit):
        await page_queue.put(page)
        log(logging.DEBUG, f"Emitted page: {page}")
    # An empty queue only means every page was picked up, not that it has
    # finished processing.
    while not page_queue.empty():
        await asyncio.sleep(1)
    # Do not signal completion while the padlock is cleared.
    await save_padlock.wait()
    controller_complete.set()
    log(logging.DEBUG, "Controller thread exited.")
async def creator_emitter(creator_queue: asyncio.Queue, creators_json: List[Dict], page_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, noskip: bool = False) -> None:
    """
    creator_emitter: Consume page numbers, fetch that page of creators and emit each creator.
    Args:
        creator_queue (asyncio.Queue): The queue to emit the creator objects to.
        creators_json (list of dict): The creators object to append the creators to.
        page_queue (asyncio.Queue): The queue to consume page numbers from.
        controller_complete (asyncio.Event): Set by the controller once all pages were handed out.
        save_padlock (asyncio.Event): Awaited before touching creators_json.
        noskip (bool, optional): If True, do not skip creators that are already in the database. Defaults to False.
    Returns:
        None
    """
    while not controller_complete.is_set() or not page_queue.empty():
        try:
            # Poll instead of blocking in get(): a worker parked in get() would
            # never re-check the loop condition once the pages run out.
            page = page_queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.1)
            continue
        try:
            log(logging.INFO, f"Consumed page: {page}")
            response = await api_call(f"https://civitai.com/api/v1/creators?page={page}")
            if response is None:
                # api_call already logged the failure; skip the page rather
                # than crash on json.loads(None).
                log(logging.WARNING, f"No response for page {page}; skipping it")
                continue
            creators = json.loads(response)
            items = creators.get('items') if isinstance(creators, dict) else None
            if not items:
                # Keep the worker alive so the remaining pages are still
                # drained and the controller can finish.
                log(logging.INFO, f"No creators found on page: {page}")
                continue
            for creator in items:
                creator_username = creator['username']
                creator_link = creator['link']
                already_known = any(c['creator'] == creator_username for c in creators_json)
                if already_known and not noskip:
                    log(logging.DEBUG, f"Skipping creator: {creator_username}")
                    continue
                creator_json = {'creator': creator_username, 'link': creator_link, 'page': page}
                await creator_queue.put(creator_json)
                await save_padlock.wait()
                # list_append mutates creators_json in place.
                list_append(creators_json, creator_json, 'creator', True)
                log(logging.DEBUG, f"Emitted creator: {creator_username}")
        finally:
            # Always acknowledge the page so page_queue.join() can complete.
            page_queue.task_done()
    log(logging.DEBUG, "Creator emitter thread exited.")
async def model_emitter(creator_queue: asyncio.Queue, model_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    """
    model_emitter: Consume creator objects from the given queue and scrape their models.
    Args:
        creator_queue (asyncio.Queue): The queue to consume the creator objects from.
        model_queue (asyncio.Queue): The queue to emit the model objects to.
        controller_complete (asyncio.Event): Checked, together with the queue being empty, to decide when to stop.
    Returns:
        None
    """
    # NOTE(review): a worker waiting in get() does not re-check this condition
    # until another creator arrives -- confirm the caller handles shutdown.
    while not controller_complete.is_set() or not creator_queue.empty():
        creator_json = await creator_queue.get()
        try:
            page = 1
            while True:
                response = await api_call(f"{creator_json['link']}&page={page}")
                if response is None:
                    # api_call gave up; stop paginating this creator instead
                    # of crashing on json.loads(None).
                    log(logging.WARNING, f"No response for creator {creator_json['creator']} on page {page}")
                    break
                page_models = json.loads(response)
                items = page_models.get('items') if isinstance(page_models, dict) else None
                if not items:
                    log(logging.INFO, f"No models found for creator: {creator_json['creator']}")
                    break
                models_added = 0
                for model_type in ["LORA", "Checkpoint", "Controlnet"]:
                    filtered_models = [m.get("modelVersions", []) for m in items if m.get('type') == model_type]
                    if not filtered_models:
                        log(logging.INFO, f"No models found for {creator_json['creator']} with type: {model_type}")
                        continue
                    # Only files that passed both scans are emitted; a file
                    # without a scan result is treated as not passed.
                    models_to_add = [
                        {
                            'filename': f"{creator_json['creator']}-{model_file['name']}",
                            'url': model_file['downloadUrl'],
                            'creator': creator_json['creator'],
                            'type': model_type,
                            'page': page,
                        }
                        for model_versions in filtered_models
                        for model_version in model_versions
                        for model_file in model_version.get('files', [])
                        if 'name' in model_file
                        and 'downloadUrl' in model_file
                        and model_file.get('pickleScanResult') == 'Success'
                        and model_file.get('virusScanResult') == 'Success'
                    ]
                    for model in models_to_add:
                        models_added += 1
                        log(logging.DEBUG, f"Emitted model: {model['filename']}")
                        await model_queue.put(model)
                if models_added == 0:
                    log(logging.DEBUG, f"No models emitted for creator: {creator_json['creator']}")
                    break
                page += 1
        finally:
            # Acknowledge the creator even on error so creator_queue.join()
            # cannot be left waiting on it.
            creator_queue.task_done()
    log(logging.DEBUG, "Model emitter thread exited.")
async def model_consumer(model_queue: asyncio.Queue, models_json: List[Dict], controller_complete: asyncio.Event, save_padlock: asyncio.Event ) -> None:
    """
    model_consumer: Take model objects off the queue and merge them into the models list.
    Args:
        model_queue (asyncio.Queue): The queue to consume the model objects from.
        models_json (list of dicts): The models object; entries are added or replaced by 'filename'.
        controller_complete (asyncio.Event): Checked, together with the queue being empty, to decide when to stop.
        save_padlock (asyncio.Event): Awaited before the models list is modified.
    Returns:
        None
    """
    # NOTE(review): a consumer waiting in get() only re-checks this condition
    # after receiving another model -- confirm shutdown behaviour with the caller.
    while not (controller_complete.is_set() and model_queue.empty()):
        incoming = await model_queue.get()
        name = incoming['filename']
        # Hold off while the padlock is cleared.
        await save_padlock.wait()
        models_json = list_append(models_json, incoming, 'filename', True)
        log(logging.INFO, f"Processed model: {name}")
        model_queue.task_done()
    log(logging.DEBUG, "Model consumer thread exited.")
async def save_db_emitter(creators_json: List[Dict], models_json: List[Dict], file_path: str, save_queue: asyncio.Queue, controller_complete: asyncio.Event, save_padlock: asyncio.Event, save_interval: int = 60) -> None:
    """
    save_db_emitter: Emit a save event every save_interval seconds, plus a final one on completion.
    Args:
        creators_json (list of dict): The creators object to save.
        models_json (list of dict): The models object to save.
        file_path (str): The path to the database file.
        save_queue (asyncio.Queue): The queue to emit the save events to.
        controller_complete (asyncio.Event): Once set, periodic saving stops and the final save event is emitted.
        save_padlock (asyncio.Event): Cleared while a save event is being emitted. Await this event before saving anything to the in-memory database.
        save_interval (int, optional): The number of seconds to wait between save events. Defaults to 60.
    Returns:
        None
    """
    # Loop on the completion flag only. This task is the producer for
    # save_queue, so looping while that queue is non-empty would keep it
    # emitting after completion and eventually block it on a full queue.
    while not controller_complete.is_set():
        await asyncio.sleep(save_interval)
        save_padlock.clear()
        try:
            await save_queue.put({'creators': creators_json, 'models': models_json, 'file_path': file_path})
            log(logging.DEBUG, "Emitted save event")
        finally:
            # Restore the padlock even on cancel/error so waiters are not stuck.
            save_padlock.set()
    await save_queue.put({'creators': creators_json, 'models': models_json, 'file_path': file_path})
    log(logging.DEBUG, "Emitted final save event")
    log(logging.DEBUG, "Save emitter thread exited.")
async def save_db_consumer(save_queue: asyncio.Queue, controller_complete: asyncio.Event) -> None:
    """
    save_db_consumer: Take save events off the queue and write each one to disk.
    Args:
        save_queue (asyncio.Queue): The queue to consume the save events from.
            Each event is a dict with 'creators', 'models' and 'file_path'.
        controller_complete (asyncio.Event): Checked, together with the queue being empty, to decide when to stop.
    Returns:
        None
    """
    while not (controller_complete.is_set() and save_queue.empty()):
        event = await save_queue.get()
        await save_db_json(event['creators'], event['models'], event['file_path'])
        save_queue.task_done()
    log(logging.DEBUG, "Save consumer thread exited.")
async def load_db_json(file_path: str) -> Tuple[List[Dict], List[Dict]]:
    """
    load_db_json: Load the JSON database file at the given path.
    Args:
        file_path (str): The path to the json file.
    Returns:
        list of dict: The creators object (empty if the file or key is missing).
        list of dict: The models object (empty if the file or key is missing).
    """
    if not os.path.exists(file_path):
        return [], []
    async with aiofiles.open(file_path, 'r', encoding="utf-8") as file:
        # Malformed JSON is allowed to raise: silently starting from an empty
        # database would later overwrite the existing file.
        data = json.loads(await file.read())
    # A database with only one of the two sections is still usable.
    creators = data.get('creators') or []
    models = data.get('models') or []
    log(logging.DEBUG, f"Loaded {len(creators)} creators and {len(models)} models from: {file_path}")
    return creators, models
async def save_db_json(creators: List[Dict], models: List[Dict], file_path: str) -> None:
    """
    save_db_json: Save the given creators and models to the given JSON file.
    Args:
        creators (list of dict): The creators object to save.
        models (list of dict): The models object to save.
        file_path (str): The path to the json file.
    Returns:
        None
    """
    # Serialise before touching the disk so a serialisation error cannot
    # leave a truncated file behind.
    payload = json.dumps({'creators': creators, 'models': models}, ensure_ascii=False, indent=4)
    # Write to a sibling temp file and swap it in: an interrupted save then
    # leaves the previous database intact instead of a half-written one.
    temp_path = f"{file_path}.tmp"
    async with aiofiles.open(temp_path, 'w', encoding="utf-8") as file:
        await file.write(payload)
    os.replace(temp_path, file_path)
    log(logging.DEBUG, f"Saved {len(creators)} creators and {len(models)} models to {file_path}")
def last_key(data: List[Dict], key: str, default: Any = None) -> Any:
    """
    last_key: Look up a key in the final entry of a list of dicts.
    Args:
        data (list of dict): The entries to inspect; only the last one is read.
        key (str): The key to look up in the last entry.
        default (any, optional): Returned when the list is empty or the last
            entry does not have the key. Defaults to None.
    Returns:
        any: The value stored under the key in the last entry, or the default.
    """
    if not data:
        return default
    final_entry = data[-1]
    return final_entry.get(key, default)
def list_append(data: List[Dict], new_item: Dict, unique_key: str = None, update_if_exists: bool = False) -> List[Dict]:
    """
    list_append: Add an item to the list in place, or replace a matching one.
    Args:
        data (list of dict): The list to modify; it is mutated and returned.
        new_item (dict): The item to add, or to put in place of an existing item.
        unique_key (str, optional): The key used to find an existing item.
            Only used when update_if_exists is True. Defaults to None.
        update_if_exists (bool, optional): If True (and unique_key is given),
            the first item with the same unique_key value is replaced;
            without a match the item is appended. Defaults to False.
    Returns:
        list of dict: The same list object that was passed in.
    """
    if not (unique_key and update_if_exists):
        # Plain mode: skip only exact duplicates of the whole item.
        if new_item not in data:
            data.append(new_item)
        return data
    match_index = next(
        (
            position
            for position, existing in enumerate(data)
            if existing.get(unique_key) == new_item.get(unique_key)
        ),
        None,
    )
    if match_index is None:
        data.append(new_item)
    else:
        data[match_index] = new_item
    return data
async def main() -> None:
    """
    main: Parse the command line, run the scraping pipeline and save the database.
    Returns:
        None
    """
    global limiter
    parser = ArgumentParser(
        prog="civitai_scraper.py",
        description="Scrape CivitAI for models and creators.",
        allow_abbrev=True,
        add_help=False,
        epilog="Tool created by: deitydurg | If any questions, ask on Discord for assistance.")
    logging_group = parser.add_argument_group("Logging")
    logging_group.add_argument('-x', '--log-level', type=str, default="info", dest="loglevel", choices=["info", "warning", "error", "critical", "debug"], help="The logging level to use. If you want to see debug messages, set this to 'debug'.")
    logging_group.add_argument('-y', '--log-file', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-scraper.log", dest="logfile", help="The path to the log file where logs will be saved.")
    ratelimit_group = parser.add_argument_group("Rate Limits & Performance")
    ratelimit_group.add_argument('-p', '--api-period', type=int, default=180, dest="apiperiod", help="The period of time to limit API calls (in seconds). WARNING: Setting this value too low may result in a ban from Civiarti API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-l', '--api-limit', type=int, default=100, dest="apilimit", help="The number of API calls to allow per period. WARNING: Setting this value too high may result in a ban from Civiarti API, or you being temporarily ratelimited.")
    ratelimit_group.add_argument('-t', '--threads', type=int, default=5, dest="threads", help="The maximum number of concurrent/asynchronous threads to run. This can help out with entering retries for making too many requests at once, but will slow down the tool. If you are seeing retry messages often, try lowering this value from its default of 5.")
    scraping_group = parser.add_argument_group("Scraping Options")
    scraping_group.add_argument('-c', '--creator-limit', type=int, default=100, dest="creatorlimit", help="The maximum number of creators to scrape.")
    scraping_group.add_argument('-s', '--start-page', type=int, default=-1, dest="startpage", help="The page of creators to start scraping from. You can use this to resume a previous scraping session. If this is set to -1, it will start from the last page scraped.")
    scraping_group.add_argument('-n', '--no-skip', action="store_true", dest="noskip", help="Do not skip creators that are already in the database. This will cause the tool to scrape all encountered creators, even if they are already in the database -- updating their models.")
    database_group = parser.add_argument_group("Database Options")
    database_group.add_argument('-j', '--json', type=str, default=f"{Path(__file__).parent.absolute()}{os.sep}civarti-db.json", dest="db", help="The path to the json file used as the database. If the file does not exist, it will be created.")
    database_group.add_argument('-i', '--save-interval', type=int, default=60, dest="saveinterval", help="The number of seconds to wait between saving the database to disk. This can help with performance, but setting it too low may result in data loss.")
    misc_group = parser.add_argument_group("Miscellaneous")
    misc_group.add_argument('-v', '--version', action="version", version="%(prog)s 1.0.1", help="Show the version of this tool.")
    misc_group.add_argument('-h', '--help', action="help", help="Show this help message and exit.")
    argv = parser.parse_args()
    argv.creatorlimit = max(1, argv.creatorlimit)
    # With no workers nothing would ever drain the queues.
    argv.threads = max(1, argv.threads)

    # Apply the requested verbosity before the first message is logged, and
    # give the log file the same line format as the existing handler.
    root_logger = logging.getLogger()
    root_logger.setLevel(argv.loglevel.upper())
    file_handler = logging.FileHandler(argv.logfile, encoding="utf-8")
    if root_logger.handlers:
        file_handler.setFormatter(root_logger.handlers[0].formatter)
    root_logger.addHandler(file_handler)

    log(logging.DEBUG, f"Scraping up to {argv.creatorlimit} creators from Civitai")
    # AsyncLimiter takes (max_rate, time_period): calls first, seconds second.
    limiter = AsyncLimiter(argv.apilimit, argv.apiperiod)
    log(logging.DEBUG, f"Set API rate limit to {argv.apilimit} calls per {argv.apiperiod} second(s).")
    no_skip = argv.noskip
    if not no_skip:
        log(logging.DEBUG, "Skipping creators that are already in the database.")
    else:
        log(logging.DEBUG, "Not skipping creators that are already in the database.")

    ###############################
    # Start of the main program...#
    ###############################
    creators_json, models_json = await load_db_json(argv.db)
    if argv.startpage != -1:
        startpage = argv.startpage
    elif creators_json:
        # Resume after the page recorded on the last stored creator.
        startpage = last_key(creators_json, 'page', 1) + 1
    else:
        # Fresh database: begin at the first page.
        startpage = 1
    log(logging.DEBUG, f"Starting from page: {startpage}")

    # controller_complete: set by the controller once all pages were handed out.
    # pipeline_complete: set here only after every queue has been drained, so
    # downstream workers cannot stop while upstream stages still produce work.
    controller_complete = asyncio.Event()
    pipeline_complete = asyncio.Event()
    save_padlock = asyncio.Event()
    creator_queue = asyncio.Queue(argv.threads)
    model_queue = asyncio.Queue(argv.threads)
    page_queue = asyncio.Queue(argv.threads)
    save_queue = asyncio.Queue(1)
    save_padlock.set()

    controller_task = asyncio.create_task(controller(page_queue, controller_complete, save_padlock, argv.creatorlimit, startpage))
    workers = [
        *[
            asyncio.create_task(creator_emitter(creator_queue, creators_json, page_queue, controller_complete, save_padlock, no_skip))
            for _ in range(argv.threads)
        ],
        *[
            asyncio.create_task(model_emitter(creator_queue, model_queue, pipeline_complete))
            for _ in range(argv.threads)
        ],
        *[
            asyncio.create_task(model_consumer(model_queue, models_json, pipeline_complete, save_padlock))
            for _ in range(argv.threads)
        ],
        asyncio.create_task(save_db_emitter(creators_json, models_json, argv.db, save_queue, pipeline_complete, save_padlock, argv.saveinterval)),
        asyncio.create_task(save_db_consumer(save_queue, pipeline_complete)),
    ]

    async def drain_pipeline() -> None:
        # Stage by stage: once a queue is fully processed, everything it feeds
        # into the next queue has already been enqueued.
        await controller_task
        await page_queue.join()
        await creator_queue.join()
        await model_queue.join()

    drain_task = asyncio.create_task(drain_pipeline())
    tasks = [controller_task, drain_task, *workers]
    try:
        # Wait for the pipeline to drain, but surface a crashed worker at once
        # instead of waiting on a queue that will never be acknowledged.
        watched = set(tasks)
        while not drain_task.done():
            finished, watched = await asyncio.wait(watched, return_when=asyncio.FIRST_COMPLETED)
            for task in finished:
                task.result()
    finally:
        pipeline_complete.set()
        # Workers idle in queue.get() (or the save timer) never return on
        # their own; cancel them and wait for the cancellations to settle.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        # Persist whatever was collected, including on failure or interruption.
        await save_db_json(creators_json, models_json, argv.db)
    log(logging.INFO, "Scraping process completed.")
# Script entry point: run the async main() to completion on a fresh event loop.
if __name__ == "__main__":
    entry_point = main()
    asyncio.run(entry_point)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Changelog
1.0.0
Notes:
1.0.1
Added the `--save-interval` parameter, which allows the scraper to save to the database periodically. Added `save_emitter` and `save_consumer` threads to the thread model to control saving, plus a save-file padlocking event for safe I/O writes to the database file. Notes:
1.0.2
`colorama`. `--no-color` switch to turn colorized output on or off. `brotli` (produces smaller database files). Notes:
1.0.3
`pydantic` for a 37% performance increase through object serialization and a singleton decorator. `.env` file support to configure the tool with a `.env` file placed next to it. Notes: