#!/usr/bin/env python3 import fileinput import json import re import sys from argparse import ArgumentParser from dataclasses import dataclass from pprint import pprint from typing import Any, Dict, List, Optional, Union DEFAULT_JSON_INDENT = 4 FORMAT = "{asctime} {severity} {name} {module}:{funcName}:{lineno} {message}" REQUIRED_KEYS = "{asctime} {severity} {name} {module}:{funcName}:{lineno} {message}" # Stack traces or exception tracebacks KEYS_WITH_STACK_TRACES = [ "exc_info", "stack_info", ] KNOWN_KEYS = [ "name", "asctime", "severity", "module", "funcName", "lineno", "message", ] + KEYS_WITH_STACK_TRACES DEFAULT_IGNORED_KEYS = [ "filename", "pathname", ] StrOrBytes = Union[str, bytes] ParsedLog = Dict[str, Any] @dataclass class Arguments: files: List[str] fail_if_non_json: bool missing_keys_warnings: bool ignored_keys: List[str] show_stack_traces: bool show_extra_keys: bool show_message: bool show_non_json: bool strip_stack_traces: bool replace_stack_trace_newlines: Optional[str] extra_keys_sep: str json_indent: Optional[int] def dataclass_to_dict(dataclass_) -> Dict[str, Any]: return { key: getattr(dataclass_, key) for key in dataclass_.__dataclass_fields__.keys() } def parse_plain_json(line): try: return json.loads(line) except json.decoder.JSONDecodeError: return None def parse_loki_json(line): try: return parse_plain_json(line.split("\t", 1)[1]) except IndexError: return None JSON_PARSERS = [parse_plain_json, parse_loki_json] def parse( line: StrOrBytes, fail_if_non_json: bool = False ) -> Union[ParsedLog, StrOrBytes]: for parser in JSON_PARSERS: log_msg = parser(line) if log_msg is not None: return log_msg if fail_if_non_json: print( "FATAL: could not parse JSON with parsers", ", ".join(func.__name__ for func in JSON_PARSERS), ) print(line[:-1]) sys.exit(1) return line def format_extra_keys(log_msg: ParsedLog, args: Arguments) -> str: extra_items = { key: value for key, value in log_msg.items() if not (key in KNOWN_KEYS or key in args.ignored_keys) } output = "" if args.show_extra_keys and len(extra_items) > 0: # Add newlines between msg components if args.show_message: output += args.extra_keys_sep output += json.dumps(extra_items, indent=args.json_indent) return output def strip_stack_trace(stack_trace: str) -> str: lines = stack_trace.split("\n") last_file_location_index = len(lines) - 1 for index, line in enumerate(lines): if re.match('\\s*File ".*", line .*', line): last_file_location_index = index return "\n".join(lines[:2] + lines[last_file_location_index:]) def format_stack_traces( log_msg: ParsedLog, newline_at_start: bool, strip: bool, replace_newlines: Optional[str] = None, ) -> str: output = "" for i, field in enumerate(KEYS_WITH_STACK_TRACES): if field in log_msg: if log_msg[field] == "NoneType: None": continue # Add newlines between msg components if i == 0 and not newline_at_start: exc_sep = "" else: exc_sep = replace_newlines if replace_newlines is not None else "\n" stack_trace = log_msg[field] if strip: stack_trace = strip_stack_trace(stack_trace) if replace_newlines: stack_trace = stack_trace.replace("\n", replace_newlines) output += exc_sep + stack_trace return output def format_log_msg( log_msg: Union[ParsedLog, StrOrBytes], original_line: StrOrBytes, args: Arguments ) -> Optional[StrOrBytes]: try: if not isinstance(log_msg, dict): raise ValueError(f"expected dict, received {type(log_msg)}") msg = "" if args.show_message: msg += FORMAT.format(**{"name": "|"} | log_msg) msg += format_extra_keys(log_msg, args) if args.show_stack_traces: msg += format_stack_traces( log_msg, newline_at_start=args.show_extra_keys or args.show_message, strip=args.strip_stack_traces, replace_newlines=args.replace_stack_trace_newlines, ) if len(msg) == 0: return None return msg except (ValueError, KeyError) as exc: if isinstance(exc, KeyError) and args.missing_keys_warnings: print("WARNING: missing keys", file=sys.stderr) if args.show_non_json: # Exclude trailing newline return original_line[:-1] def parse_args() -> Arguments: parser = ArgumentParser( description="""Convert JSON logs from stdin or files into readable output. The JSON keys are assumed to come from Python. Stack trace and exception tracebacks will be searched for in the keys: """ + (", ".join(KEYS_WITH_STACK_TRACES)) ) parser.add_argument( "files", help="Which log files to read. Reads from stdin as well.", nargs="*" ) parser.add_argument( "--missing-keys-warnings", help="Print a warning to stderr if there are missing required keys in JSON logs.", action="store_true", ) parser.add_argument( "--no-extra-keys", help="Do not print extra keys in a JSON log line", action="store_true", ) parser.add_argument( "--no-message", help="Do not print the main message. Only the stack traces or extra keys", action="store_true", ) non_json_group = parser.add_mutually_exclusive_group() non_json_group.add_argument( "--no-non-json", help="Hide non-JSON input instead of printing it as is", action="store_true", ) non_json_group.add_argument( "--only-non-json", help="Only show non-JSON input", action="store_true", ) non_json_group.add_argument( "--fail-if-non-json", help="Exit immediately if some lines cannot be parsed as JSON.", action="store_true", ) stack_trace_group = parser.add_mutually_exclusive_group() stack_trace_group.add_argument( "--no-stack-traces", help="Do not print Python stack traces or exception tracebacks.", action="store_true", ) stack_trace_group.add_argument( "--only-stack-traces", help="Only print JSON log lines that contain Python stack traces or exception tracebacks.", action="store_true", ) parser.add_argument( "--strip-stack-traces", help="Print only the first two lines and last few lines, including the last file position, of Python stack traces or exception tracebacks.", action="store_true", ) parser.add_argument( "--replace-stack-trace-newlines", help="Print Python stack traces or exception tracebacks, but replace newlines with the argument.", action="store", default=None, ) parser.add_argument( "--extra-keys-on-same-line", help="Print extra keys on the same line as the formatted log message", action="store_true", ) parser.add_argument( "--stack-traces-one-line", help="Print the stack traces and exception tracebacks on a single line. Same as \"--replace-stack-trace-newlines ' '\"", action="store_true", ) parser.add_argument( "--one-line", help="Print the formatted log message on a single line. Excludes stack traces and exception tracebacks.", action="store_true", ) parser.add_argument( "--ignore", help="Specify a key which must be ignored. Can be given multiple times", action="append", default=None, ) # TODO # parser.add_argument("--message-format") # parser.add_argument("--known-keys") json_formatting = parser.add_mutually_exclusive_group() json_formatting.add_argument( "--json-indent", help="How much to indent extra keys when pretty-printing them as JSON.", default=DEFAULT_JSON_INDENT, ) json_formatting.add_argument( "--compact-json", help="Print extra keys in a compact format", action="store_true", ) parser.add_argument("--print-arguments", action="store_true") args = parser.parse_args() # convenience for --no-extra --no-message if args.only_stack_traces: args.no_extra_keys = True args.no_message = True if args.stack_traces_one_line: args.replace_stack_trace_newlines = " " application_args = Arguments( files=args.files, missing_keys_warnings=args.missing_keys_warnings, show_stack_traces=not args.no_stack_traces and not args.only_non_json, show_extra_keys=not args.no_extra_keys and not args.only_non_json, show_message=not args.no_message and not args.only_non_json, show_non_json=args.only_non_json or not args.no_non_json, fail_if_non_json=args.fail_if_non_json, strip_stack_traces=args.strip_stack_traces, replace_stack_trace_newlines=args.replace_stack_trace_newlines, extra_keys_sep=" " if args.one_line or args.extra_keys_on_same_line else "\n", json_indent=None if args.one_line or args.compact_json else args.json_indent, ignored_keys=DEFAULT_IGNORED_KEYS if args.ignore is None else args.ignore, ) if args.print_arguments: print("Arguments:", file=sys.stderr) pprint(dataclass_to_dict(application_args), stream=sys.stderr) return application_args def main(): args = parse_args() with fileinput.input(files=args.files, mode="r") as fileinputinput: try: for line in fileinputinput: parsed_json = parse(line, args.fail_if_non_json) readable_message = format_log_msg(parsed_json, line, args) if readable_message is not None: print(readable_message) except KeyboardInterrupt: return if __name__ == "__main__": main()