Dockerfile

```dockerfile
FROM python:3.9-slim
RUN pip install --no-cache-dir boto3==1.35.84 fastavro==1.9.7 cramjam==2.9.1
ENV PYTHONUNBUFFERED=1
COPY avro-reader.py /app/avro-reader.py
WORKDIR /app
```

avro-reader.py

```python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Letenkov, Eugene
# Copyright 2024 Letenkov, Eugene. All rights reserved.

import os
import boto3
from fastavro import reader
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
import argparse
import signal

stop_processing = False

def signal_handler(sig, frame):
    global stop_processing
    print('Signal received, stopping...')
    stop_processing = True

def get_all_avro_files(bucket_name, prefix, s3_client):
    print(f"Fetching list of Avro files from bucket: {bucket_name} with prefix: {prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    avro_files = []

    for page in page_iterator:
        file_info = page.get('Contents', [])
        avro_files.extend([file['Key'] for file in file_info if file['Key'].endswith('.avro')])

    print(f"Found {len(avro_files)} Avro files.")
    return avro_files

def count_rows_in_avro_file(s3_client, bucket_name, file_key):
    if stop_processing:
        return 0
    print(f"Processing file: {file_key}\n")
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    data = obj['Body'].read()

    with io.BytesIO(data) as file_io:
        avro_reader = reader(file_io)
        row_count = 0
        for _ in avro_reader:
            if stop_processing:
                print(f"Interrupted processing of file: {file_key}")
                return row_count
            row_count += 1
    return row_count

def process_files(bucket_name, avro_files, num_threads):
    s3_client = boto3.client('s3')
    total_rows = 0

    num_files = len(avro_files)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {executor.submit(count_rows_in_avro_file, s3_client, bucket_name, file_key): file_key for file_key in avro_files}
        completed_files = 0

        try:
            for future in as_completed(futures):
                if stop_processing:
                    print("Process interrupted. Attempting to cancel remaining tasks...")
                    for f in futures:
                        f.cancel()
                    break
                try:
                    total_rows += future.result()
                    completed_files += 1
                    remaining_files = num_files - completed_files
                    print(f"Processed {completed_files}/{num_files} files. Remaining: {remaining_files}\n")
                except Exception as e:
                    print(f"Error processing file: {e}")
        finally:
            print("Shutting down executor")
            executor.shutdown(wait=False)

    return total_rows

def main():
    global stop_processing
    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(description="Process Avro files in S3 bucket.")
    parser.add_argument('--bucket_name', required=True, help='Name of the S3 bucket')
    parser.add_argument('--prefix', required=True, help='Prefix of the S3 objects')
    parser.add_argument('--threads', type=int, default=10, help='Number of threads to use for processing files')
    args = parser.parse_args()

    bucket_name = args.bucket_name
    prefix = args.prefix
    num_threads = args.threads

    s3_client = boto3.client('s3')

    avro_files = get_all_avro_files(bucket_name, prefix, s3_client)

    total_rows = process_files(bucket_name, avro_files, num_threads)

    if not stop_processing:
        print(f"Total number of rows across all files: {total_rows}")
    else:
        print("Processing was interrupted.")

if __name__ == '__main__':
    main()
```

deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: avro-processing
spec:
  replicas: 1
  selector:
    matchLabels:
      app: avro-processing
  template:
    metadata:
      labels:
        app: avro-processing
    spec:
      containers:
        - name: avro-reader
          image: your-username/your-image-name:latest
          command: ["/bin/sh"]
          args:
            - -c
            - >-
              python /app/avro-reader.py
              --bucket_name=$(S3_BUCKET_NAME)
              --prefix=$(S3_PREFIX)
              --threads=$(THREADS)
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "your-access-key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "your-secret-key"
            - name: AWS_DEFAULT_REGION
              value: "ru-central1"
            - name: AWS_ENDPOINT_URL
              value: "https://storage.yandexcloud.net"
            - name: S3_BUCKET_NAME
              value: "my-bucket"
            - name: S3_PREFIX
              value: "my/prefix/"
            - name: THREADS
              value: "20"
```

README.md
# Avro Reader

This project provides a tool for processing Avro files stored in S3-compatible object storage. It lists the Avro files under a given bucket and prefix, reads each one, and reports the total number of rows across all files.

## Features

- Reads Avro files from a specified S3 bucket and prefix.
- Counts the number of rows in each Avro file and reports the total across all files.
- Allows configuration of the number of threads used for parallel processing.
- Handles SIGINT so an in-progress run can be stopped cleanly.

## Requirements

- Docker
- Kubernetes
- AWS S3 or compatible object storage, such as Yandex Cloud Storage

## Setup

### Docker

1. Build the Docker image:

```sh
docker build -t your-username/your-image-name:latest .
```

2. Push the Docker image to a container registry:

```sh
docker push your-username/your-image-name:latest
```

### Kubernetes

1. Update the `deployment.yaml` with your specific configuration such as access keys, bucket name, and prefix.

2. Deploy to your Kubernetes cluster:

```sh
kubectl apply -f deployment.yaml
```
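
To confirm the Deployment came up and to follow the row-count output, the usual kubectl commands apply; the names below match the `avro-processing` Deployment and `app: avro-processing` label defined in `deployment.yaml`:

```sh
# Check that the pod started
kubectl get pods -l app=avro-processing

# Stream the script's progress messages and the final row count
kubectl logs -f deployment/avro-processing
```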

## Configuration

The behavior of the Avro Reader is controlled via environment variables specified in the Kubernetes `deployment.yaml`:

- `AWS_ACCESS_KEY_ID`: Your AWS access key.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret key.
- `AWS_DEFAULT_REGION`: The AWS region where your S3 bucket is located.
- `AWS_ENDPOINT_URL`: Endpoint URL when using a non-AWS, S3-compatible service such as Yandex Cloud Storage.
- `S3_BUCKET_NAME`: Name of the S3 bucket.
- `S3_PREFIX`: Prefix used to filter the relevant Avro files in the bucket.
- `THREADS`: Number of threads to use for processing files concurrently.
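
To sanity-check the configuration without a cluster, the same settings can be passed to the container directly. This is a minimal sketch: the image name comes from the Docker setup above, the credentials, bucket, and prefix are placeholders, and the bucket, prefix, and thread count are given as command-line arguments because that is how `deployment.yaml` hands them to the script:

```sh
docker run --rm \
  -e AWS_ACCESS_KEY_ID=your-access-key \
  -e AWS_SECRET_ACCESS_KEY=your-secret-key \
  -e AWS_DEFAULT_REGION=ru-central1 \
  -e AWS_ENDPOINT_URL=https://storage.yandexcloud.net \
  your-username/your-image-name:latest \
  python /app/avro-reader.py --bucket_name=my-bucket --prefix=my/prefix/ --threads=20
```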

## Usage

Upon deployment, the application will:

1. Connect to the specified S3 bucket.
2. List all existing Avro files under the specified prefix.
3. Count the rows in each file in parallel and print the total row count across all files.
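
Progress is reported through the log messages printed by `avro-reader.py`; output from a run looks roughly like the following (file names and counts are purely illustrative):

```
Fetching list of Avro files from bucket: my-bucket with prefix: my/prefix/
Found 2 Avro files.
Processing file: my/prefix/part-0000.avro
Processing file: my/prefix/part-0001.avro
Processed 1/2 files. Remaining: 1
Processed 2/2 files. Remaining: 0
Shutting down executor
Total number of rows across all files: 12345
```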

## License

This project is licensed under the [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0).

## Author

- Letenkov, Eugene, 2024