@zkhan93
Last active September 7, 2022 18:25

Revisions

  1. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 4 additions and 3 deletions.
    7 changes: 4 additions & 3 deletions json_array_splitter.py
    @@ -254,7 +254,7 @@ def _get_intervals(start, end, segments):
    @click.option(
    "-o",
    "--out",
    type=click.Path(exists=True, writable=True, file_okay=False, resolve_path=True),
    type=click.Path(writable=True, file_okay=False, resolve_path=True),
    default=".",
    )
    def start(filename, start, end, concurrency, object_name, object_key, out):
    @@ -275,7 +275,8 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    if len(object_name) != len(object_key):
    click.echo("for each object-name you must provide object-key", err=True)
    return 1

    if not os.path.exists(out):
    os.makedirs(out)
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
    with open(filename, "r") as f:
    @@ -295,4 +296,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):

    if __name__ == "__main__":

    start()
    start()
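    A note on this revision: dropping exists=True from click.Path lets the user point -o/--out at a directory that does not exist yet, and the new os.makedirs(out) call creates it before any worker starts writing. A minimal, self-contained sketch of that pattern (the option names mirror the gist; the command body is hypothetical):

        import os

        import click

        @click.command()
        @click.option(
            "-o",
            "--out",
            type=click.Path(writable=True, file_okay=False, resolve_path=True),
            default=".",
        )
        def main(out):
            # Create the output directory on demand instead of requiring it upfront.
            if not os.path.exists(out):
                os.makedirs(out)
            click.echo(f"writing results under {out}")

        if __name__ == "__main__":
            main()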
  2. zkhan93 revised this gist Sep 7, 2022. No changes.
  3. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 3 additions and 4 deletions.
    7 changes: 3 additions & 4 deletions json_array_splitter.py
    @@ -164,15 +164,13 @@ def find_json_objects(process_number, filename, start, end, out, defs):
    try:
    i, _ = next_open_braces(mm, obj_end, end, True)
    except ArrayEndException:
    print(ps["pnum"], "array ended for", obj_end, end)
    i, found = next_open_braces(mm, obj_end, end)
    if found:
    i, next_available = next_open_braces(mm, obj_end, end)
    if next_available:
    obj, obj_end, parse_time = parse_object(mm, i)
    obj_name = detect_object_name(obj)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    return


    def detect_object_name(obj):
    @@ -296,4 +294,5 @@ def start(filename, start, end, concurrency, object_name, object_key, out):


    if __name__ == "__main__":

    start()
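    The rename here (found to next_available) and the dropped debug print both serve one control flow: next_open_braces raises ArrayEndException when it meets ']' before the next '{', so the caller can tell "the array ended" apart from "no brace left in this range". A self-contained sketch of that scanner, with a made-up buffer and positions:

        class ArrayEndException(Exception):
            pass

        OPEN_CURLY, CLOSE_BRACKET = ord("{"), ord("]")

        def next_open_braces(buf, start, end, track_array_end=False):
            # Scan byte positions start..end for the next '{'.
            x = start
            while x <= end:
                if track_array_end and buf[x] == CLOSE_BRACKET:
                    raise ArrayEndException()  # hit the array's closing ']'
                if buf[x] == OPEN_CURLY:
                    return x, True
                x += 1
            return x, False

        data = b'{"a": 1}, {"b": 2}]'
        try:
            i, found = next_open_braces(data, 8, len(data) - 1, True)
            print("next object starts at", i, found)  # -> 10 True
        except ArrayEndException:
            print("reached the end of the array")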
  4. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 17 additions and 11 deletions.
    28 changes: 17 additions & 11 deletions json_array_splitter.py
    @@ -23,8 +23,6 @@
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
    }
    file_obj = None
    output_dir = None
    ps = {
    "pnum": -1,
    "start": -1,
    @@ -38,6 +36,9 @@
    c2n["["]: True,
    c2n["]"]: True,
    }
    file_obj = None
    output_dir = None
    object_defs = None


    class ArrayEndException(Exception):
    @@ -124,7 +125,7 @@ def next_open_braces(mm, start, end, track_array_end=False):
    @print_report
    # @track_mem
    @track_time
    def find_json_objects(process_number, filename, start, end, object_defs, out):
    def find_json_objects(process_number, filename, start, end, out, defs):
    """
    # start might be in the middle of an object, so the
    # objective is to reach the start of the next array element.
    @@ -133,20 +134,22 @@ def find_json_objects(process_number, filename, start, end, object_defs, out):
    # If the check passes, you have parsed an array element; continue parsing the next array objects.
    # Otherwise skip to the next '{' character and repeat until you find an element of the array.
    """
    global ps
    global output_dir
    global object_defs
    global ps
    output_dir = out
    object_defs = defs
    ps = {
    "pnum": str(process_number).zfill(3),
    "start": start,
    "end": end,
    "objects": 0,
    }

    output_dir = out
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end, object_defs)
    obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    @@ -165,14 +168,15 @@ def find_json_objects(process_number, filename, start, end, object_defs, out):
    i, found = next_open_braces(mm, obj_end, end)
    if found:
    obj, obj_end, parse_time = parse_object(mm, i)
    obj_name = detect_object_name(obj, object_defs)
    obj_name = detect_object_name(obj)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    return


    def detect_object_name(obj, object_defs):
    def detect_object_name(obj):
    global object_defs
    actual_keys = set(json.loads(obj).keys())
    names = [
    name for name, keyset in object_defs.items() if not (set(keyset) - actual_keys)
    @@ -181,7 +185,7 @@ def detect_object_name(obj, object_defs):
    return names[0]


    def parse_first_object(mm, start, end, object_defs):
    def parse_first_object(mm, start, end):
    obj_name = None
    obj = None
    obj_end = start # if loop is not executed, return safe position
    @@ -190,7 +194,7 @@ def parse_first_object(mm, start, end, object_defs):
    i, _ = next_open_braces(mm, obj_end, end)
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj:
    obj_name = detect_object_name(obj, object_defs)
    obj_name = detect_object_name(obj)
    return obj, obj_name, obj_end, parse_time


    @@ -269,9 +273,11 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    it is likely that a section starts in the middle of a JSON object, so the process skips some initial characters until it finds the object we are looking for (by matching the keys provided as input).
    The end position, however, is not a hard limit: a process reads past it if an object parse is in progress, which ensures that characters skipped by one process are captured by another.
    """

    if len(object_name) != len(object_key):
    click.echo("for each object-name you must provide object-key", err=True)
    return 1

    object_defs = dict(zip(object_name, object_key))
    if end == -1:
    with open(filename, "r") as f:
    @@ -281,7 +287,7 @@ def start(filename, start, end, concurrency, object_name, object_key, out):
    for start, end in intervals:
    process = multiprocessing.Process(
    target=find_json_objects,
    args=(len(_processes) + 1, filename, start, end, object_defs, out),
    args=(len(_processes) + 1, filename, start, end, out, object_defs),
    )
    process.start()
    _processes.append(process)
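    The reshuffled globals follow a common multiprocessing pattern: per-run state (the output directory and the object definitions) travels to each worker as Process args, and the worker binds it to its own module-level globals, since worker processes do not share the parent's memory. A minimal sketch of the pattern (names echo the gist; the worker body is illustrative):

        import multiprocessing

        output_dir = None   # process-local; each worker sets its own copy
        object_defs = None

        def worker(pnum, out, defs):
            global output_dir, object_defs
            output_dir = out
            object_defs = defs
            print(pnum, "using", output_dir, sorted(object_defs))

        if __name__ == "__main__":
            defs = {"in_network": ("negotiation_arrangement", "billing_code")}
            procs = [
                multiprocessing.Process(target=worker, args=(n, "./out", defs))
                for n in range(2)
            ]
            for p in procs:
                p.start()
            for p in procs:
                p.join()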
  5. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 4 additions and 2 deletions.
    6 changes: 4 additions & 2 deletions json_array_splitter.py
    @@ -1,4 +1,6 @@
    map
    import click
    from functools import wraps
    import mmap
    import time
    import multiprocessing
    import json
    @@ -288,4 +290,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):


    if __name__ == "__main__":
    start()
    start()
  6. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 35 additions and 43 deletions.
    78 changes: 35 additions & 43 deletions json_array_splitter.py
    @@ -1,11 +1,42 @@
    import click
    from functools import wraps
    import mmap
    map
    import time
    import multiprocessing
    import json
    import os

    c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
    track = {
    c2n["{"]: 0,
    c2n["["]: 0,
    }
    values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
    }
    normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
    }
    file_obj = None
    output_dir = None
    ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
    }
    open_curly_braces = c2n["{"]
    braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
    }


    class ArrayEndException(Exception):
    pass
    @@ -51,34 +82,6 @@ def decorated(*args, **kwargs):
    return decorated


    c2n = {"{": 123, "}": 125, "[": 91, "]": 93}

    track = {
    c2n["{"]: 0,
    c2n["["]: 0,
    }
    values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
    }
    normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
    }
    file_obj = None
    output_dir = None
    ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
    }


    def get_out_file(suffix):
    global file_obj
    global output_dir
    @@ -102,9 +105,6 @@ def save(obj, name):
    out.write(obj + b"\n")


    open_curly_braces = c2n["{"]


    def next_open_braces(mm, start, end, track_array_end=False):
    x = start
    found = False
    @@ -192,14 +192,6 @@ def parse_first_object(mm, start, end, object_defs):
    return obj, obj_name, obj_end, parse_time


    braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
    }


    def parse_object(mm, pos_start_curly):
    if mm[pos_start_curly] != open_curly_braces:
    next_content = mm[pos_start_curly : pos_start_curly + 100]
    @@ -296,4 +288,4 @@ def start(filename, start, end, concurrency, object_name, object_key, out):


    if __name__ == "__main__":
    start()
    start()
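    This revision mostly hoists the lookup tables above their first use, but the tables themselves deserve a note: indexing an mmap (or any bytes-like object) in Python 3 yields an int, which is why c2n maps each brace character to its byte value (123 is ord("{")). A quick self-contained check:

        c2n = {"{": 123, "}": 125, "[": 91, "]": 93}

        buf = b'{"k": [1, 2]}'
        # Indexing bytes gives ints, so comparisons use byte values, not strings.
        assert buf[0] == c2n["{"] == ord("{")
        assert buf[6] == c2n["["]
        print("byte-value comparison works:", buf[0], chr(buf[0]))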
  7. zkhan93 revised this gist Sep 7, 2022. 1 changed file with 139 additions and 102 deletions.
    241 changes: 139 additions & 102 deletions json_array_splitter.py
    @@ -1,30 +1,21 @@
    """
    This is code in response to the problem described below:
    There is a very large JSON file containing one root object with two huge arrays;
    the file cannot be loaded into memory, but we need to rearrange it so it can be loaded in Spark.
    Any proper JSON parser will try to parse the entire structure before letting us do anything with the object.
    We tried several parsers, including jq's streaming parser; it worked for small sample files (~300 MB) but took far too long for large sample files (~1 GB).
    This is a pseudo JSON parser that extracts a JSON object by matching opening and closing curly braces; it also uses knowledge of the objects (keys known to always exist in an object)
    to confirm that a parsed object is a top-level array element.
    It divides the job across multiple processes, each parsing a fixed section of the file;
    it is likely that a section starts in the middle of a JSON object, so the process skips some initial characters until it finds the object we are looking for (by matching the keys provided as input).
    The end position, however, is not a hard limit: a process reads past it if an object parse is in progress, which ensures that characters skipped by one process are captured by another.
    """


    import click
    from functools import wraps
    import mmap
    import time
    import multiprocessing
    import json
    import os


    class ArrayEndException(Exception):
    pass


    def print_report(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
    res = fn(*args, **kwargs)
    print(ps)
    click.echo(ps)
    return res

    return decorated
    @@ -54,7 +45,6 @@ def decorated(*args, **kwargs):
    second_size, second_peak = b2m(tracemalloc.get_traced_memory())
    ps["mem_peek"] = second_peak - first_peak
    ps["mem"] = second_size - first_size
    # stopping the library
    tracemalloc.stop()
    return res

    @@ -80,6 +70,7 @@ def decorated(*args, **kwargs):
    c2n["]"]: c2n["["],
    }
    file_obj = None
    output_dir = None
    ps = {
    "pnum": -1,
    "start": -1,
    @@ -88,89 +79,119 @@ def decorated(*args, **kwargs):
    }


    def get_out_file():
    def get_out_file(suffix):
    global file_obj
    if not file_obj:
    file_obj = open(f"out-{ps['pnum']}.json", "wb")
    global output_dir
    name = os.path.join(output_dir, f"{ps['pnum']}-{suffix}.json")
    mode = "wb"
    if file_obj:
    if file_obj.name != name:
    file_obj.close()
    file_obj = open(name, mode)
    else:
    file_obj = open(name, mode)
    return file_obj


    def save(obj):
    def save(obj, name):
    global ps
    ps["objects"] += 1
    ps["last_object_len"] = len(obj)
    # print(ps)
    out = get_out_file()
    out = get_out_file(name)
    out.write(obj + b"\n")


    open_curly_braces = c2n["{"]


    def next_open_braces(mm, start, end):
    def next_open_braces(mm, start, end, track_array_end=False):
    x = start
    while x <= end and mm[x] != open_curly_braces:
    x += 1
    return x
    found = False
    while x <= end:
    if track_array_end and mm[x] == c2n["]"]:
    raise ArrayEndException()
    if mm[x] != open_curly_braces:
    x += 1
    else:
    found = True
    break
    return x, found


    @print_report
    # @track_mem
    @track_time
    def find_json_objects(
    process_number,
    filename,
    start,
    end,
    keysets_to_test,
    ):
    def find_json_objects(process_number, filename, start, end, object_defs, out):
    """
    # start might be in the middle of an object, so the
    # objective is to reach the start of the next array element.
    # The idea is to skip characters until you find the start of an object, i.e. '{',
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `keysets_to_test`.
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `object_defs`.
    # If the check passes, you have parsed an array element; continue parsing the next array objects.
    # Otherwise skip to the next '{' character and repeat until you find an element of the array.
    """
    global ps
    global output_dir
    ps = {
    "pnum": process_number,
    "pnum": str(process_number).zfill(3),
    "start": start,
    "end": end,
    "objects": 0,
    }

    output_dir = out
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    i = next_open_braces(mm, start, end)
    found_first = False
    while not found_first:
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj_end > end:
    if obj:
    save(obj)
    return # no object in defined start, end limit
    actual_keys = set(json.loads(obj).keys())
    object_matched = any(
    not (set(keyset) - actual_keys) for keyset in keysets_to_test
    )
    if not object_matched:
    pass
    # print("process: ", process_number, "not found", i, end, obj[:100])
    else:
    save(obj)
    found_first = True
    i = next_open_braces(mm, obj_end, end)

    obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end, object_defs)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    i, _ = next_open_braces(mm, obj_end, end)
    while i <= end:
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj:
    i = obj_end
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj)
    i = next_open_braces(mm, obj_end, end)
    save(obj, obj_name)
    try:
    i, _ = next_open_braces(mm, obj_end, end, True)
    except ArrayEndException:
    print(ps["pnum"], "array ended for", obj_end, end)
    i, found = next_open_braces(mm, obj_end, end)
    if found:
    obj, obj_end, parse_time = parse_object(mm, i)
    obj_name = detect_object_name(obj, object_defs)
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj, obj_name)
    return


    def detect_object_name(obj, object_defs):
    actual_keys = set(json.loads(obj).keys())
    names = [
    name for name, keyset in object_defs.items() if not (set(keyset) - actual_keys)
    ]
    if names:
    return names[0]


    def parse_first_object(mm, start, end, object_defs):
    obj_name = None
    obj = None
    obj_end = start # if loop is not executed, return safe position
    parse_time = None
    while not obj_name and obj_end < end:
    i, _ = next_open_braces(mm, obj_end, end)
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj:
    obj_name = detect_object_name(obj, object_defs)
    return obj, obj_name, obj_end, parse_time


    braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    @@ -206,57 +227,73 @@ def _get_intervals(start, end, segments):
    intervals = list(range(start, end + 1, int((end - start) / segments)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    print(intervals)
    click.echo(intervals)
    return intervals


    def start(filename, begin, end, processes, keysets_to_test):

    intervals = _get_intervals(begin, end, processes)
    _procs = []
    for begin, end in intervals:
    proc = multiprocessing.Process(
    @click.command()
    @click.option("-f", "--filename", type=click.Path(exists=True))
    @click.option("-s", "--start", type=int, default=1)
    @click.option(
    "-e",
    "--end",
    type=int,
    default=-1,
    help="last character position we are interested in, defaults to end of file",
    )
    @click.option(
    "-c", "--concurrency", type=int, default=1, help="number of processes to use"
    )
    @click.option(
    "--object-name",
    type=str,
    multiple=True,
    )
    @click.option(
    "--object-key",
    type=click.UNPROCESSED,
    callback=lambda _, __, keys: tuple([tuple(key.split(",")) for key in keys]),
    multiple=True,
    )
    @click.option(
    "-o",
    "--out",
    type=click.Path(exists=True, writable=True, file_okay=False, resolve_path=True),
    default=".",
    )
    def start(filename, start, end, concurrency, object_name, object_key, out):
    """
    This is code in response to the problem described below:
    There is a very large JSON file containing one root object with two huge arrays;
    the file cannot be loaded into memory, but we need to rearrange it so it can be loaded in Spark.
    Any proper JSON parser will try to parse the entire structure before letting us do anything with the object.
    We tried several parsers, including jq's streaming parser; it worked for small sample files (~300 MB) but took far too long for large sample files (~1 GB).
    This is a pseudo JSON parser that extracts a JSON object by matching opening and closing curly braces; it also uses knowledge of the objects (keys known to always exist in an object)
    to confirm that a parsed object is indeed the expected array element.
    It divides the job across multiple processes, each parsing a fixed section of the file;
    it is likely that a section starts in the middle of a JSON object, so the process skips some initial characters until it finds the object we are looking for (by matching the keys provided as input).
    The end position, however, is not a hard limit: a process reads past it if an object parse is in progress, which ensures that characters skipped by one process are captured by another.
    """
    if len(object_name) != len(object_key):
    click.echo("for each object-name you must provide object-key", err=True)
    return 1
    object_defs = dict(zip(object_name, object_key))
    if end == -1:
    with open(filename, "r") as f:
    end = f.seek(0, os.SEEK_END) - 1
    intervals = _get_intervals(start, end, concurrency)
    _processes = []
    for start, end in intervals:
    process = multiprocessing.Process(
    target=find_json_objects,
    args=(len(_procs) + 1, filename, begin, end, keysets_to_test),
    args=(len(_processes) + 1, filename, start, end, object_defs, out),
    )
    proc.start()
    _procs.append(proc)
    for proc in _procs:
    proc.join()
    process.start()
    _processes.append(process)
    for process in _processes:
    process.join()


    if __name__ == "__main__":
    print("cores", multiprocessing.cpu_count())
    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    # ["name", "billing_code", "negotiated_rates"]
    keysets = [
    [
    "negotiation_arrangement",
    "billing_code",
    ],
    ["provider_group_id", "provider_groups"],
    ]

    # 250 MB
    # begin = 191
    # end = 258326285
    # processes = 3
    # filename = "2022-08_290_37D0_in-network-rates.json"

    # 2GB 63 seconds
    # begin = 156
    # end = 1132108879
    # processes = 5
    # filename = "2022-08_890_58D0_in-network-rates_10_of_16.json"

    # 11 GB
    begin = 3
    end = 12064520309
    processes = 10
    filename = "2022-08_040_05C0_in-network-rates_24_of_25.json"
    # 1:19.65 total - in_network only (skipped 78% data here to reach the in_network) - 5 processes
    # 6:29.97 total - both the arrays 5 processes
    # 3:59.33 total - both arrays 10 processes

    start(filename, begin, end, processes, keysets)
    start()
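    Two parts of this large revision deserve a closer look. First, the hard-coded driver becomes a click CLI; a hypothetical invocation (the file name and keys are placeholders) might be: python json_array_splitter.py -f data.json -c 10 --object-name in_network --object-key negotiation_arrangement,billing_code -o ./out. Second, _get_intervals carves the byte range into per-process segments; a self-contained sketch of its behavior, copied from the gist minus the echo:

        def _get_intervals(start, end, segments):
            # Evenly spaced boundaries; the last one is clamped to `end`,
            # and consecutive pairs become (start, end) work intervals.
            intervals = list(range(start, end + 1, int((end - start) / segments)))
            intervals[-1] = end
            return list(zip(intervals[:-1], intervals[1:]))

        print(_get_intervals(0, 100, 4))
        # [(0, 25), (25, 50), (50, 75), (75, 100)]

    Consecutive intervals sharing a boundary byte is harmless here: a worker skips any partial object at its start, and its neighbor reads past its own end to finish the object it began, as the docstring explains.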
  8. zkhan93 revised this gist Aug 15, 2022. No changes.
  9. zkhan93 revised this gist Aug 14, 2022. 1 changed file with 22 additions and 16 deletions.
    38 changes: 22 additions & 16 deletions json_array_splitter.py
    @@ -122,13 +122,13 @@ def find_json_objects(
    filename,
    start,
    end,
    keys_to_test,
    keysets_to_test,
    ):
    """
    # start might be in the middle of an object, so the
    # objective is to reach the start of the next array element.
    # The idea is to skip characters until you find the start of an object, i.e. '{',
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `keys_to_test`.
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `keysets_to_test`.
    # If the check passes, you have parsed an array element; continue parsing the next array objects.
    # Otherwise skip to the next '{' character and repeat until you find an element of the array.
    """
    @@ -150,7 +150,10 @@ def find_json_objects(
    save(obj)
    return # no object in defined start, end limit
    actual_keys = set(json.loads(obj).keys())
    if set(keys_to_test) - actual_keys:
    object_matched = any(
    not (set(keyset) - actual_keys) for keyset in keysets_to_test
    )
    if not object_matched:
    pass
    # print("process: ", process_number, "not found", i, end, obj[:100])
    else:
    @@ -169,9 +172,6 @@ def find_json_objects(


    braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    @@ -195,7 +195,8 @@ def parse_object(mm, pos_start_curly):
    while sum(tc.values()) != 0:
    c = mm[i]
    if c in braces:
    tc[normalize[c]] += values[c]
    val = values[c]
    tc[normalize[c]] += val
    i += 1
    end = time.time()
    return mm[pos_start_curly:i], i, end - start
    @@ -209,14 +210,14 @@ def _get_intervals(start, end, segments):
    return intervals


    def start(filename, begin, end, processes, keys_to_test):
    def start(filename, begin, end, processes, keysets_to_test):

    intervals = _get_intervals(begin, end, processes)
    _procs = []
    for begin, end in intervals:
    proc = multiprocessing.Process(
    target=find_json_objects,
    args=(len(_procs) + 1, filename, begin, end, keys_to_test),
    args=(len(_procs) + 1, filename, begin, end, keysets_to_test),
    )
    proc.start()
    _procs.append(proc)
    @@ -229,9 +230,12 @@ def start(filename, begin, end, processes, keys_to_test):
    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    # ["name", "billing_code", "negotiated_rates"]
    keys_to_test = [
    "negotiation_arrangement",
    "billing_code",
    keysets = [
    [
    "negotiation_arrangement",
    "billing_code",
    ],
    ["provider_group_id", "provider_groups"],
    ]

    # 250 MB
    @@ -247,10 +251,12 @@ def start(filename, begin, end, processes, keys_to_test):
    # filename = "2022-08_890_58D0_in-network-rates_10_of_16.json"

    # 11 GB
    begin = 9513206232
    begin = 3
    end = 12064520309
    processes = 5
    processes = 10
    filename = "2022-08_040_05C0_in-network-rates_24_of_25.json"
    # 379.81s user 2.62s system 480% cpu 1:19.65 total
    # 1:19.65 total - in_network only (skipped 78% data here to reach the in_network) - 5 processes
    # 6:29.97 total - both the arrays 5 processes
    # 3:59.33 total - both arrays 10 processes

    start(filename, begin, end, processes, keys_to_test)
    start(filename, begin, end, processes, keysets)
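    The substantive change in this revision is moving from one key list to several keysets: an object matches if any keyset is fully contained in its keys, i.e. the set difference is empty. A self-contained sketch (the keysets mirror the gist's examples; the sample object is made up):

        import json

        keysets = [
            ("negotiation_arrangement", "billing_code"),
            ("provider_group_id", "provider_groups"),
        ]

        obj = '{"provider_group_id": 7, "provider_groups": [], "extra": true}'
        actual_keys = set(json.loads(obj).keys())
        # Match if any keyset is a subset of the object's keys.
        matched = any(not (set(keyset) - actual_keys) for keyset in keysets)
        print(matched)  # True: the second keyset is covered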
  10. zkhan93 revised this gist Aug 14, 2022. 1 changed file with 162 additions and 86 deletions.
    248 changes: 162 additions & 86 deletions json_array_splitter.py
    @@ -1,5 +1,5 @@
    """
    This is in response to the problem described below:
    This is code in response to the problem described below:
    There is a very large JSON file containing one root object with two huge arrays;
    the file cannot be loaded into memory, but we need to rearrange it so it can be loaded in Spark.
    Any proper JSON parser will try to parse the entire structure before letting us do anything with the object.
    @@ -12,55 +12,99 @@
    The end position, however, is not a hard limit: a process reads past it if an object parse is in progress, which ensures that characters skipped by one process are captured by another.
    """


    from functools import wraps
    import mmap
    import time
    import multiprocessing
    import json
    import logging

    logging.basicConfig(level=logging.INFO)

    print("cores", multiprocessing.cpu_count())
    def print_report(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
    res = fn(*args, **kwargs)
    print(ps)
    return res

    return decorated


    def track_time(fn):
    @wraps(fn)
    def decorated(*args, **kwargs):
    start_time = time.time()
    res = fn(*args, **kwargs)
    end_time = time.time()
    ps["time"] = end_time - start_time
    return res

    return decorated


    def track_mem(fn):
    import tracemalloc

    @wraps(fn)
    def decorated(*args, **kwargs):
    b2m = lambda cp: (cp[0] / (1024 * 1024), cp[1] / (1024 * 1024))
    tracemalloc.start()
    first_size, first_peak = b2m(tracemalloc.get_traced_memory())
    res = fn(*args, **kwargs)
    second_size, second_peak = b2m(tracemalloc.get_traced_memory())
    ps["mem_peek"] = second_peak - first_peak
    ps["mem"] = second_size - first_size
    # stopping the library
    tracemalloc.stop()
    return res

    return decorated


    c2n = {"{": 123, "}": 125, "[": 91, "]": 93}

    track = {
    "{": 0,
    "[": 0,
    c2n["{"]: 0,
    c2n["["]: 0,
    }
    values = {
    "{": 1,
    "[": 1,
    "}": -1,
    "]": -1,
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
    }
    normalize = {
    "{": "{",
    "[": "[",
    "}": "{",
    "]": "[",
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
    }


    objs = []
    pnum = 0
    file_obj = None
    ps = {
    "pnum": -1,
    "start": -1,
    "end": -1,
    "objects": 0,
    }


    def get_out_file():
    global file_obj
    if not file_obj:
    file_obj = open(f"out-{pnum}.json", "w")
    file_obj = open(f"out-{ps['pnum']}.json", "wb")
    return file_obj


    def save(obj):
    # print(".", end="")
    # objs.append(obj)
    global ps
    ps["objects"] += 1
    ps["last_object_len"] = len(obj)
    # print(ps)
    out = get_out_file()
    out.write(obj + "\n")
    out.write(obj + b"\n")


    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    open_curly_braces = ord("{")
    open_curly_braces = c2n["{"]


    def next_open_braces(mm, start, end):
    @@ -70,111 +114,143 @@ def next_open_braces(mm, start, end):
    return x


    @print_report
    # @track_mem
    @track_time
    def find_json_objects(
    process_number,
    filename,
    start,
    end,
    keys_to_test=["name", "billing_code", "negotiated_rates"],
    keys_to_test,
    ):
    global pnum
    pnum = process_number
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0)
    print("object parsing start for", start, end)
    """
    # start might be in the middle of an object, so the
    # objective is to reach the start of the next array element.
    # The idea is to skip characters until you find the start of an object, i.e. '{',
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `keys_to_test`.
    # If the check passes, you have parsed an array element; continue parsing the next array objects.
    # Otherwise skip to the next '{' character and repeat until you find an element of the array.
    i = start
    # look for {

    """
    global ps
    ps = {
    "pnum": process_number,
    "start": start,
    "end": end,
    "objects": 0,
    }
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    i = next_open_braces(mm, start, end)
    found_first = False

    while not found_first:
    i = next_open_braces(mm, i, end)
    obj, obj_end = parse_object(mm, i)
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj_end > end:
    if obj:
    save(obj)
    return # no object in defined start, end limit
    actual_keys = set(json.loads(obj).keys())
    if set(keys_to_test) - actual_keys:
    i = next_open_braces(mm, obj_end, end)
    pass
    # print("process: ", process_number, "not found", i, end, obj[:100])
    else:
    save(obj)
    found_first = True
    while i <= end:
    i = next_open_braces(mm, obj_end, end)
    obj, obj_end = parse_object(mm, i)
    while i <= end:
    obj, obj_end, parse_time = parse_object(mm, i)
    if obj:
    i = obj_end
    ps["current"] = obj_end
    ps["last_obj_parse_time"] = parse_time
    save(obj)
    i = next_open_braces(mm, obj_end, end)
    return


    def parse_object(mm, pos_start_curley):
    if mm[pos_start_curley] != open_curly_braces:
    # print("parsing from", mm[pos_start_curley : pos_start_curley + 100])
    return "", pos_start_curley
    braces = {
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["{"]: True,
    c2n["}"]: True,
    c2n["["]: True,
    c2n["]"]: True,
    }


    def parse_object(mm, pos_start_curly):
    if mm[pos_start_curly] != open_curly_braces:
    next_content = mm[pos_start_curly : pos_start_curly + 100]
    raise Exception(
    f"parse_object {pos_start_curly} is not a curly brace but {next_content}"
    )
    # print("parsing from", mm[pos_start_curly : pos_start_curly + 100])
    start = time.time()
    tc = track.copy()
    i = pos_start_curley
    c = chr(mm[i]) # c is {
    i = pos_start_curly
    c = mm[i] # c is {
    tc[c] = 1
    obj_content = [c]
    max_i = len(mm)
    i = i + 1
    while i < max_i and sum(tc.values()) != 0:
    c = chr(mm[i])
    if c in ["{", "}", "[", "]"]:
    while sum(tc.values()) != 0:
    c = mm[i]
    if c in braces:
    tc[normalize[c]] += values[c]
    obj_content.append(c)
    i += 1
    return "".join(obj_content), i


    def do(filename):
    start_time = time.time()
    procs = []
    # for sample file1
    # 191 start of first array object
    # 71678056, # start of in_network array
    # 258326285 # end of file

    # for sample file2 1.13 GB
    # start = 156 # start of in_network array
    # start = # start of provider_references array
    # end of file 1132108880
    start = 156
    end = 1132108879
    processes = 5
    intervals = list(range(start, end, int((end - start) / processes)))
    end = time.time()
    return mm[pos_start_curly:i], i, end - start


    def _get_intervals(start, end, segments):
    intervals = list(range(start, end + 1, int((end - start) / segments)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    print(intervals)
    for start, end in intervals:
    return intervals


    def start(filename, begin, end, processes, keys_to_test):

    intervals = _get_intervals(begin, end, processes)
    _procs = []
    for begin, end in intervals:
    proc = multiprocessing.Process(
    target=find_json_objects,
    args=(
    len(procs) + 1,
    filename,
    start,
    end,
    ),
    args=(len(_procs) + 1, filename, begin, end, keys_to_test),
    )
    proc.start()
    procs.append(proc)

    for proc in procs:
    _procs.append(proc)
    for proc in _procs:
    proc.join()

    end = time.time()
    print(end - start_time)
    return


    if __name__ == "__main__":
    do("2022-08_890_58D0_in-network-rates_10_of_16.json")
    print("cores", multiprocessing.cpu_count())
    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    # ["name", "billing_code", "negotiated_rates"]
    keys_to_test = [
    "negotiation_arrangement",
    "billing_code",
    ]

    # 250 MB
    # begin = 191
    # end = 258326285
    # processes = 3
    # filename = "2022-08_290_37D0_in-network-rates.json"

    # 2GB 63 seconds
    # begin = 156
    # end = 1132108879
    # processes = 5
    # filename = "2022-08_890_58D0_in-network-rates_10_of_16.json"

    # 11 GB
    begin = 9513206232
    end = 12064520309
    processes = 5
    filename = "2022-08_040_05C0_in-network-rates_24_of_25.json"
    # 379.81s user 2.62s system 480% cpu 1:19.65 total

    start(filename, begin, end, processes, keys_to_test)
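    The core technique this rewrite converges on is plain brace counting: every '{' or '[' counts +1 and every '}' or ']' counts -1 (closers are normalized onto their opener's counter), and the object ends where the running sum returns to zero. A self-contained sketch of that core, restructured slightly from the gist, with the docstring's caveat that braces inside JSON strings would be miscounted:

        c2n = {"{": 123, "}": 125, "[": 91, "]": 93}
        values = {c2n["{"]: 1, c2n["["]: 1, c2n["}"]: -1, c2n["]"]: -1}
        normalize = {c2n["{"]: c2n["{"], c2n["["]: c2n["["],
                     c2n["}"]: c2n["{"], c2n["]"]: c2n["["]}

        def parse_object(buf, pos):
            # Counters for curly and square braces; start with the opening '{'.
            tc = {c2n["{"]: 0, c2n["["]: 0}
            tc[buf[pos]] = 1
            i = pos + 1
            while sum(tc.values()) != 0:
                c = buf[i]
                if c in values:
                    tc[normalize[c]] += values[c]
                i += 1
            return buf[pos:i], i

        obj, end = parse_object(b'{"a": [1, {"b": 2}]} trailing', 0)
        print(obj)  # b'{"a": [1, {"b": 2}]}'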
  11. zkhan93 revised this gist Aug 12, 2022. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion json_array_splitter.py
    @@ -1,5 +1,5 @@
    """
    This is code in response to the problem described below:
    This is in response to the problem described below:
    There is a very large JSON file containing one root object with two huge arrays;
    the file cannot be loaded into memory, but we need to rearrange it so it can be loaded in Spark.
    Any proper JSON parser will try to parse the entire structure before letting us do anything with the object.
  12. zkhan93 revised this gist Aug 12, 2022. 1 changed file with 14 additions and 0 deletions.
    14 changes: 14 additions & 0 deletions json_array_splitter.py
    @@ -1,3 +1,17 @@
    """
    This is code in response to the problem described below:
    There is a very large JSON file containing one root object with two huge arrays;
    the file cannot be loaded into memory, but we need to rearrange it so it can be loaded in Spark.
    Any proper JSON parser will try to parse the entire structure before letting us do anything with the object.
    We tried several parsers, including jq's streaming parser; it worked for small sample files (~300 MB) but took far too long for large sample files (~1 GB).
    This is a pseudo JSON parser that extracts a JSON object by matching opening and closing curly braces; it also uses knowledge of the objects (keys known to always exist in an object)
    to confirm that a parsed object is a top-level array element.
    It divides the job across multiple processes, each parsing a fixed section of the file;
    it is likely that a section starts in the middle of a JSON object, so the process skips some initial characters until it finds the object we are looking for (by matching the keys provided as input).
    The end position, however, is not a hard limit: a process reads past it if an object parse is in progress, which ensures that characters skipped by one process are captured by another.
    """

    import mmap
    import time
    import multiprocessing
  13. zkhan93 created this gist Aug 12, 2022.
    166 changes: 166 additions & 0 deletions json_array_splitter.py
    @@ -0,0 +1,166 @@
    import mmap
    import time
    import multiprocessing
    import json
    import logging

    logging.basicConfig(level=logging.INFO)

    print("cores", multiprocessing.cpu_count())
    track = {
    "{": 0,
    "[": 0,
    }
    values = {
    "{": 1,
    "[": 1,
    "}": -1,
    "]": -1,
    }
    normalize = {
    "{": "{",
    "[": "[",
    "}": "{",
    "]": "[",
    }


    objs = []
    pnum = 0
    file_obj = None


    def get_out_file():
    global file_obj
    if not file_obj:
    file_obj = open(f"out-{pnum}.json", "w")
    return file_obj


    def save(obj):
    # print(".", end="")
    # objs.append(obj)
    out = get_out_file()
    out.write(obj + "\n")


    # ["billing_code_type", "negotiation_arrangement"],
    # ["negotiation_arrangement","billing_code",]
    open_curly_braces = ord("{")


    def next_open_braces(mm, start, end):
    x = start
    while x <= end and mm[x] != open_curly_braces:
    x += 1
    return x


    def find_json_objects(
    process_number,
    filename,
    start,
    end,
    keys_to_test=["name", "billing_code", "negotiated_rates"],
    ):
    global pnum
    pnum = process_number
    f = open(filename, "r")
    mm = mmap.mmap(f.fileno(), 0)
    print("object parsing start for", start, end)
    # start might be in the middle of an object, so the
    # objective is to reach the start of the next array element.
    # The idea is to skip characters until you find the start of an object, i.e. '{',
    # read to the end of that object, parse it to JSON, and check whether it has the keys from `keys_to_test`.
    # If the check passes, you have parsed an array element; continue parsing the next array objects.
    # Otherwise skip to the next '{' character and repeat until you find an element of the array.
    i = start
    # look for {

    found_first = False

    while not found_first:
    i = next_open_braces(mm, i, end)
    obj, obj_end = parse_object(mm, i)
    if obj_end > end:
    if obj:
    save(obj)
    return # no object in defined start, end limit
    actual_keys = set(json.loads(obj).keys())
    if set(keys_to_test) - actual_keys:
    i = next_open_braces(mm, obj_end, end)
    # print("process: ", process_number, "not found", i, end, obj[:100])
    else:
    save(obj)
    found_first = True
    while i <= end:
    i = next_open_braces(mm, obj_end, end)
    obj, obj_end = parse_object(mm, i)
    if obj:
    save(obj)
    return


    def parse_object(mm, pos_start_curley):
    if mm[pos_start_curley] != open_curly_braces:
    # print("parsing from", mm[pos_start_curley : pos_start_curley + 100])
    return "", pos_start_curley

    tc = track.copy()
    i = pos_start_curley
    c = chr(mm[i]) # c is {
    tc[c] = 1
    obj_content = [c]
    max_i = len(mm)
    i = i + 1
    while i < max_i and sum(tc.values()) != 0:
    c = chr(mm[i])
    if c in ["{", "}", "[", "]"]:
    tc[normalize[c]] += values[c]
    obj_content.append(c)
    i += 1
    return "".join(obj_content), i


    def do(filename):
    start_time = time.time()
    procs = []
    # for sample file1
    # 191 start of first array object
    # 71678056, # start of in_network array
    # 258326285 # end of file

    # for sample file2 1.13 GB
    # start = 156 # start of in_network array
    # start = # start of provider_references array
    # end of file 1132108880
    start = 156
    end = 1132108879
    processes = 5
    intervals = list(range(start, end, int((end - start) / processes)))
    intervals[-1] = end
    intervals = list(zip(intervals[:-1], intervals[1:]))
    print(intervals)
    for start, end in intervals:
    proc = multiprocessing.Process(
    target=find_json_objects,
    args=(
    len(procs) + 1,
    filename,
    start,
    end,
    ),
    )
    proc.start()
    procs.append(proc)

    for proc in procs:
    proc.join()

    end = time.time()
    print(end - start_time)
    return


    if __name__ == "__main__":
    do("2022-08_890_58D0_in-network-rates_10_of_16.json")