"""Loguru-based structured (JSON) logging helpers with ANSI colorization.

Provides:
- ``colorize_json_string``: post-hoc ANSI colorization of a serialized record.
- ``serialize`` / ``patching``: custom record serialization wired in through
  ``logger.patch``.
- ``get_contextualized_logger``: configures a stdout sink emitting the
  serialized record.
- ``logger_with_context``: decorator that pushes selected call arguments into
  the loguru context for both sync and async functions.
"""

import json
import sys
import traceback
import os
import re
import asyncio
import inspect
import functools

import loguru
from loguru import logger
# NOTE(review): private loguru API -- may break across loguru versions;
# re-verify on upgrade.
from loguru._better_exceptions import ExceptionFormatter

PIPELINE_NAME = "YOUR PROJECT"
SERIALIZED_LOGGING = True


def colorize_json_string(json_str):
    """Apply ANSI colorization to an already-serialized JSON log record.

    The ``message`` field is colored to match the record's log level.

    Args:
        json_str: JSON string produced by :func:`serialize`.

    Returns:
        The same JSON string with ANSI color codes injected into the
        ``time``, ``level``, ``path``, ``message`` and exception fields.
    """
    reset = "\033[0m"
    green = "\033[32m"            # timestamp and SUCCESS level
    cyan = "\033[36m"             # DEBUG level and paths
    white = "\033[0m"             # INFO level (reset code doubles as white)
    yellow = "\033[33m"           # WARNING level
    red = "\033[31m"              # ERROR level and exception fields
    white_on_red = "\033[37;41m"  # CRITICAL level

    # Ordered level -> color dispatch (replaces an if/elif chain).
    level_colors = {
        "DEBUG": cyan,
        "INFO": white,
        "WARNING": yellow,
        "ERROR": red,
        "SUCCESS": green,
        "CRITICAL": white_on_red,
    }

    # Colorize the timestamp.
    json_str = re.sub(r'("time": ")([^"]+)(")', rf'\1{green}\2{reset}\3', json_str)

    # Extract the level before colorizing so the message can reuse its color.
    level_match = re.search(r'"level": "([^"]+)"', json_str)
    if level_match:
        level_color = level_colors.get(level_match.group(1), white)
    else:
        level_color = white  # default color

    # Colorize the log level itself.
    for level_name, color in level_colors.items():
        json_str = re.sub(
            rf'("level": "){level_name}(")',
            rf'\1{color}{level_name}{reset}\2',
            json_str,
        )

    # Colorize the message using the level color.
    json_str = re.sub(
        r'("message": ")(.*?)(")', rf'\1{level_color}\2{reset}\3', json_str
    )
    # Colorize the path.
    json_str = re.sub(r'("path": ")(.*?)(")', rf'\1{cyan}\2{reset}\3', json_str)
    # Colorize exception type and value.
    json_str = re.sub(r'("type": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    json_str = re.sub(r'("value": ")(.*?)(")', rf'\1{red}\2{reset}\3', json_str)
    return json_str


def serialize(record):
    """Serialize a loguru record to a colorized JSON string.

    Adds an ISO timestamp (millisecond precision), a
    ``module:function:line`` path, the bound ``pipeline`` name, any extra
    bound context, and exception details when present.

    Args:
        record: The loguru record dict being patched.

    Returns:
        A colorized JSON string, ready to be emitted via the sink format.
    """
    timestamp = record["time"].isoformat(timespec="milliseconds")

    file_path = record["file"].path
    module_name = record["name"]
    function_name = record["function"]
    line_number = record["line"]

    # Jupyter kernels report numeric module names / "ipython-input" paths;
    # try to recover a friendlier module name from the kernel connection file.
    if module_name.isdigit() or "ipython-input" in str(file_path).lower():
        try:
            import IPython

            notebook_path = (
                IPython.get_ipython()
                .kernel.session.config.get('IPKernelApp', {})
                .get('connection_file', '')
            )
            if notebook_path:
                notebook_name = os.path.basename(notebook_path).split('.', 1)[0]
                module_name = f"jupyter.{notebook_name}"
            else:
                module_name = "__main__"
        except (ImportError, AttributeError):
            module_name = "__main__"  # Fallback name for Jupyter environments

    path_info = f"{module_name}:{function_name}:{line_number}"
    level = record["level"].name

    # record["exception"] is only set for logger.exception()/opt(exception=...).
    error: loguru.RecordException = record["exception"]
    # sys.exc_info() is set whenever we log inside an except block
    # (e.g. a plain logger.error).
    error_by_default = sys.exc_info()
    pipeline: str | None = record["extra"].get("pipeline", None)
    show_exception_value: bool = record["extra"].get("show_exception_value", True)

    extra = record["extra"].copy()
    extra.update({"pipeline": pipeline})

    if error:  # only set when exception was explicitly captured
        exc_type, exc_value, exc_tb = error.type, error.value, error.traceback
        # Use ExceptionFormatter directly with the specific error components.
        formatter = ExceptionFormatter(backtrace=True, diagnose=True, colorize=True)
        formatted_traceback = formatter.format_exception(exc_type, exc_value, exc_tb)
        exception = {
            "type": exc_type.__name__,
            "value": str(exc_value).strip("'") if show_exception_value else None,
            "traceback": "".join(formatted_traceback),
        }
    elif error_by_default[0]:  # an exception is active but was not captured
        # Reuse the tuple captured above (was a redundant second sys.exc_info()).
        _type, _value, _ = error_by_default
        exception = {
            "type": _type.__name__,
            "value": str(_value).strip("'") if show_exception_value else None,
            "traceback": None,
        }
    else:
        exception = None

    to_serialize = {
        "time": timestamp,
        "level": level,
        "path": path_info,
        "message": record["message"],
        "pipeline": pipeline,
        "exception": exception,
    }

    # Append any remaining extra context fields (internal keys excluded).
    for key, value in extra.items():
        if key not in ("pipeline", "serialized", "show_exception_value"):
            to_serialize[key] = value

    # default=str keeps logging alive when extras are not JSON-serializable
    # (a raising serializer would otherwise take down the logging path).
    json_str = json.dumps(to_serialize, default=str)
    return colorize_json_string(json_str)


def patching(record):
    """Patch hook: stash the serialized record under ``extra['serialized']``."""
    record["extra"]["serialized"] = serialize(record)


def get_contextualized_logger(
    pipeline_name: str = PIPELINE_NAME, default_logger=logger
):
    """Generate a contextualized logger bound to ``pipeline_name``.

    Replaces existing sinks with a stdout sink that emits the custom
    colorized JSON produced by :func:`serialize`. When ``SERIALIZED_LOGGING``
    is disabled, the logger is returned untouched.
    """
    if not SERIALIZED_LOGGING:  # Replace with your SERIALIZED_LOGGING variable
        return default_logger

    default_logger.remove()
    default_logger = default_logger.patch(patching)
    default_logger.add(
        sink=sys.stdout,
        colorize=True,
        serialize=False,  # custom serialization requires this to be False
        backtrace=True,
        diagnose=True,
        level="INFO",
        format="{extra[serialized]}",
    )
    # BUG FIX: previously bound the hard-coded "P1", silently ignoring the
    # pipeline_name parameter.
    return default_logger.bind(pipeline=pipeline_name)


def logger_with_context(*param_specs, **fixed_context):
    """Decorator that automatically extracts context from specified function
    parameters. Works with both synchronous and asynchronous functions.

    Args:
        *param_specs: Parameter specifications, which can be:
            - Simple parameter names (e.g., "task")
            - Attribute paths (e.g., "task.id" to get task.id)
            - Dictionaries to include directly in context
        **fixed_context: Additional fixed context values

    Returns:
        Decorated function with automatic context logging
    """
    # Separate dictionaries from string specifications.
    processed_param_specs = []
    additional_context = {}
    for spec in param_specs:
        if isinstance(spec, dict):
            # Positional dicts are merged straight into the fixed context.
            additional_context.update(spec)
        else:
            # Otherwise, treat it as a parameter name or attribute path.
            processed_param_specs.append(spec)

    # Dictionaries from param_specs first; keyword arguments override them.
    merged_fixed_context = dict(additional_context)
    merged_fixed_context.update(fixed_context)

    def decorator(func):
        is_async = asyncio.iscoroutinefunction(func)

        def get_context_from_args(bound_args):
            """Build the context dict from bound call arguments."""
            context = dict(merged_fixed_context)
            for spec in processed_param_specs:
                if "." in spec:
                    # Handle object attribute access (e.g., "task.id").
                    obj_name, attr_name = spec.split(".", 1)
                    if obj_name not in bound_args.arguments:
                        continue
                    obj = bound_args.arguments[obj_name]
                    if obj is None:
                        continue
                    # Gracefully handle missing attributes.
                    try:
                        # "oid" attributes are logged under the "_id" suffix.
                        modified_attr_name = "id" if attr_name == "oid" else attr_name
                        context[f"{obj_name}_{modified_attr_name}"] = getattr(
                            obj, attr_name
                        )
                    except AttributeError:
                        logger.warning(
                            f"Object {obj_name} has no attribute {attr_name}"
                        )
                elif spec in bound_args.arguments:
                    # Handle direct parameter access.
                    param_value = bound_args.arguments[spec]
                    if isinstance(param_value, dict):
                        # Dict-valued parameters are merged into the context.
                        context.update(param_value)
                    else:
                        context[spec] = param_value
            return context

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # Bind the call arguments so specs can be resolved by name.
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            context = get_context_from_args(bound_args)
            # Execute the async function with the derived context.
            with logger.contextualize(**context):
                return await func(*args, **kwargs)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            # Bind the call arguments so specs can be resolved by name.
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            context = get_context_from_args(bound_args)
            # Execute the sync function with the derived context.
            with logger.contextualize(**context):
                return func(*args, **kwargs)

        # Return the appropriate wrapper based on the function type.
        return async_wrapper if is_async else sync_wrapper

    return decorator