content_defined_parquet_writer.py
Created by @ylow, Sep 30, 2024.

Writes a pandas DataFrame to a Parquet file with content-defined row-group
boundaries: a row group is cut when the hash of a chosen key column hits a
boundary condition (after at least min_row_count rows, and never more than
max_row_count rows), so boundaries depend on the data itself rather than on
absolute row positions.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

min_row_count = 512
max_row_count = 2048


def write_parquet_content_defined(df: pd.DataFrame, key_column: str, output_file: str):
    # The Parquet writer is created lazily, once the first batch's schema is known
    writer = None
    batch_accumulator = []

    try:
        for _, row in df.iterrows():
            # Append the current row to the batch accumulator
            batch_accumulator.append(row)

            # Cut a row group at a content-defined boundary: once at least
            # min_row_count rows have accumulated and the hash of the key
            # column is 0 mod 1024, or unconditionally at max_row_count rows.
            # NOTE: Python's built-in hash() is salted per process for strings
            # (PYTHONHASHSEED), so boundaries are only reproducible across runs
            # if a stable hash (e.g. hashlib) is substituted.
            if (len(batch_accumulator) >= min_row_count and
                    hash(row[key_column]) % 1024 == 0) or \
                    len(batch_accumulator) >= max_row_count:
                # Convert the accumulated rows into a DataFrame
                batch_df = pd.DataFrame(batch_accumulator)

                # Convert the DataFrame to a PyArrow Table
                table = pa.Table.from_pandas(batch_df)

                # Initialize the writer on the first batch, using its schema
                if writer is None:
                    writer = pq.ParquetWriter(output_file, table.schema)

                # Write the batch as one Parquet row group
                writer.write_table(table)

                # Clear the accumulator for the next batch
                batch_accumulator = []

        # Write any remaining rows in the accumulator after the loop
        if batch_accumulator:
            batch_df = pd.DataFrame(batch_accumulator)
            table = pa.Table.from_pandas(batch_df)
            if writer is None:
                writer = pq.ParquetWriter(output_file, table.schema)
            writer.write_table(table)

    finally:
        # Close the writer to finalize the Parquet file footer
        if writer is not None:
            writer.close()
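

A minimal usage sketch, assuming the function above is defined in the same
module. The DataFrame contents and the user_id/value column names are
hypothetical, not part of the gist; the inspection step uses pyarrow's
ParquetFile metadata to show where the content-defined boundaries landed.

import pandas as pd
import pyarrow.parquet as pq

# Hypothetical input: any DataFrame with a column to key boundaries on
df = pd.DataFrame({
    "user_id": [f"user{i % 5000}" for i in range(10000)],
    "value": range(10000),
})
write_parquet_content_defined(df, key_column="user_id", output_file="out.parquet")

# Inspect the resulting row groups and their sizes
pf = pq.ParquetFile("out.parquet")
print(pf.num_row_groups)
print([pf.metadata.row_group(i).num_rows for i in range(pf.num_row_groups)])

Each row group ends up between min_row_count and max_row_count rows. Because
the cut points are chosen from the key column's values rather than from row
offsets, inserting or deleting rows only perturbs the row groups near the
edit instead of shifting every boundary after it.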