Last active: September 7, 2022 18:25
Revisions
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 4 additions and 3 deletions.
@@ -254,7 +254,7 @@ def _get_intervals(start, end, segments):
@click.option(
    "-o",
    "--out",
    type=click.Path(writable=True, file_okay=False, resolve_path=True),
    default=".",
)
def start(filename, start, end, concurrency, object_name, object_key, out):

@@ -275,7 +275,8 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    if len(object_name) != len(object_key):
        click.echo("for each object-name you must provide object-key", err=True)
        return 1
    if not os.path.exists(out):
        os.makedirs(out)
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
        with open(filename, "r") as f:

@@ -295,4 +296,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
if __name__ == "__main__":
    start()
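For reference, a quick sketch of what this final version of the CLI does with the repeated --object-name/--object-key options before parsing starts; the option values here are made-up examples, only the splitting and zipping mirror the gist:

# A minimal sketch (not part of the gist) of how the final CLI turns repeated
# --object-name/--object-key options into the object_defs lookup used by the parser.
# The option values below are hypothetical examples.

object_name = ("in_network", "provider_references")            # --object-name values
raw_keys = (
    "negotiation_arrangement,billing_code",                     # --object-key values
    "provider_group_id,provider_groups",
)

# mirrors the click callback: each comma-separated string becomes a tuple of keys
object_key = tuple(tuple(key.split(",")) for key in raw_keys)

# mirrors `object_defs = dict(zip(object_name, object_key))` in start()
object_defs = dict(zip(object_name, object_key))
print(object_defs)
# {'in_network': ('negotiation_arrangement', 'billing_code'),
#  'provider_references': ('provider_group_id', 'provider_groups')}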
zkhan93 revised this gist
Sep 7, 2022. No changes.
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 3 additions and 4 deletions.
@@ -164,15 +164,13 @@ def find_json_objects(process_number, filename, start, end, out, defs):
        try:
            i, _ = next_open_braces(mm, obj_end, end, True)
        except ArrayEndException:
            i, next_available = next_open_braces(mm, obj_end, end)
            if next_available:
                obj, obj_end, parse_time = parse_object(mm, i)
                obj_name = detect_object_name(obj)
                ps["current"] = obj_end
                ps["last_obj_parse_time"] = parse_time
                save(obj, obj_name)


def detect_object_name(obj):

@@ -296,4 +294,5 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
if __name__ == "__main__":
    start()
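The behaviour this revision adjusts, in isolation: while scanning for the next '{' with array tracking enabled, hitting ']' raises ArrayEndException, and the handler rescans without tracking to pick up the next array, if any. A runnable sketch (next_open_braces and the constants are copied from the gist, the byte buffer is invented):

c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
open_curly_braces = c2n["{"]


class ArrayEndException(Exception):
    pass


def next_open_braces(mm, start, end, track_array_end=False):
    # copied from the gist: scan forward for '{', optionally bailing out at ']'
    x = start
    found = False
    while x <= end:
        if track_array_end and mm[x] == c2n["]"]:
            raise ArrayEndException()
        if mm[x] != open_curly_braces:
            x += 1
        else:
            found = True
            break
    return x, found


data = b'], "second_array": [{"a": 1}]'
try:
    next_open_braces(data, 0, len(data) - 1, True)
except ArrayEndException:
    # first array ended; rescan without array tracking to find the next object
    i, next_available = next_open_braces(data, 0, len(data) - 1)
    print(i, next_available)  # 20 True -> position of '{' in the second array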
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 17 additions and 11 deletions.
@@ -23,8 +23,6 @@
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
}
ps = {
    "pnum": -1,
    "start": -1,

@@ -38,6 +36,9 @@
    c2n["["]: True,
    c2n["]"]: True,
}
file_obj = None
output_dir = None
object_defs = None


class ArrayEndException(Exception):

@@ -124,7 +125,7 @@ def next_open_braces(mm, start, end, track_array_end=False):
@print_report
# @track_mem
@track_time
def find_json_objects(process_number, filename, start, end, out, defs):
    """
    # start might be in middle of a object, so the
    # objective is to reach to the start of next array element.

@@ -133,20 +134,22 @@ def find_json_objects(process_number, filename, start, end, object_defs, out):
    # true means you have parsed an array element now continue parsing next array objects.
    # otherwise skip to next '{' character and repeat the process, until you find the element in array
    """
    global output_dir
    global object_defs
    global ps
    output_dir = out
    object_defs = defs
    ps = {
        "pnum": str(process_number).zfill(3),
        "start": start,
        "end": end,
        "objects": 0,
    }
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)

@@ -165,14 +168,15 @@ def find_json_objects(process_number, filename, start, end, object_defs, out):
            i, found = next_open_braces(mm, obj_end, end)
            if found:
                obj, obj_end, parse_time = parse_object(mm, i)
                obj_name = detect_object_name(obj)
                ps["current"] = obj_end
                ps["last_obj_parse_time"] = parse_time
                save(obj, obj_name)
            return


def detect_object_name(obj):
    global object_defs
    actual_keys = set(json.loads(obj).keys())
    names = [
        name for name, keyset in object_defs.items() if not (set(keyset) - actual_keys)

@@ -181,7 +185,7 @@ def detect_object_name(obj, object_defs):
        return names[0]


def parse_first_object(mm, start, end):
    obj_name = None
    obj = None
    obj_end = start  # if loop is not executed, return safe position

@@ -190,7 +194,7 @@ def parse_first_object(mm, start, end, object_defs):
        i, _ = next_open_braces(mm, obj_end, end)
        obj, obj_end, parse_time = parse_object(mm, i)
        if obj:
            obj_name = detect_object_name(obj)
    return obj, obj_name, obj_end, parse_time

@@ -269,9 +273,11 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    its likely that the start of a process can be in middle of a json object, so the process will skip though some initial characters until it finds the object that we are looking for (my matching keys provided as input)
    however, the end position is not a hard limit, it will pass though the end limit if an object parsing is in progress, this behavior ensures that characters skipped by one process are captured by another process.
    """
    if len(object_name) != len(object_key):
        click.echo("for each object-name you must provide object-key", err=True)
        return 1
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
        with open(filename, "r") as f:

@@ -281,7 +287,7 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    for start, end in intervals:
        process = multiprocessing.Process(
            target=find_json_objects,
            args=(len(_processes) + 1, filename, start, end, out, object_defs),
        )
        process.start()
        _processes.append(process)
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 4 additions and 2 deletions.
@@ -1,4 +1,6 @@
import click
from functools import wraps
import mmap
import time
import multiprocessing
import json

@@ -288,4 +290,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
if __name__ == "__main__":
    start()
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 35 additions and 43 deletions.
@@ -1,11 +1,42 @@
map
import time
import multiprocessing
import json
import os

c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
track = {
    c2n["{"]: 0,
    c2n["["]: 0,
}
values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
}
normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
}
file_obj = None
output_dir = None
ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
}
open_curly_braces = c2n["{"]
braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
}


class ArrayEndException(Exception):
    pass

@@ -51,34 +82,6 @@ def decorated(*args, **kwargs):
        return decorated


def get_out_file(suffix):
    global file_obj
    global output_dir

@@ -102,9 +105,6 @@ def save(obj, name):
    out.write(obj + b"\n")


def next_open_braces(mm, start, end, track_array_end=False):
    x = start
    found = False

@@ -192,14 +192,6 @@ def parse_first_object(mm, start, end, object_defs):
    return obj, obj_name, obj_end, parse_time


def parse_object(mm, pos_start_curly):
    if mm[pos_start_curly] != open_curly_braces:
        next_content = mm[pos_start_curly : pos_start_curly + 100]

@@ -296,4 +288,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
if __name__ == "__main__":
    start()
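One thing worth noting about the constants this revision hoists to module level: c2n maps each brace character to its byte value because indexing an mmap (or bytes) object in Python 3 yields an int, so all comparisons and dict lookups are done on ordinals. A tiny demonstration with an invented buffer:

data = b'{"a": 1}'
print(data[0])              # 123, i.e. ord("{")
print(data[0] == ord("{"))  # True
print(chr(data[0]))         # "{"  (the very first revision converted with chr() instead)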
zkhan93 revised this gist
Sep 7, 2022. 1 changed file with 139 additions and 102 deletions.
@@ -1,30 +1,21 @@
import click
from functools import wraps
import mmap
import time
import multiprocessing
import json
import os


class ArrayEndException(Exception):
    pass


def print_report(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
        res = fn(*args, **kwargs)
        click.echo(ps)
        return res

    return decorated

@@ -54,7 +45,6 @@ def decorated(*args, **kwargs):
        second_size, second_peak = b2m(tracemalloc.get_traced_memory())
        ps["mem_peek"] = second_peak - first_peak
        ps["mem"] = second_size - first_size
        tracemalloc.stop()
        return res

@@ -80,6 +70,7 @@ def decorated(*args, **kwargs):
    c2n["]"]: c2n["["],
}
file_obj = None
output_dir = None
ps = {
    "pnum": -1,
    "start": -1,

@@ -88,89 +79,119 @@ def decorated(*args, **kwargs):
}


def get_out_file(suffix):
    global file_obj
    global output_dir
    name = os.path.join(output_dir, f"{ps['pnum']}-{suffix}.json")
    mode = "wb"
    if file_obj:
        if file_obj.name != name:
            file_obj.close()
            file_obj = open(name, mode)
    else:
        file_obj = open(name, mode)
    return file_obj


def save(obj, name):
    global ps
    ps["objects"] += 1
    ps["last_object_len"] = len(obj)
    # print(ps)
    out = get_out_file(name)
    out.write(obj + b"\n")


open_curly_braces = c2n["{"]


def next_open_braces(mm, start, end, track_array_end=False):
    x = start
    found = False
    while x <= end:
        if track_array_end and mm[x] == c2n["]"]:
            raise ArrayEndException()
        if mm[x] != open_curly_braces:
            x += 1
        else:
            found = True
            break
    return x, found


@print_report
# @track_mem
@track_time
def find_json_objects(process_number, filename, start, end, object_defs, out):
    """
    # start might be in middle of a object, so the
    # objective is to reach to the start of next array element.
    # Idea is to skip characters till you find a start of object i.e '{',
    # read till the end of that object, parse it to json and check if it has keys from `object_defs` in it.
    # true means you have parsed an array element now continue parsing next array objects.
    # otherwise skip to next '{' character and repeat the process, until you find the element in array
    """
    global ps
    global output_dir
    ps = {
        "pnum": str(process_number).zfill(3),
        "start": start,
        "end": end,
        "objects": 0,
    }
    output_dir = out
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end, object_defs)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    i, _ = next_open_braces(mm, obj_end, end)
    while i <= end:
        obj, obj_end, parse_time = parse_object(mm, i)
        if obj:
            i = obj_end
            ps["current"] = obj_end
            ps["last_obj_parse_time"] = parse_time
            save(obj, obj_name)
        try:
            i, _ = next_open_braces(mm, obj_end, end, True)
        except ArrayEndException:
            print(ps["pnum"], "array ended for", obj_end, end)
            i, found = next_open_braces(mm, obj_end, end)
            if found:
                obj, obj_end, parse_time = parse_object(mm, i)
                obj_name = detect_object_name(obj, object_defs)
                ps["current"] = obj_end
                ps["last_obj_parse_time"] = parse_time
                save(obj, obj_name)
            return


def detect_object_name(obj, object_defs):
    actual_keys = set(json.loads(obj).keys())
    names = [
        name for name, keyset in object_defs.items() if not (set(keyset) - actual_keys)
    ]
    if names:
        return names[0]


def parse_first_object(mm, start, end, object_defs):
    obj_name = None
    obj = None
    obj_end = start  # if loop is not executed, return safe position
    parse_time = None
    while not obj_name and obj_end < end:
        i, _ = next_open_braces(mm, obj_end, end)
        obj, obj_end, parse_time = parse_object(mm, i)
        if obj:
            obj_name = detect_object_name(obj, object_defs)
    return obj, obj_name, obj_end, parse_time


braces = {
    c2n["{"]: True,
    c2n["}"]: True,

@@ -206,57 +227,73 @@ def _get_intervals(start, end, segments):
    intervals = list(range(start, end + 1, int((end - start) / segments)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    click.echo(intervals)
    return intervals


@click.command()
@click.option("-f", "--filename", type=click.Path(exists=True))
@click.option("-s", "--start", type=int, default=1)
@click.option(
    "-e",
    "--end",
    type=int,
    default=-1,
    help="last character position we are interested in, defaults to end of file",
)
@click.option(
    "-c", "--concurrency", type=int, default=1, help="number of processes to use"
)
@click.option(
    "--object-name",
    type=str,
    multiple=True,
)
@click.option(
    "--object-key",
    type=click.UNPROCESSED,
    callback=lambda _, __, keys: tuple([tuple(key.split(",")) for key in keys]),
    multiple=True,
)
@click.option(
    "-o",
    "--out",
    type=click.Path(exists=True, writable=True, file_okay=False, resolve_path=True),
    default=".",
)
def start(filename, start, end, concurrency, object_name, object_key, out):
    """
    This is a code in response to a problem explained below:
    There is a really big json file containing one root object with 2 huge arrays, the file cannot be loaded to memory but we need to rearrange the file to be able to load the file in spark.
    Any proper json parser will try to parse all the structure before letting us do anything with the object.
    we tried some parsers including jq's streaming parser, it worked for small sample files(~300MB) but was taking way longer for large sample files (~1GB).

    This is a pseudo json parser that can extract a json object by matching the opening and closing curly braces,
    it also uses knowledge of objects(known and always existing keys in object) to confirm that the parsed object is indeed the expected array object.
    it divides the jobs to multiple processes, each process parses a fixed section of the file,
    its likely that the start of a process can be in middle of a json object, so the process will skip though some initial characters until it finds the object that we are looking for (my matching keys provided as input)
    however, the end position is not a hard limit, it will pass though the end limit if an object parsing is in progress, this behavior ensures that characters skipped by one process are captured by another process.
    """
    if len(object_name) != len(object_key):
        click.echo("for each object-name you must provide object-key", err=True)
        return 1
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
        with open(filename, "r") as f:
            end = f.seek(0, os.SEEK_END) - 1
    intervals = _get_intervals(start, end, concurrency)
    _processes = []
    for start, end in intervals:
        process = multiprocessing.Process(
            target=find_json_objects,
            args=(len(_processes) + 1, filename, start, end, object_defs, out),
        )
        process.start()
        _processes.append(process)
    for process in _processes:
        process.join()


if __name__ == "__main__":
    start()
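The core trick the docstring above describes, reduced to a self-contained sketch: start at a '{' and keep a running open/close count until it returns to zero. The buffer and keys below are invented; the gist does the same scan over an mmap'd file and also tracks square brackets, and, like the gist, this does not account for braces inside string values:

def extract_object(buf, start):
    # assumes buf[start] is the byte value of "{"
    depth = 0
    i = start
    while True:
        c = buf[i]
        if c == ord("{"):
            depth += 1
        elif c == ord("}"):
            depth -= 1
        i += 1
        if depth == 0:
            break
    return buf[start:i]

data = b'garbage {"name": "x", "negotiated_rates": [{"rate": 1}]} trailing'
start = data.index(b"{")
print(extract_object(data, start))
# b'{"name": "x", "negotiated_rates": [{"rate": 1}]}'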
zkhan93 revised this gist
Aug 15, 2022. No changes.
zkhan93 revised this gist
Aug 14, 2022. 1 changed file with 22 additions and 16 deletions.
@@ -122,13 +122,13 @@ def find_json_objects(
    filename,
    start,
    end,
    keysets_to_test,
):
    """
    # start might be in middle of a object, so the
    # objective is to reach to the start of next array element.
    # Idea is to skip characters till you find a start of object i.e '{',
    # read till the end of that object, parse it to json and check if it has keys from `keysets_to_test` in it.
    # true means you have parsed an array element now continue parsing next array objects.
    # otherwise skip to next '{' character and repeat the process, until you find the element in array
    """

@@ -150,7 +150,10 @@ def find_json_objects(
                save(obj)
            return  # no object in defined start, end limit
        actual_keys = set(json.loads(obj).keys())
        object_matched = any(
            not (set(keyset) - actual_keys) for keyset in keysets_to_test
        )
        if not object_matched:
            pass
            # print("process: ", process_number, "not found", i, end, obj[:100])
        else:

@@ -169,9 +172,6 @@ def find_json_objects(
braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,

@@ -195,7 +195,8 @@ def parse_object(mm, pos_start_curly):
    while sum(tc.values()) != 0:
        c = mm[i]
        if c in braces:
            val = values[c]
            tc[normalize[c]] += val
        i += 1
    end = time.time()
    return mm[pos_start_curly:i], i, end - start

@@ -209,14 +210,14 @@ def _get_intervals(start, end, segments):
    return intervals


def start(filename, begin, end, processes, keysets_to_test):
    intervals = _get_intervals(begin, end, processes)
    _procs = []
    for begin, end in intervals:
        proc = multiprocessing.Process(
            target=find_json_objects,
            args=(len(_procs) + 1, filename, begin, end, keysets_to_test),
        )
        proc.start()
        _procs.append(proc)

@@ -229,9 +230,12 @@ def start(filename, begin, end, processes, keys_to_test):
    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    # ["name", "billing_code", "negotiated_rates"]
    keysets = [
        [
            "negotiation_arrangement",
            "billing_code",
        ],
        ["provider_group_id", "provider_groups"],
    ]
    # 250 MB

@@ -247,10 +251,12 @@ def start(filename, begin, end, processes, keys_to_test):
    # filename = "2022-08_890_58D0_in-network-rates_10_of_16.json"
    # 11 GB
    begin = 3
    end = 12064520309
    processes = 10
    filename = "2022-08_040_05C0_in-network-rates_24_of_25.json"
    # 1:19.65 total - in_network only (skipped 78% data here to reach the in_network) - 5 processes
    # 6:29.97 total - both the arrays 5 processes
    # 3:59.33 total - both arrays 10 processes
    start(filename, begin, end, processes, keysets)
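The matching rule this revision introduces, in isolation: an object is accepted if every key of at least one keyset is present in the parsed object. The sample object below is made up; the keysets are the ones from the gist's __main__ block:

import json

keysets_to_test = [
    ["negotiation_arrangement", "billing_code"],
    ["provider_group_id", "provider_groups"],
]

obj = b'{"provider_group_id": 7, "provider_groups": [], "extra": true}'
actual_keys = set(json.loads(obj).keys())
object_matched = any(not (set(keyset) - actual_keys) for keyset in keysets_to_test)
print(object_matched)  # True -> matches the provider_references keyset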
zkhan93 revised this gist
Aug 14, 2022. 1 changed file with 162 additions and 86 deletions.
@@ -1,5 +1,5 @@
"""
This is a code in response to a problem explained below:
There is a really big json file containing one root object with 2 huge arrays, the file cannot be loaded to memory but we need to rearrange the file to be able to load the file in spark.
Any proper json parser will try to parse all the structure before letting us do anything with the object.

@@ -12,55 +12,99 @@
however, the end position is not a hard limit, it will pass though the end limit if an object parsing is in progress, this behavior ensures that characters skipped by one process are captured by another process.
"""
from functools import wraps
import mmap
import time
import multiprocessing
import json


def print_report(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
        res = fn(*args, **kwargs)
        print(ps)
        return res

    return decorated


def track_time(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
        start_time = time.time()
        res = fn(*args, **kwargs)
        end_time = time.time()
        ps["time"] = end_time - start_time
        return res

    return decorated


def track_mem(fn):
    import tracemalloc

    @wraps(fn)
    def decorated(*args, **kwargs):
        b2m = lambda cp: (cp[0] / (1024 * 1024), cp[1] / (1024 * 1024))
        tracemalloc.start()
        first_size, first_peak = b2m(tracemalloc.get_traced_memory())
        res = fn(*args, **kwargs)
        second_size, second_peak = b2m(tracemalloc.get_traced_memory())
        ps["mem_peek"] = second_peak - first_peak
        ps["mem"] = second_size - first_size
        # stopping the library
        tracemalloc.stop()
        return res

    return decorated


c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
track = {
    c2n["{"]: 0,
    c2n["["]: 0,
}
values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
}
normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
}
file_obj = None
ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
}


def get_out_file():
    global file_obj
    if not file_obj:
        file_obj = open(f"out-{ps['pnum']}.json", "wb")
    return file_obj


def save(obj):
    global ps
    ps["objects"] += 1
    ps["last_object_len"] = len(obj)
    # print(ps)
    out = get_out_file()
    out.write(obj + b"\n")


open_curly_braces = c2n["{"]


def next_open_braces(mm, start, end):

@@ -70,111 +114,143 @@ def next_open_braces(mm, start, end):
    return x


@print_report
# @track_mem
@track_time
def find_json_objects(
    process_number,
    filename,
    start,
    end,
    keys_to_test,
):
    """
    # start might be in middle of a object, so the
    # objective is to reach to the start of next array element.
    # Idea is to skip characters till you find a start of object i.e '{',
    # read till the end of that object, parse it to json and check if it has keys from `keys_to_test` in it.
    # true means you have parsed an array element now continue parsing next array objects.
    # otherwise skip to next '{' character and repeat the process, until you find the element in array
    """
    global ps
    ps = {
        "pnum": process_number,
        "start": start,
        "end": end,
        "objects": 0,
    }
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    i = next_open_braces(mm, start, end)
    found_first = False
    while not found_first:
        obj, obj_end, parse_time = parse_object(mm, i)
        if obj_end > end:
            if obj:
                save(obj)
            return  # no object in defined start, end limit
        actual_keys = set(json.loads(obj).keys())
        if set(keys_to_test) - actual_keys:
            pass
            # print("process: ", process_number, "not found", i, end, obj[:100])
        else:
            save(obj)
            found_first = True
        i = next_open_braces(mm, obj_end, end)
    while i <= end:
        obj, obj_end, parse_time = parse_object(mm, i)
        if obj:
            i = obj_end
            ps["current"] = obj_end
            ps["last_obj_parse_time"] = parse_time
            save(obj)
        i = next_open_braces(mm, obj_end, end)
    return


braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
}


def parse_object(mm, pos_start_curly):
    if mm[pos_start_curly] != open_curly_braces:
        next_content = mm[pos_start_curly : pos_start_curly + 100]
        raise Exception(
            f"parse_object {pos_start_curly} is not a curly brace but {next_content}"
        )
    # print("parsing from", mm[pos_start_curly : pos_start_curly + 100])
    start = time.time()
    tc = track.copy()
    i = pos_start_curly
    c = mm[i]  # c is {
    tc[c] = 1
    i = i + 1
    while sum(tc.values()) != 0:
        c = mm[i]
        if c in braces:
            tc[normalize[c]] += values[c]
        i += 1
    end = time.time()
    return mm[pos_start_curly:i], i, end - start


def _get_intervals(start, end, segments):
    intervals = list(range(start, end + 1, int((end - start) / segments)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    print(intervals)
    return intervals


def start(filename, begin, end, processes, keys_to_test):
    intervals = _get_intervals(begin, end, processes)
    _procs = []
    for begin, end in intervals:
        proc = multiprocessing.Process(
            target=find_json_objects,
            args=(len(_procs) + 1, filename, begin, end, keys_to_test),
        )
        proc.start()
        _procs.append(proc)
    for proc in _procs:
        proc.join()


if __name__ == "__main__":
    print("cores", multiprocessing.cpu_count())
    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    # ["name", "billing_code", "negotiated_rates"]
    keys_to_test = [
        "negotiation_arrangement",
        "billing_code",
    ]
    # 250 MB
    # begin = 191
    # end = 258326285
    # processes = 3
    # filename = "2022-08_290_37D0_in-network-rates.json"
    # 2GB 63 seconds
    # begin = 156
    # end = 1132108879
    # processes = 5
    # filename = "2022-08_890_58D0_in-network-rates_10_of_16.json"
    # 11 GB
    begin = 9513206232
    end = 12064520309
    processes = 5
    filename = "2022-08_040_05C0_in-network-rates_24_of_25.json"
    # 379.81s user 2.62s system 480% cpu 1:19.65 total
    start(filename, begin, end, processes, keys_to_test)
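The ps stats dict in this version is filled in by small decorators; below is a stripped-down, standalone sketch of the track_time/track_mem pattern (tracemalloc.get_traced_memory() returns a (current, peak) byte tuple). Unlike the gist, which takes before/after snapshots, this sketch just reports totals after the call:

import time
import tracemalloc
from functools import wraps

ps = {}

def track_time(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
        start_time = time.time()
        res = fn(*args, **kwargs)
        ps["time"] = time.time() - start_time
        return res
    return decorated

def track_mem(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
        tracemalloc.start()
        res = fn(*args, **kwargs)
        size, peak = tracemalloc.get_traced_memory()
        # convert bytes to MB, as the gist's b2m helper does
        ps["mem"], ps["mem_peek"] = size / (1024 * 1024), peak / (1024 * 1024)
        tracemalloc.stop()
        return res
    return decorated

@track_mem
@track_time
def work():
    return [b"x" * 1024 for _ in range(1000)]

work()
print(ps)  # e.g. {'time': ..., 'mem': ..., 'mem_peek': ...}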
zkhan93 revised this gist
Aug 12, 2022. 1 changed file with 1 addition and 1 deletion.
@@ -1,5 +1,5 @@
"""
This is in response to a problem explained below:
There is a really big json file containing one root object with 2 huge arrays, the file cannot be loaded to memory but we need to rearrange the file to be able to load the file in spark.
Any proper json parser will try to parse all the structure before letting us do anything with the object.
zkhan93 revised this gist
Aug 12, 2022. 1 changed file with 14 additions and 0 deletions.
@@ -1,3 +1,17 @@
"""
This is a code in response to a problem explained below:
There is a really big json file containing one root object with 2 huge arrays, the file cannot be loaded to memory but we need to rearrange the file to be able to load the file in spark.
Any proper json parser will try to parse all the structure before letting us do anything with the object.
we tried some parsers including jq's streaming parser, it worked for small sample files(~300MB) but was taking way longer for large sample files (~1GB).

This is a pseudo json parser that can extract a json object by matching the opening and closing curly braces,
it also uses knowledge of objects(known and always existing keys in object) to confirm that the parsed object is top level array object.
it divides the jobs to multiple processes, each process parses a fixed section of the file,
its likely that the start of a process can be in middle of a json object, so the process will skip though some initial characters until it finds the object that we are looking for (my matching keys provided as input)
however, the end position is not a hard limit, it will pass though the end limit if an object parsing is in progress, this behavior ensures that characters skipped by one process are captured by another process.
"""
import mmap
import time
import multiprocessing
zkhan93 created this gist
Aug 12, 2022.
@@ -0,0 +1,166 @@
import mmap
import time
import multiprocessing
import json
import logging

logging.basicConfig(level=logging.INFO)

print("cores", multiprocessing.cpu_count())

track = {
    "{": 0,
    "[": 0,
}
values = {
    "{": 1,
    "[": 1,
    "}": -1,
    "]": -1,
}
normalize = {
    "{": "{",
    "[": "[",
    "}": "{",
    "]": "[",
}
objs = []
pnum = 0
file_obj = None


def get_out_file():
    global file_obj
    if not file_obj:
        file_obj = open(f"out-{pnum}.json", "w")
    return file_obj


def save(obj):
    # print(".", end="")
    # objs.append(obj)
    out = get_out_file()
    out.write(obj + "\n")


# ["billing_code_type", "negotiation_arrangement"],
# ["negotiation_arrangement","billing_code",]
open_curly_braces = ord("{")


def next_open_braces(mm, start, end):
    x = start
    while x <= end and mm[x] != open_curly_braces:
        x += 1
    return x


def find_json_objects(
    process_number,
    filename,
    start,
    end,
    keys_to_test=["name", "billing_code", "negotiated_rates"],
):
    global pnum
    pnum = process_number
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0)
    print("object parsing start for", start, end)
    # start might be in middle of a object, so the
    # objective is to reach to the start of next array element.
    # Idea is to skip characters till you find a start of object i.e '{',
    # read till the end of that object, parse it to json and check if it has keys from `keys_to_test` in it.
    # true means you have parsed an array element now continue parsing next array objects.
    # otherwise skip to next '{' character and repeat the process, until you find the element in array
    i = start  # look for {
    found_first = False
    while not found_first:
        i = next_open_braces(mm, i, end)
        obj, obj_end = parse_object(mm, i)
        if obj_end > end:
            if obj:
                save(obj)
            return  # no object in defined start, end limit
        actual_keys = set(json.loads(obj).keys())
        if set(keys_to_test) - actual_keys:
            i = next_open_braces(mm, obj_end, end)
            # print("process: ", process_number, "not found", i, end, obj[:100])
        else:
            save(obj)
            found_first = True
    while i <= end:
        i = next_open_braces(mm, obj_end, end)
        obj, obj_end = parse_object(mm, i)
        if obj:
            save(obj)
    return


def parse_object(mm, pos_start_curley):
    if mm[pos_start_curley] != open_curly_braces:
        # print("parsing from", mm[pos_start_curley : pos_start_curley + 100])
        return "", pos_start_curley
    tc = track.copy()
    i = pos_start_curley
    c = chr(mm[i])  # c is {
    tc[c] = 1
    obj_content = [c]
    max_i = len(mm)
    i = i + 1
    while i < max_i and sum(tc.values()) != 0:
        c = chr(mm[i])
        if c in ["{", "}", "[", "]"]:
            tc[normalize[c]] += values[c]
        obj_content.append(c)
        i += 1
    return "".join(obj_content), i


def do(filename):
    start_time = time.time()
    procs = []
    # for sample file1
    # 191 start of first array object
    # 71678056,  # start of in_network array
    # 258326285  # end of file
    # for sample file2 1.13 GB
    # start = 156  # start of in_network array
    # start =  # start of provider_references array
    # end of file 1132108880
    start = 156
    end = 1132108879
    processes = 5
    intervals = list(range(start, end, int((end - start) / processes)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    print(intervals)
    for start, end in intervals:
        proc = multiprocessing.Process(
            target=find_json_objects,
            args=(
                len(procs) + 1,
                filename,
                start,
                end,
            ),
        )
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
    end = time.time()
    print(end - start_time)
    return


if __name__ == "__main__":
    do("2022-08_890_58D0_in-network-rates_10_of_16.json")