import click from functools import wraps import mmap import time import multiprocessing import json import os c2n = {"{": 123, "}": 125, "[": 91, "]": 93} track = { c2n["{"]: 0, c2n["["]: 0, } values = { c2n["{"]: 1, c2n["["]: 1, c2n["}"]: -1, c2n["]"]: -1, } normalize = { c2n["{"]: c2n["{"], c2n["["]: c2n["["], c2n["}"]: c2n["{"], c2n["]"]: c2n["["], } ps = { "pnum": -1, "start": -1, "end": -1, "objects": 0, } open_curly_braces = c2n["{"] braces = { c2n["{"]: True, c2n["}"]: True, c2n["["]: True, c2n["]"]: True, } file_obj = None output_dir = None object_defs = None class ArrayEndException(Exception): pass def print_report(fn): @wraps(fn) def decorated(*args, **kwargs): res = fn(*args, **kwargs) click.echo(ps) return res return decorated def track_time(fn): @wraps(fn) def decorated(*args, **kwargs): start_time = time.time() res = fn(*args, **kwargs) end_time = time.time() ps["time"] = end_time - start_time return res return decorated def track_mem(fn): import tracemalloc @wraps(fn) def decorated(*args, **kwargs): b2m = lambda cp: (cp[0] / (1024 * 1024), cp[1] / (1024 * 1024)) tracemalloc.start() first_size, first_peak = b2m(tracemalloc.get_traced_memory()) res = fn(*args, **kwargs) second_size, second_peak = b2m(tracemalloc.get_traced_memory()) ps["mem_peek"] = second_peak - first_peak ps["mem"] = second_size - first_size tracemalloc.stop() return res return decorated def get_out_file(suffix): global file_obj global output_dir name = os.path.join(output_dir, f"{ps['pnum']}-{suffix}.json") mode = "wb" if file_obj: if file_obj.name != name: file_obj.close() file_obj = open(name, mode) else: file_obj = open(name, mode) return file_obj def save(obj, name): global ps ps["objects"] += 1 ps["last_object_len"] = len(obj) # print(ps) out = get_out_file(name) out.write(obj + b"\n") def next_open_braces(mm, start, end, track_array_end=False): x = start found = False while x <= end: if track_array_end and mm[x] == c2n["]"]: raise ArrayEndException() if mm[x] != open_curly_braces: x += 1 else: found = True break return x, found @print_report # @track_mem @track_time def find_json_objects(process_number, filename, start, end, out, defs): """ # start might be in middle of a object, so the # objective is to reach to the start of next array element. # Idea is to skip characters till you find a start of object i.e '{', # read till the end of that object, parse it to json and check if it has keys from `object_defs` in it. # true means you have parsed an array element now continue parsing next array objects. # otherwise skip to next '{' character and repeat the process, until you find the element in array """ global output_dir global object_defs global ps output_dir = out object_defs = defs ps = { "pnum": str(process_number).zfill(3), "start": start, "end": end, "objects": 0, } f = open(filename, "r") mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end) ps["current"] = obj_end ps["last_obj_parse_time"] = parse_time save(obj, obj_name) i, _ = next_open_braces(mm, obj_end, end) while i <= end: obj, obj_end, parse_time = parse_object(mm, i) if obj: i = obj_end ps["current"] = obj_end ps["last_obj_parse_time"] = parse_time save(obj, obj_name) try: i, _ = next_open_braces(mm, obj_end, end, True) except ArrayEndException: i, next_available = next_open_braces(mm, obj_end, end) if next_available: obj, obj_end, parse_time = parse_object(mm, i) obj_name = detect_object_name(obj) ps["current"] = obj_end ps["last_obj_parse_time"] = parse_time save(obj, obj_name) def detect_object_name(obj): global object_defs actual_keys = set(json.loads(obj).keys()) names = [ name for name, keyset in object_defs.items() if not (set(keyset) - actual_keys) ] if names: return names[0] def parse_first_object(mm, start, end): obj_name = None obj = None obj_end = start # if loop is not executed, return safe position parse_time = None while not obj_name and obj_end < end: i, _ = next_open_braces(mm, obj_end, end) obj, obj_end, parse_time = parse_object(mm, i) if obj: obj_name = detect_object_name(obj) return obj, obj_name, obj_end, parse_time def parse_object(mm, pos_start_curly): if mm[pos_start_curly] != open_curly_braces: next_content = mm[pos_start_curly : pos_start_curly + 100] raise Exception( f"parse_object {pos_start_curly} is not a curly brace but {next_content}" ) # print("parsing from", mm[pos_start_curly : pos_start_curly + 100]) start = time.time() tc = track.copy() i = pos_start_curly c = mm[i] # c is { tc[c] = 1 i = i + 1 while sum(tc.values()) != 0: c = mm[i] if c in braces: val = values[c] tc[normalize[c]] += val i += 1 end = time.time() return mm[pos_start_curly:i], i, end - start def _get_intervals(start, end, segments): intervals = list(range(start, end + 1, int((end - start) / segments))) intervals[-1] = end intervals = list(zip(intervals[:-1], intervals[1:])) click.echo(intervals) return intervals @click.command() @click.option("-f", "--filename", type=click.Path(exists=True)) @click.option("-s", "--start", type=int, default=1) @click.option( "-e", "--end", type=int, default=-1, help="last character position we are interested in, defaults to end of file", ) @click.option( "-c", "--concurrency", type=int, default=1, help="number of processes to use" ) @click.option( "--object-name", type=str, multiple=True, ) @click.option( "--object-key", type=click.UNPROCESSED, callback=lambda _, __, keys: tuple([tuple(key.split(",")) for key in keys]), multiple=True, ) @click.option( "-o", "--out", type=click.Path(writable=True, file_okay=False, resolve_path=True), default=".", ) def start(filename, start, end, concurrency, object_name, object_key, out): """ This is a code in response to a problem explained below: There is a really big json file containing one root object with 2 huge arrays, the file cannot be loaded to memory but we need to rearrange the file to be able to load the file in spark. Any proper json parser will try to parse all the structure before letting us do anything with the object. we tried some parsers including jq's streaming parser, it worked for small sample files(~300MB) but was taking way longer for large sample files (~1GB). This is a pseudo json parser that can extract a json object by matching the opening and closing curly braces, it also uses knowledge of objects(known and always existing keys in object) to confirm that the parsed object is indeed the expected array object. it divides the jobs to multiple processes, each process parses a fixed section of the file, its likely that the start of a process can be in middle of a json object, so the process will skip though some initial characters until it finds the object that we are looking for (my matching keys provided as input) however, the end position is not a hard limit, it will pass though the end limit if an object parsing is in progress, this behavior ensures that characters skipped by one process are captured by another process. """ if len(object_name) != len(object_key): click.echo("for each object-name you must provide object-key", err=True) return 1 if not os.path.exists(out): os.makedirs(out) object_defs = dict(zip(object_name, object_key)) if end == -1: with open(filename, "r") as f: end = f.seek(0, os.SEEK_END) - 1 intervals = _get_intervals(start, end, concurrency) _processes = [] for start, end in intervals: process = multiprocessing.Process( target=find_json_objects, args=(len(_processes) + 1, filename, start, end, out, object_defs), ) process.start() _processes.append(process) for process in _processes: process.join() if __name__ == "__main__": start()