Skip to content

Instantly share code, notes, and snippets.

@amotl
Created July 9, 2025 23:49
Show Gist options
  • Select an option

  • Save amotl/f55474d633e7c71498c24ce3ca4c6332 to your computer and use it in GitHub Desktop.

Select an option

Save amotl/f55474d633e7c71498c24ce3ca4c6332 to your computer and use it in GitHub Desktop.

Revisions

  1. amotl created this gist Jul 9, 2025.
    111 changes: 111 additions & 0 deletions cratedb-orjson.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,111 @@
    """
    ## About
    `crate-python` uses `orjson` for JSON serialization.
    ## Errors
    - TypeError: Type is not JSON serializable: numpy.ndarray
    - TypeError: Type is not JSON serializable: recarray
    ## orjson
    `json_dumps` uses the following incantation of `orjson`.
    `json_encoder` is responsible for encoding `Decimal`, `dt.datetime`, and `dt.date` types.
    orjson.dumps(
    obj,
    default=json_encoder,
    option=(
    orjson.OPT_PASSTHROUGH_DATETIME
    | orjson.OPT_NON_STR_KEYS
    | orjson.OPT_SERIALIZE_NUMPY
    ),
    )
    ## Prerequisites
    uv pip install --upgrade pandas polars 'sqlalchemy-cratedb>=0.42.0dev2'
    docker run --rm --name=cratedb \
    --publish=4200:4200 --publish=5432:5432 \
    --env=CRATE_HEAP_SIZE=2g crate/crate:nightly -Cdiscovery.type=single-node
    """

    import io
    import json
    import logging
    from collections import OrderedDict

    import colorlog
    import numpy as np
    import pandas as pd
    import polars as pl

    from crate.client.http import json_dumps

    from sqlalchemy_cratedb import insert_bulk

    # Logger obtained from colorlog without an explicit name; shared by all
    # functions in this script.
    logger = colorlog.getLogger()

    # A single JSON document held in an in-memory binary stream. The stream
    # object is shared, so its read position persists between consumers.
    payload_jsonl = io.BytesIO(b'{"foo":"bar","baz":["qux"]}')

    # The same logical record, with the list value held as a NumPy array.
    payload_numpy = {
        "foo": "bar",
        "baz": np.array(["qux"]),
    }


    def numpy_to_stdout():
        """Serialize the module-level ``payload_numpy`` and print the result.

        The payload is passed unchanged to ``json_dumps``; whatever that call
        returns is written to stdout, and any exception it raises propagates
        to the caller.
        """
        serialized = json_dumps(payload_numpy)
        print(serialized)


    def numpy_to_db():
        """Load ``payload_numpy`` into a DataFrame and write it to table ``numpy``.

        The frame is built with ``DataFrame.from_records`` and stored through
        ``DataFrame.to_sql`` using the ``crate://`` connection string. An
        existing table of that name is replaced and the index is not written.
        """
        frame = pd.DataFrame.from_records(payload_numpy)
        frame.to_sql(
            name="numpy",
            con="crate://",
            index=False,
            if_exists="replace",
        )


    def pandas_to_stdout():
        """Read the JSON-lines payload with pandas and print it serialized.

        The DataFrame is converted with ``to_records()`` and the resulting
        record array is handed to ``json_dumps``; any exception raised by
        that call propagates to the caller.
        """
        # `payload_jsonl` is one shared in-memory stream: rewind it so this
        # read sees the whole document no matter which function ran before.
        payload_jsonl.seek(0)
        df = pd.read_json(payload_jsonl, lines=True)
        print(json_dumps(df.to_records()))


    def pandas_to_db():
        """Read the JSON-lines payload with pandas and write it to table ``pandas``.

        The frame is stored through ``DataFrame.to_sql`` using the ``crate://``
        connection string, with ``insert_bulk`` as the insertion method. An
        existing table of that name is replaced and the index is not written.
        """
        # `payload_jsonl` is one shared in-memory stream: rewind it so this
        # read sees the whole document no matter which function ran before.
        payload_jsonl.seek(0)
        df = pd.read_json(payload_jsonl, lines=True)
        df.to_sql(
            name="pandas",
            con="crate://",
            index=False,
            if_exists="replace",
            method=insert_bulk,
        )


    def polars_to_stdout():
        """Read the JSON-lines payload with Polars and print it serialized.

        The DataFrame is converted to a list of row dictionaries with
        ``to_dicts()`` and handed to ``json_dumps``; any exception raised by
        that call propagates to the caller.
        """
        # `payload_jsonl` is one shared in-memory stream: rewind it so this
        # read sees the whole document no matter which function ran before.
        payload_jsonl.seek(0)
        df = pl.read_ndjson(payload_jsonl)
        print(json_dumps(df.to_dicts()))


    def polars_to_db():
        """Read the JSON-lines payload with Polars and write it to table ``polars``.

        The frame is stored through ``DataFrame.write_database`` using the
        ``crate://`` connection string; an existing table of that name is
        replaced.
        """
        # `payload_jsonl` is one shared in-memory stream: rewind it so this
        # read sees the whole document no matter which function ran before.
        payload_jsonl.seek(0)
        df = pl.read_ndjson(payload_jsonl)
        # `write_database` has no `method` keyword of its own; options for the
        # underlying writer are passed through `engine_options`.
        # NOTE(review): confirm against the installed Polars version.
        df.write_database(
            table_name="polars",
            connection="crate://",
            if_table_exists="replace",
            engine_options={"method": insert_bulk},
        )


    def main():
        """Run every demo function in order and log a pass/fail summary.

        Each function is invoked inside its own ``try`` block so one failure
        does not prevent the remaining functions from running. Failures are
        logged with their traceback. After all functions have run, the number
        of successes and a per-function result mapping are logged.
        """
        functions = [
            numpy_to_stdout,
            numpy_to_db,
            pandas_to_stdout,
            pandas_to_db,
            polars_to_stdout,
            polars_to_db,
        ]
        success = 0
        results = OrderedDict()
        for fun in functions:
            print("=" * 42)
            try:
                fun()
            except Exception:
                # Catch `Exception` rather than using a bare `except:` so that
                # KeyboardInterrupt and SystemExit still abort the run instead
                # of being recorded as an ordinary failure.
                logger.exception(f"Function failed: {fun}")
                results[fun.__name__] = False
            else:
                logger.info(f"Function succeeded: {fun}")
                success += 1
                results[fun.__name__] = True
        logger.info(f"Succeeded: {success} / {len(functions)}")
        logger.info(f"Results:\n{json.dumps(results, indent=2)}")


    if __name__ == "__main__":
        # Configure logging at DEBUG level before running the demo functions.
        logging.basicConfig(
            level=logging.DEBUG,
            format=(
                "%(asctime)s.%(msecs)03d [%(module)s] "
                "%(levelname)s %(funcName)s - %(message)s"
            ),
        )
        main()