Last active: December 19, 2024
## Revisions

### letenkov revised this gist (Dec 19, 2024): 3 changed files, 163 additions, 0 deletions
**Dockerfile**

```dockerfile
FROM python:3.9-slim

RUN pip install --no-cache-dir boto3==1.35.84 fastavro==1.9.7 cramjam==2.9.1

ENV PYTHONUNBUFFERED=1

COPY avro-reader.py /app/avro-reader.py
WORKDIR /app
```

**avro-reader.py**

```python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Letenkov, Eugene
# Copyright 2024 Letenkov, Eugene. All rights reserved.

import argparse
import io
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from fastavro import reader

# Set by the SIGINT handler; checked cooperatively by the worker threads.
stop_processing = False


def signal_handler(sig, frame):
    """Request a graceful shutdown on SIGINT (Ctrl+C)."""
    global stop_processing
    print('Signal received, stopping...')
    stop_processing = True


def get_all_avro_files(bucket_name, prefix, s3_client):
    """Return the keys of all .avro objects under the given prefix."""
    print(f"Fetching list of Avro files from bucket: {bucket_name} with prefix: {prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    avro_files = []
    for page in page_iterator:
        file_info = page.get('Contents', [])
        avro_files.extend([file['Key'] for file in file_info if file['Key'].endswith('.avro')])
    print(f"Found {len(avro_files)} Avro files.")
    return avro_files


def count_rows_in_avro_file(s3_client, bucket_name, file_key):
    """Download one Avro object into memory and count its records."""
    if stop_processing:
        return 0
    print(f"Processing file: {file_key}\n")
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    data = obj['Body'].read()
    with io.BytesIO(data) as file_io:
        avro_reader = reader(file_io)
        row_count = 0
        for _ in avro_reader:
            if stop_processing:
                print(f"Interrupted processing of file: {file_key}")
                return row_count
            row_count += 1
    return row_count


def process_files(bucket_name, avro_files, num_threads):
    """Count rows in all files concurrently and return the grand total."""
    s3_client = boto3.client('s3')
    total_rows = 0
    num_files = len(avro_files)
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {
            executor.submit(count_rows_in_avro_file, s3_client, bucket_name, file_key): file_key
            for file_key in avro_files
        }
        completed_files = 0
        try:
            for future in as_completed(futures):
                if stop_processing:
                    print("Process interrupted. Attempting to cancel remaining tasks...")
                    for f in futures:
                        f.cancel()
                    break
                try:
                    total_rows += future.result()
                    completed_files += 1
                    remaining_files = num_files - completed_files
                    print(f"Processed {completed_files}/{num_files} files. Remaining: {remaining_files}\n")
                except Exception as e:
                    print(f"Error processing file: {e}")
        finally:
            print("Shutting down executor")
            executor.shutdown(wait=False)
    return total_rows


def main():
    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(description="Process Avro files in an S3 bucket.")
    parser.add_argument('--bucket_name', required=True, help='Name of the S3 bucket')
    parser.add_argument('--prefix', required=True, help='Prefix of the S3 objects')
    parser.add_argument('--threads', type=int, default=10,
                        help='Number of threads to use for processing files')
    args = parser.parse_args()

    s3_client = boto3.client('s3')
    avro_files = get_all_avro_files(args.bucket_name, args.prefix, s3_client)
    total_rows = process_files(args.bucket_name, avro_files, args.threads)

    if not stop_processing:
        print(f"Total number of rows across all files: {total_rows}")
    else:
        print("Processing was interrupted.")


if __name__ == '__main__':
    main()
```

**deployment.yaml**

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: avro-processing
spec:
  replicas: 1
  selector:
    matchLabels:
      app: avro-processing
  template:
    metadata:
      labels:
        app: avro-processing
    spec:
      containers:
      - name: avro-reader
        image: your-username/your-image-name:latest
        command: ["/bin/sh"]
        args:
        - -c
        - >-
          python /app/avro-reader.py
          --bucket_name=$(S3_BUCKET_NAME)
          --prefix=$(S3_PREFIX)
          --threads=$(THREADS)
        env:
        - name: AWS_ACCESS_KEY_ID
          value: "your-access-key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "your-secret-key"
        - name: AWS_DEFAULT_REGION
          value: "ru-central1"
        - name: AWS_ENDPOINT_URL
          value: "https://storage.yandexcloud.net"
        - name: S3_BUCKET_NAME
          value: "my-bucket"
        - name: S3_PREFIX
          value: "my/prefix/"
        - name: THREADS
          value: "20"
```
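The deployment sets `AWS_ENDPOINT_URL` in the environment and relies on boto3 picking it up; recent boto3 releases (including the 1.35.x pinned in the Dockerfile) honor that variable. If you are on an older boto3, or simply want the override to be explicit, you can pass the endpoint to the client directly. A minimal sketch, not part of the gist itself; the endpoint shown is an example value:

```python
# Sketch: build the S3 client with an explicit endpoint instead of
# relying on boto3 reading AWS_ENDPOINT_URL from the environment.
import os

import boto3


def make_s3_client():
    # endpoint_url=None falls back to the standard AWS endpoint.
    endpoint = os.environ.get("AWS_ENDPOINT_URL")  # e.g. "https://storage.yandexcloud.net"
    return boto3.client("s3", endpoint_url=endpoint)
```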
### letenkov created this gist (Dec 19, 2024)
**README.md**

# Avro Reader

This project provides a tool for processing Avro files stored on S3. It lists the Avro files under a given prefix, reads each one, and counts the number of rows, processing files in parallel.

## Features

- Reads Avro files from a specified S3 bucket and prefix.
- Counts the number of rows in each Avro file.
- Handles SIGINT (Ctrl+C) by stopping gracefully and reporting a partial result.
- Allows configuration of the number of threads for parallel processing.

## Requirements

- Docker
- Kubernetes
- AWS S3 or compatible object storage, such as Yandex Cloud Storage

## Setup

### Docker

1. Build the Docker image:

   ```sh
   docker build -t your-username/your-image-name:latest .
   ```

2. Push the Docker image to a container registry:

   ```sh
   docker push your-username/your-image-name:latest
   ```

### Kubernetes

1. Update `deployment.yaml` with your specific configuration, such as access keys, bucket name, and prefix.

2. Deploy to your Kubernetes cluster:

   ```sh
   kubectl apply -f deployment.yaml
   ```

## Configuration

The behavior of the Avro Reader is controlled via environment variables specified in the Kubernetes `deployment.yaml`:

- `AWS_ACCESS_KEY_ID`: Your AWS access key.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret key.
- `AWS_DEFAULT_REGION`: The AWS region where your S3 bucket is located.
- `AWS_ENDPOINT_URL`: Endpoint URL for a non-AWS S3 service such as Yandex Cloud.
- `S3_BUCKET_NAME`: Name of the S3 bucket.
- `S3_PREFIX`: Prefix that filters the relevant Avro files in the bucket.
- `THREADS`: Number of threads to use for processing files concurrently.

## Usage

Upon deployment, the application will:

1. Connect to the specified S3 bucket.
2. List all Avro files under the specified prefix.
3. Count the rows in each file concurrently, printing per-file progress and the total row count across all files.

## License

This project is licensed under the Apache License 2.0. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for more details.

## Author

- Letenkov, Eugene, 2024
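To sanity-check the row-counting logic without touching S3, you can write a small Avro file into an in-memory buffer with fastavro and count its records the same way `avro-reader.py` does. A minimal sketch; the schema and records below are illustrative only:

```python
# Local sanity check for the counting logic (no S3 involved).
import io

from fastavro import parse_schema, reader, writer

# Toy schema and records, made up for this example.
schema = parse_schema({
    "type": "record",
    "name": "Example",
    "fields": [{"name": "id", "type": "int"}],
})

buf = io.BytesIO()
writer(buf, schema, [{"id": i} for i in range(3)])

# Count records exactly as count_rows_in_avro_file does: iterate the reader.
buf.seek(0)
row_count = sum(1 for _ in reader(buf))
print(row_count)  # 3
```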