ylow created this gist Sep 30, 2024.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

min_row_count = 512
max_row_count = 2048


def write_parquet_content_defined(df: pd.DataFrame, key_column: str, output_file: str):
    # Initialize the Parquet writer lazily, once the first batch's schema is known
    writer = None
    batch_accumulator = []
    try:
        for _, row in df.iterrows():
            # Append the current row to the batch accumulator
            batch_accumulator.append(row)
            # Cut a batch at a content-defined boundary (hash of the key column
            # % 1024 == 0), but only between min_row_count and max_row_count rows
            if (len(batch_accumulator) >= min_row_count and hash(row[key_column]) % 1024 == 0) or \
                    len(batch_accumulator) >= max_row_count:
                # Convert the accumulated rows into a DataFrame
                batch_df = pd.DataFrame(batch_accumulator)
                # Convert the DataFrame to a PyArrow Table
                table = pa.Table.from_pandas(batch_df)
                # Initialize the writer the first time, using the batch's schema
                if writer is None:
                    writer = pq.ParquetWriter(output_file, table.schema)
                # Write the batch as a Parquet table
                writer.write_table(table)
                # Clear the accumulator for the next batch
                batch_accumulator = []
        # Write any remaining rows in the accumulator after the loop
        if batch_accumulator:
            batch_df = pd.DataFrame(batch_accumulator)
            table = pa.Table.from_pandas(batch_df)
            if writer is None:
                writer = pq.ParquetWriter(output_file, table.schema)
            writer.write_table(table)
    finally:
        # Close the writer to finish the file
        if writer is not None:
            writer.close()
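Below is a minimal usage sketch. The sample DataFrame, the "user_id" and "value" column names, and the output path are illustrative assumptions, not part of the original gist.

import pandas as pd

# Hypothetical sample data: 5,000 rows keyed by a repeating string id
df = pd.DataFrame({
    "user_id": [f"user-{i % 300}" for i in range(5000)],
    "value": range(5000),
})

# Each write_table() call above emits its batch as its own row group,
# so the boundaries chosen by the hash test become row-group breaks
write_parquet_content_defined(df, key_column="user_id", output_file="out.parquet")

# Read the file back to check that it round-trips
print(pd.read_parquet("out.parquet").shape)

Since hash(row[key_column]) % 1024 == 0 fires with probability roughly 1/1024 per row (assuming uniformly distributed hashes), batches average around 1024 rows, clamped between min_row_count (512) and max_row_count (2048). One caveat: Python's built-in hash() is salted per process for str and bytes unless PYTHONHASHSEED is pinned, so for string keys the chunk boundaries are not reproducible across runs. A stable hash restores determinism, which is usually the point of content-defined chunking; here is a sketch of a possible drop-in replacement (the stable_hash name is hypothetical):

import zlib

def stable_hash(value) -> int:
    # crc32 is deterministic across processes, unlike built-in hash() on str
    return zlib.crc32(str(value).encode("utf-8"))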