Last active
September 7, 2022 18:25
-
-
Save zkhan93/4683e35795a83c7f9d23b76d3f56c8de to your computer and use it in GitHub Desktop.
Splitter for huge JSON files: extracts the objects of large top-level arrays without loading the whole file into memory.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import click | |
| from functools import wraps | |
| import mmap | |
| import time | |
| import multiprocessing | |
| import json | |
| import os | |
# Map brace characters to their ordinal (byte) values: indexing an mmap
# yields ints, so all comparisons below are done on byte values.
c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
# Template for the per-scan nesting-depth counters (copied in parse_object).
track = {
    c2n["{"]: 0,
    c2n["["]: 0,
}
# Depth delta contributed by each brace byte: +1 on open, -1 on close.
values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
}
# Map every brace byte to the counter slot it affects ('}' -> '{', ']' -> '[').
normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
}
# Per-process status/report dict; re-initialized inside find_json_objects
# and echoed by the print_report decorator when the worker finishes.
ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
}
# Byte value of '{' — the marker every object scan starts from.
open_curly_braces = c2n["{"]
# Set-like dict for a fast "is this byte a brace?" membership test.
braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
}
# Per-process globals: cached output file handle, output directory, and the
# {name: required-keys} object definitions (all set in find_json_objects).
file_obj = None
output_dir = None
object_defs = None
class ArrayEndException(Exception):
    """Raised by next_open_braces when a ']' (end of the current JSON array)
    is seen before the next '{' while track_array_end is enabled."""

    pass
def print_report(fn):
    """Decorator: run *fn*, then echo the module-level ``ps`` report dict."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        click.echo(ps)
        return result

    return wrapper
def track_time(fn):
    """Decorator: record *fn*'s wall-clock runtime (seconds) in ``ps["time"]``."""

    @wraps(fn)
    def timed(*args, **kwargs):
        started = time.time()
        result = fn(*args, **kwargs)
        ps["time"] = time.time() - started
        return result

    return timed
def track_mem(fn):
    """Decorator: record *fn*'s net and peak traced memory (MiB) in ``ps``.

    Uses ``tracemalloc``, so it measures Python-level allocations only and
    adds noticeable overhead — it is kept commented out at the call site.
    """
    import tracemalloc

    @wraps(fn)
    def decorated(*args, **kwargs):
        # PEP 8 fix: use a named helper instead of a lambda bound to a name.
        def to_mib(current_peak):
            # tracemalloc reports bytes; convert the (current, peak) pair.
            return (
                current_peak[0] / (1024 * 1024),
                current_peak[1] / (1024 * 1024),
            )

        tracemalloc.start()
        first_size, first_peak = to_mib(tracemalloc.get_traced_memory())
        res = fn(*args, **kwargs)
        second_size, second_peak = to_mib(tracemalloc.get_traced_memory())
        # NOTE(review): key is spelled "mem_peek" (sic) — kept unchanged for
        # backward compatibility with anything consuming the report.
        ps["mem_peek"] = second_peak - first_peak
        ps["mem"] = second_size - first_size
        tracemalloc.stop()
        return res

    return decorated
def get_out_file(suffix):
    """Return the (cached) binary output file for this process and suffix.

    The file is named "<pnum>-<suffix>.json" under ``output_dir``. The open
    handle is cached in the module-level ``file_obj``; when the requested
    name differs from the cached one, the old handle is closed and a new
    file is opened in "wb" mode (truncating any previous content).

    Fix: the original duplicated the ``open(name, mode)`` call in both
    branches; the condition is collapsed into one open site.
    """
    global file_obj
    global output_dir
    name = os.path.join(output_dir, f"{ps['pnum']}-{suffix}.json")
    if file_obj is None or file_obj.name != name:
        if file_obj:
            # Switching output files: release the previous handle first.
            file_obj.close()
        file_obj = open(name, "wb")
    return file_obj
def save(obj, name):
    """Append one parsed object (raw bytes) plus a newline to the output
    file for *name*, updating the per-process report counters."""
    global ps
    ps["objects"] = ps["objects"] + 1
    ps["last_object_len"] = len(obj)
    get_out_file(name).write(obj + b"\n")
def next_open_braces(mm, start, end, track_array_end=False):
    """Scan mm[start..end] (inclusive) for the next '{' byte.

    Returns (index, found): the position of the '{' and True when one was
    found, otherwise (end + 1, False).

    Raises ArrayEndException when *track_array_end* is set and a ']' byte
    is encountered before any '{' — i.e. the current array has ended.
    """
    for pos in range(start, end + 1):
        byte = mm[pos]
        if track_array_end and byte == c2n["]"]:
            raise ArrayEndException()
        if byte == open_curly_braces:
            return pos, True
    return end + 1, False
@print_report
# @track_mem  # expensive; enable only when profiling memory
@track_time
def find_json_objects(process_number, filename, start, end, out, defs):
    """Worker entry point: extract matching JSON array objects from bytes
    start..end of *filename* and write them to files under *out*.

    ``start`` might be in the middle of an object, so the first objective
    is to reach the start of the next array element: skip characters until
    a '{' is found, read to the end of that object, parse it as JSON and
    check whether it has the keys of one of the *defs* object definitions.
    A match means an array element was found — keep parsing subsequent
    elements; otherwise skip to the next '{' and repeat.
    """
    global output_dir
    global object_defs
    global ps
    # Stash per-process configuration in module globals for the helpers.
    output_dir = out
    object_defs = defs
    # Fresh report dict for this worker (echoed by @print_report).
    ps = {
        "pnum": str(process_number).zfill(3),
        "start": start,
        "end": end,
        "objects": 0,
    }
    with open(filename, "r") as f:
        # Memory-map the file so arbitrary byte positions can be indexed
        # without reading everything; the mapping stays valid after the
        # file object is closed.
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    try:
        # Synchronize: find the first object matching a known definition.
        obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end)
        ps["current"] = obj_end
        ps["last_obj_parse_time"] = parse_time
        save(obj, obj_name)
        i, _ = next_open_braces(mm, obj_end, end)
        # Objects within one array share obj_name; keep saving under the
        # current name until the array's closing ']' is reached.
        while i <= end:
            obj, obj_end, parse_time = parse_object(mm, i)
            if obj:
                i = obj_end
                ps["current"] = obj_end
                ps["last_obj_parse_time"] = parse_time
                save(obj, obj_name)
            try:
                i, _ = next_open_braces(mm, obj_end, end, True)
            except ArrayEndException:
                # The current array ended: probe the next array's first
                # object only to detect its name, then let the main loop
                # parse and save it.
                # Fix: the previous version also called save() here, but
                # left `i` pointing at the same '{', so the loop saved the
                # identical object a second time on the next iteration.
                i, next_available = next_open_braces(mm, obj_end, end)
                if next_available:
                    probe, _, _ = parse_object(mm, i)
                    obj_name = detect_object_name(probe)
    finally:
        mm.close()
def detect_object_name(obj):
    """Return the first object-definition name whose required keys are all
    present in the JSON object *obj* (bytes), or None when none match."""
    global object_defs
    present = set(json.loads(obj))
    for name, required in object_defs.items():
        if set(required) <= present:
            return name
    return None
def parse_first_object(mm, start, end):
    """Skip forward from *start* until an object matching a known
    definition is parsed.

    Returns (obj, obj_name, obj_end, parse_time); obj_name is None when no
    matching object was found before *end*.
    """
    obj = None
    obj_name = None
    parse_time = None
    obj_end = start  # safe position to return if the loop never runs
    while obj_end < end and not obj_name:
        brace_pos, _ = next_open_braces(mm, obj_end, end)
        obj, obj_end, parse_time = parse_object(mm, brace_pos)
        if obj:
            obj_name = detect_object_name(obj)
    return obj, obj_name, obj_end, parse_time
def parse_object(mm, pos_start_curly):
    """Extract the raw bytes of one JSON object starting at *pos_start_curly*.

    Scans forward balancing {} / [] pairs until the object closes.
    Returns (object_bytes, end_position, parse_seconds), where
    end_position is the index just past the closing '}'.

    Fix: braces that appear inside JSON string literals (e.g.
    ``{"a": "}"}``) are now ignored — the original counted them, which
    corrupted the detected object boundary. Escaped quotes (``\\"``)
    inside strings are handled too. A single depth counter replaces the
    original's two per-brace-type counters, which were only ever summed.

    Raises Exception when *pos_start_curly* does not point at a '{'.
    """
    OPEN_CURLY, CLOSE_CURLY = 123, 125  # ord("{"), ord("}")
    OPEN_SQUARE, CLOSE_SQUARE = 91, 93  # ord("["), ord("]")
    QUOTE, BACKSLASH = 34, 92  # ord('"'), ord("\\")
    if mm[pos_start_curly] != OPEN_CURLY:
        next_content = mm[pos_start_curly : pos_start_curly + 100]
        raise Exception(
            f"parse_object {pos_start_curly} is not a curly brace but {next_content}"
        )
    start = time.time()
    depth = 1  # the '{' at pos_start_curly is already open
    i = pos_start_curly + 1
    in_string = False
    escaped = False
    while depth != 0:
        c = mm[i]
        if in_string:
            # Inside a string literal: braces do not affect nesting depth.
            if escaped:
                escaped = False
            elif c == BACKSLASH:
                escaped = True
            elif c == QUOTE:
                in_string = False
        elif c == QUOTE:
            in_string = True
        elif c == OPEN_CURLY or c == OPEN_SQUARE:
            depth += 1
        elif c == CLOSE_CURLY or c == CLOSE_SQUARE:
            depth -= 1
        i += 1
    end = time.time()
    return mm[pos_start_curly:i], i, end - start
def _get_intervals(start, end, segments):
    """Split [start, end] into exactly *segments* contiguous (lo, hi) pairs.

    Boundaries are spread proportionally, so exactly *segments* intervals
    are returned even when (end - start) is not divisible by *segments*.
    (The previous step-based version returned extra intervals for uneven
    spans — e.g. start=0, end=10, segments=4 gave 5 intervals — and raised
    "range() arg 3 must not be zero" when segments > end - start.)
    Consecutive intervals share their boundary, and the last one ends
    exactly at *end*. Output is unchanged for evenly divisible spans.
    """
    span = end - start
    boundaries = [start + span * k // segments for k in range(segments + 1)]
    intervals = list(zip(boundaries[:-1], boundaries[1:]))
    click.echo(intervals)
    return intervals
@click.command()
@click.option("-f", "--filename", type=click.Path(exists=True))
@click.option("-s", "--start", type=int, default=1)
@click.option(
    "-e",
    "--end",
    type=int,
    default=-1,
    help="last character position we are interested in, defaults to end of file",
)
@click.option(
    "-c", "--concurrency", type=int, default=1, help="number of processes to use"
)
@click.option(
    "--object-name",
    type=str,
    multiple=True,
)
@click.option(
    "--object-key",
    type=click.UNPROCESSED,
    # Each --object-key is a comma-separated key list; split it into a tuple
    # so it can be zipped with --object-name into {name: required_keys}.
    callback=lambda _, __, keys: tuple([tuple(key.split(",")) for key in keys]),
    multiple=True,
)
@click.option(
    "-o",
    "--out",
    type=click.Path(writable=True, file_okay=False, resolve_path=True),
    default=".",
)
def start(filename, start, end, concurrency, object_name, object_key, out):
    """
    This is code in response to the problem explained below:
    There is a really big JSON file containing one root object with 2 huge arrays;
    the file cannot be loaded into memory, but we need to rearrange the file to be able to load it in Spark.
    Any proper JSON parser will try to parse the whole structure before letting us do anything with the object.
    We tried some parsers, including jq's streaming parser; it worked for small sample files (~300MB) but took far too long for large sample files (~1GB).
    This is a pseudo JSON parser that can extract a JSON object by matching the opening and closing curly braces; it also uses knowledge of the objects (known and always-existing keys in an object)
    to confirm that the parsed object is indeed the expected array object.
    It divides the job across multiple processes; each process parses a fixed section of the file.
    It's likely that the start of a process's section is in the middle of a JSON object, so the process skips through some initial characters until it finds the object that we are looking for (by matching the keys provided as input).
    However, the end position is not a hard limit: it will pass through the end limit if an object parse is in progress; this behavior ensures that characters skipped by one process are captured by another process.
    """
    # Every --object-name must be paired with exactly one --object-key list.
    if len(object_name) != len(object_key):
        click.echo("for each object-name you must provide object-key", err=True)
        return 1
    if not os.path.exists(out):
        os.makedirs(out)
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
        # Default the end position to the index of the file's last byte.
        with open(filename, "r") as f:
            end = f.seek(0, os.SEEK_END) - 1
    intervals = _get_intervals(start, end, concurrency)
    _processes = []
    # Spawn one worker per interval; each writes its own output files,
    # prefixed with its 1-based, zero-padded process number.
    for start, end in intervals:
        process = multiprocessing.Process(
            target=find_json_objects,
            args=(len(_processes) + 1, filename, start, end, out, object_defs),
        )
        process.start()
        _processes.append(process)
    # Wait for all workers to finish before exiting.
    for process in _processes:
        process.join()


if __name__ == "__main__":
    start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment