Skip to content

Instantly share code, notes, and snippets.

@zkhan93
Last active September 7, 2022 18:25
Show Gist options
  • Select an option

  • Save zkhan93/4683e35795a83c7f9d23b76d3f56c8de to your computer and use it in GitHub Desktop.

Select an option

Save zkhan93/4683e35795a83c7f9d23b76d3f56c8de to your computer and use it in GitHub Desktop.
huge JSON array objects splitter
import click
from functools import wraps
import mmap
import time
import multiprocessing
import json
import os
# Integer byte values of the JSON structural characters, i.e. what indexing a
# bytes object or an mmap yields for each of them.
c2n = {char: ord(char) for char in "{}[]"}

# Template of per-bracket-kind nesting counters (copied before each use).
track = dict.fromkeys((c2n["{"], c2n["["]), 0)

# Nesting delta contributed by each structural byte: +1 opens, -1 closes.
values = {
    c2n["{"]: 1,
    c2n["["]: 1,
    c2n["}"]: -1,
    c2n["]"]: -1,
}

# Maps every structural byte to the opening byte of its own kind.
normalize = {
    c2n["{"]: c2n["{"],
    c2n["["]: c2n["["],
    c2n["}"]: c2n["{"],
    c2n["]"]: c2n["["],
}

# Per-process statistics; replaced with real values when a worker starts.
ps = dict(pnum=-1, start=-1, end=-1, objects=0)

# Byte value that marks the beginning of a JSON object.
open_curly_braces = c2n["{"]

# Membership table of all structural bytes.
braces = dict.fromkeys(c2n.values(), True)

# Per-process state: currently open output file, output directory and the
# mapping of object name -> keys used to recognise objects.
file_obj = None
output_dir = None
object_defs = None
class ArrayEndException(Exception):
    """Signal that a closing ``]`` was met while scanning for the next ``{``."""
def print_report(fn):
    """Decorate ``fn`` so the module-level ``ps`` statistics are echoed after it returns.

    The wrapped function's return value is passed through unchanged.
    """

    @wraps(fn)
    def reporting_wrapper(*args, **kwargs):
        outcome = fn(*args, **kwargs)
        # ``ps`` is looked up only now, so a dict rebound during the call is
        # the one that gets printed.
        click.echo(ps)
        return outcome

    return reporting_wrapper
def track_time(fn):
    """Decorate ``fn`` so its wall-clock duration in seconds is stored in ``ps["time"]``.

    The wrapped function's return value is passed through unchanged.
    """

    @wraps(fn)
    def timing_wrapper(*args, **kwargs):
        began = time.time()
        outcome = fn(*args, **kwargs)
        finished = time.time()
        # ``ps`` is resolved after the call, so a dict rebound by ``fn`` is
        # the one that receives the measurement.
        ps["time"] = finished - began
        return outcome

    return timing_wrapper
def track_mem(fn):
    """Decorate ``fn`` so its traced memory usage is recorded in ``ps``.

    After a successful call, ``ps["mem"]`` holds the change in currently
    traced memory and ``ps["mem_peek"]`` the change in peak traced memory,
    both in MiB. The wrapped function's return value is passed through.

    Tracing is always switched off again, even when ``fn`` raises; in that
    case no statistics are recorded and the exception propagates.
    """
    import tracemalloc

    def to_mib(sizes):
        """Convert a ``(current, peak)`` pair from bytes to MiB."""
        return (sizes[0] / (1024 * 1024), sizes[1] / (1024 * 1024))

    @wraps(fn)
    def decorated(*args, **kwargs):
        tracemalloc.start()
        try:
            first_size, first_peak = to_mib(tracemalloc.get_traced_memory())
            res = fn(*args, **kwargs)
            second_size, second_peak = to_mib(tracemalloc.get_traced_memory())
            ps["mem_peek"] = second_peak - first_peak
            ps["mem"] = second_size - first_size
        finally:
            # Without this, an exception in ``fn`` would leave tracing
            # enabled for the rest of the process.
            tracemalloc.stop()
        return res

    return decorated
def get_out_file(suffix):
    """Return the binary output file for ``suffix``, opening it when needed.

    The path is ``<output_dir>/<pnum>-<suffix>.json``. The most recently
    opened file is kept in the module-level ``file_obj`` and reused while the
    requested path stays the same; asking for a different path closes it and
    opens the new one in ``"wb"`` mode.
    """
    global file_obj
    global output_dir
    target = os.path.join(output_dir, f"{ps['pnum']}-{suffix}.json")
    if file_obj and file_obj.name == target:
        # Same destination as last time: keep appending to the open handle.
        return file_obj
    if file_obj:
        file_obj.close()
    file_obj = open(target, "wb")
    return file_obj
def save(obj, name):
    """Write ``obj`` (bytes) plus a newline to the output file for ``name``.

    Also bumps ``ps["objects"]`` and records the object's length in
    ``ps["last_object_len"]``.
    """
    global ps
    ps["objects"] += 1
    ps["last_object_len"] = len(obj)
    get_out_file(name).write(obj + b"\n")
def next_open_braces(mm, start, end, track_array_end=False):
    """Scan ``mm[start..end]`` (inclusive) for the next ``{`` byte.

    Returns ``(position, True)`` when a ``{`` is found. Otherwise returns
    ``(position, False)`` where ``position`` is one past ``end`` (or ``start``
    itself when ``start > end``).

    Raises ``ArrayEndException`` if ``track_array_end`` is true and a ``]``
    byte is met before any ``{``.
    """
    closing_bracket = c2n["]"]
    pos = start
    while pos <= end:
        byte = mm[pos]
        if track_array_end and byte == closing_bracket:
            raise ArrayEndException()
        if byte == open_curly_braces:
            return pos, True
        pos += 1
    return pos, False
def _next_named_object(mm, pos, end):
    """Find the next object starting in ``mm[pos..end]`` whose keys match a definition.

    Objects that match no definition are parsed and skipped. Returns
    ``(obj, name, obj_end, parse_time)`` or ``None`` when no further ``{``
    starts within the range.
    """
    while True:
        brace_pos, found = next_open_braces(mm, pos, end)
        if not found:
            return None
        obj, pos, parse_time = parse_object(mm, brace_pos)
        name = detect_object_name(obj)
        if name is not None:
            return obj, name, pos, parse_time


@print_report
# @track_mem  (enable to also record memory statistics)
@track_time
def find_json_objects(process_number, filename, start, end, out, defs):
    """Extract the array elements that start inside ``[start, end]`` of ``filename``.

    ``start`` may fall in the middle of an object, so the worker first skips
    forward to the first ``{`` whose parsed object contains all keys of one of
    the definitions in ``defs`` (name -> required keys). From there it parses
    consecutive objects and appends each one as a line to
    ``<out>/<process_number, zero padded>-<name>.json``.

    ``end`` limits where an object may *start*; an object that begins at or
    before ``end`` is always read to its closing brace.
    NOTE(review): an object starting exactly on a chunk boundary looks like it
    is picked up by both neighbouring workers, since one worker's ``end`` is
    the next worker's ``start`` -- confirm before relying on exact counts.

    When the enclosing array closes (a ``]`` is met before the next ``{``),
    the following objects are re-identified against ``defs``; objects that
    match no definition are skipped.

    Module-level state (``output_dir``, ``object_defs``, ``ps``, ``file_obj``)
    is (re)initialised for this worker. Returns ``None``.
    """
    global output_dir
    global object_defs
    global ps
    global file_obj
    output_dir = out
    object_defs = defs
    ps = {
        "pnum": str(process_number).zfill(3),
        "start": start,
        "end": end,
        "objects": 0,
    }
    try:
        # Slices taken from the mmap are independent bytes objects, so it is
        # safe to release the mapping as soon as the scan is over.
        with open(filename, "rb") as f, mmap.mmap(
            f.fileno(), 0, access=mmap.ACCESS_READ
        ) as mm:
            obj, obj_name, obj_end, parse_time = parse_first_object(mm, start, end)
            if not obj or obj_name is None:
                # No recognisable element starts in this chunk.
                return
            while True:
                ps["current"] = obj_end
                ps["last_obj_parse_time"] = parse_time
                save(obj, obj_name)
                try:
                    i, found = next_open_braces(mm, obj_end, end, True)
                except ArrayEndException:
                    # The current array is finished: whatever comes next
                    # belongs elsewhere and has to be identified again.
                    following = _next_named_object(mm, obj_end, end)
                    if following is None:
                        break
                    obj, obj_name, obj_end, parse_time = following
                    continue
                if not found:
                    break
                obj, obj_end, parse_time = parse_object(mm, i)
    finally:
        # Close explicitly: a worker process may exit without flushing
        # still-open buffered files, which would drop the tail of the output.
        if file_obj is not None:
            file_obj.close()
            file_obj = None
def detect_object_name(obj):
    """Return the first name in ``object_defs`` whose keys all occur in ``obj``.

    ``obj`` is the JSON text of an object; only its top-level keys are
    compared. Returns ``None`` when no definition matches.
    """
    global object_defs
    present_keys = set(json.loads(obj).keys())
    for candidate, required_keys in object_defs.items():
        if set(required_keys) <= present_keys:
            return candidate
    return None
def parse_first_object(mm, start, end):
    """Locate the first recognisable object starting in ``mm[start..end]``.

    Repeatedly jumps to the next ``{``, parses the object that begins there
    and asks ``detect_object_name`` whether it matches a known definition.
    The search stops at the first match, when the scan position reaches
    ``end``, or when no further ``{`` exists in the range.

    Returns ``(obj, obj_name, obj_end, parse_time)``. ``obj_name`` is ``None``
    when nothing matched; in that case ``obj``/``parse_time`` describe the
    last object that was tried (``None`` if none was) and ``obj_end`` is the
    position after it (``start`` if none was).
    """
    obj_name = None
    obj = None
    obj_end = start  # if loop is not executed, return safe position
    parse_time = None
    while not obj_name and obj_end < end:
        i, found = next_open_braces(mm, obj_end, end)
        if not found:
            # ``i`` is past ``end`` and does not point at a brace; handing it
            # to parse_object would raise instead of reporting "not found".
            break
        obj, obj_end, parse_time = parse_object(mm, i)
        obj_name = detect_object_name(obj)
    return obj, obj_name, obj_end, parse_time
def parse_object(mm, pos_start_curly):
    """Return the JSON object that starts at ``mm[pos_start_curly]``.

    The object is delimited by matching opening and closing braces/brackets.
    Bytes inside double-quoted strings (including escaped quotes) are ignored,
    so structural characters that appear in string values do not disturb the
    nesting count.

    Returns ``(obj_bytes, end, seconds)`` where ``obj_bytes`` is
    ``mm[pos_start_curly:end]``, ``end`` is the index just past the closing
    brace and ``seconds`` is the wall-clock time spent.

    Raises ``Exception`` if the byte at ``pos_start_curly`` is not ``{`` and
    ``ValueError`` if the data ends before the object is closed.
    """
    if mm[pos_start_curly] != open_curly_braces:
        next_content = mm[pos_start_curly : pos_start_curly + 100]
        raise Exception(
            f"parse_object {pos_start_curly} is not a curly brace but {next_content}"
        )
    started = time.time()
    quote, backslash = 34, 92  # byte values of '"' and '\'
    depth = 1  # the opening brace at pos_start_curly
    in_string = False
    escaped = False
    i = pos_start_curly + 1
    try:
        while depth:
            c = mm[i]
            if in_string:
                if escaped:
                    # Byte following a backslash never terminates the string.
                    escaped = False
                elif c == backslash:
                    escaped = True
                elif c == quote:
                    in_string = False
            elif c == quote:
                in_string = True
            else:
                # +1 for '{' and '[', -1 for '}' and ']', 0 for anything else.
                depth += values.get(c, 0)
            i += 1
    except IndexError:
        raise ValueError(
            f"parse_object {pos_start_curly}: data ended before the object was closed"
        ) from None
    finished = time.time()
    return mm[pos_start_curly:i], i, finished - started
def _get_intervals(start, end, segments):
    """Split ``[start, end]`` into at most ``segments`` consecutive intervals.

    Returns a list of ``(lo, hi)`` pairs in which each ``hi`` equals the next
    pair's ``lo``; the first ``lo`` is ``start`` and the last ``hi`` is
    ``end``. All intervals share the same length except the last one, which
    absorbs the remainder. When the range is shorter than ``segments``, fewer
    intervals are produced (never an empty one); an inverted range yields an
    empty list. The result is also echoed to stdout.

    Raises ``ValueError`` if ``segments`` is smaller than 1.
    """
    if segments < 1:
        raise ValueError("segments must be at least 1")
    span = end - start
    if span < 0:
        intervals = []
    else:
        # Never create more pieces than there are positions to hand out;
        # otherwise the step would be 0.
        count = max(1, min(segments, span))
        step = span // count
        bounds = [start + k * step for k in range(count)]
        bounds.append(end)
        intervals = list(zip(bounds[:-1], bounds[1:]))
    click.echo(intervals)
    return intervals
@click.command()
@click.option("-f", "--filename", type=click.Path(exists=True), required=True)
@click.option("-s", "--start", type=int, default=1)
@click.option(
    "-e",
    "--end",
    type=int,
    default=-1,
    help="last character position we are interested in, defaults to end of file",
)
@click.option(
    "-c", "--concurrency", type=int, default=1, help="number of processes to use"
)
@click.option(
    "--object-name",
    type=str,
    multiple=True,
)
@click.option(
    "--object-key",
    type=click.UNPROCESSED,
    callback=lambda _, __, keys: tuple([tuple(key.split(",")) for key in keys]),
    multiple=True,
)
@click.option(
    "-o",
    "--out",
    type=click.Path(writable=True, file_okay=False, resolve_path=True),
    default=".",
)
def start(filename, start, end, concurrency, object_name, object_key, out):
    """
    This code is a response to the problem explained below:
    There is a really big JSON file containing one root object with 2 huge arrays.
    The file cannot be loaded into memory, but we need to rearrange it to be able to load it in Spark.
    Any proper JSON parser will try to parse the whole structure before letting us do anything with the object.
    We tried some parsers, including jq's streaming parser; it worked for small sample files (~300MB) but took far longer for large sample files (~1GB).
    This is a pseudo JSON parser that extracts a JSON object by matching its opening and closing curly braces. It also uses knowledge of the objects (known, always existing keys)
    to confirm that the parsed object is indeed the expected array element.
    The work is divided between multiple processes, each of which parses a fixed section of the file.
    It is likely that the start of a section falls in the middle of a JSON object, so the process skips through some initial characters until it finds the object we are looking for (by matching the keys provided as input).
    However, the end position is not a hard limit: a process passes through the end limit if an object is still being parsed. This ensures that characters skipped by one process are captured by another process.
    """
    if len(object_name) != len(object_key):
        click.echo("for each object-name you must provide object-key", err=True)
        # A value returned from the callback does not become the process exit
        # status when click runs the command itself, so exit explicitly.
        raise SystemExit(1)
    os.makedirs(out, exist_ok=True)
    definitions = dict(zip(object_name, object_key))
    if end == -1:
        # Index of the last byte of the file.
        end = os.path.getsize(filename) - 1
    workers = []
    intervals = _get_intervals(start, end, concurrency)
    for number, (chunk_start, chunk_end) in enumerate(intervals, 1):
        worker = multiprocessing.Process(
            target=find_json_objects,
            args=(number, filename, chunk_start, chunk_end, out, definitions),
        )
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
    failed = [worker.name for worker in workers if worker.exitcode != 0]
    if failed:
        # Surface worker crashes instead of finishing with a success status.
        click.echo(f"worker processes failed: {', '.join(failed)}", err=True)
        raise SystemExit(1)
# Run the command-line interface only when the file is executed as a script.
if __name__ == "__main__":
    start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment