
@alonsoir
Created March 26, 2025 12:10
delta_lake_data_analyst.py
    import asyncio
    import os
    from textwrap import dedent
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from agno.agent import Agent
    from agno.models.openai import OpenAIChat
    import requests
    from dotenv import load_dotenv
    import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load the API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    logging.error("El token de OpenAI no está configurado.")
    raise ValueError("El token de OpenAI no está configurado.")

# Configure Spark with local Delta Lake support
spark = SparkSession.builder \
    .appName("LocalDeltaLakeAgent") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.delta.logRetentionDuration", "interval 7 days") \
    .config("spark.delta.deletedFileRetentionDuration", "interval 7 days") \
    .getOrCreate()

# Custom tool wrapper for a local Delta Lake table
class DeltaLakeTools:
    def __init__(self, table_path):
        self.table_path = table_path
        if not os.path.exists(table_path):
            os.makedirs(table_path)

    def initialize_from_remote_csv(self, url):
        # Download the CSV to a temporary local file
        local_csv_path = "temp_movies.csv"
        response = requests.get(url)
        with open(local_csv_path, "wb") as f:
            f.write(response.content)
        df = spark.read.option("header", "true").csv(local_csv_path)
        # Inspect the original columns
        original_columns = df.columns
        logging.info(f"Columnas originales del CSV: {original_columns}")
        # Define the expected columns
        expected_columns = ["Rank", "Title", "Genre", "Description", "Director", "Actors", "Year", "Runtime_Minutes", "Rating", "Votes", "Revenue_Millions", "Metascore"]
        if len(original_columns) != len(expected_columns):
            logging.warning(f"El número de columnas no coincide. Original: {len(original_columns)}, Esperado: {len(expected_columns)}")
        # Rename to the expected schema and keep only rows with a numeric Rating
        df = df.toDF(*expected_columns[:len(original_columns)])
        df = df.withColumn("Rating", col("Rating").cast("float")).filter(col("Rating").isNotNull())
        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(self.table_path)
        logging.info(f"Tabla Delta inicializada en {self.table_path} con datos de {url}")
        os.remove(local_csv_path)

    async def query(self, sql_query):
        """Run a query against the Delta table and return the result as Markdown."""
        logging.info(f"Ejecutando consulta: {sql_query}")
        loop = asyncio.get_event_loop()
        df = spark.read.format("delta").load(self.table_path)
        # NOTE: the SQL text is only inspected for "AVG"; the result is produced
        # with fixed DataFrame operations rather than by executing the query itself.
        if "AVG" in sql_query.upper():
            result = df.agg({"Rating": "avg"}).toPandas().to_markdown()
        else:
            result = df.orderBy(df.Rating.desc()).limit(5).select("Title", "Rating").toPandas().to_markdown()
        logging.info(f"Resultado de la consulta: {result}")
        return result

    async def update_table(self, url):
        loop = asyncio.get_event_loop()
        local_csv_path = "temp_movies.csv"
        response = requests.get(url)
        with open(local_csv_path, "wb") as f:
            f.write(response.content)
        df = spark.read.option("header", "true").csv(local_csv_path)
        original_columns = df.columns
        logging.info(f"Columnas originales del CSV (actualización): {original_columns}")
        expected_columns = ["Rank", "Title", "Genre", "Description", "Director", "Actors", "Year", "Runtime_Minutes", "Rating", "Votes", "Revenue_Millions", "Metascore"]
        df = df.toDF(*expected_columns[:len(original_columns)])
        df = df.withColumn("Rating", col("Rating").cast("float")).filter(col("Rating").isNotNull())
        # Run the blocking Spark write in an executor thread so the event loop stays responsive
        await loop.run_in_executor(None, lambda: df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(self.table_path))
        logging.info(f"Tabla actualizada con datos de {url}")
        os.remove(local_csv_path)

# Configure the tools and the agent
local_table_path = "./delta_movies"
delta_tools = DeltaLakeTools(table_path=local_table_path)
delta_tools.initialize_from_remote_csv("https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv")

agent = Agent(
    model=OpenAIChat(id="gpt-4o", api_key=openai_api_key),
    tools=[delta_tools],
    markdown=True,
    show_tool_calls=True,
    additional_context=dedent("""\
        You have a Delta Lake table at ./delta_movies with IMDB data (columns: Title, Rating, Year, etc.).
        Rating is a numeric column (1-10). Use the 'query' tool for all data requests and return its Markdown result.
        - "What is the average rating?": Call 'query' with "SELECT AVG(Rating) AS Average_Rating FROM default.movies".
        - "List the top 5 movies by rating": Call 'query' with "SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5".
        Do NOT invent data or skip the tool. Return only the Markdown table from 'query'.
    """),
)

# Exercise queries and an update against the table
async def run_local_operations():
    print("Consulta inicial:")
    await agent.aprint_response("What is the average rating of movies?")
    print("\nActualizando tabla...")
    await delta_tools.update_table("https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv")
    print("\nConsulta tras actualización:")
    await agent.aprint_response("List the top 5 movies by rating.")
    print("\nVerificación manual:")
    result_avg = await delta_tools.query("SELECT AVG(Rating) AS Average_Rating FROM default.movies")
    print("Promedio manual:", result_avg)
    result_top5 = await delta_tools.query("SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5")
    print("Top 5 manual:", result_top5)

# Run the flow
asyncio.run(run_local_operations())

# Stop the Spark session
spark.stop()
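
A note on the query tool above: it only checks the incoming SQL text for the substring "AVG" and otherwise returns a fixed top-5 DataFrame result, so the SQL strings suggested in additional_context are never actually executed. A minimal sketch of running arbitrary SQL against the same Delta table, by registering it as a temporary view (the view name "movies" is hypothetical, so queries would use FROM movies instead of default.movies), could look like the method below. It assumes the same spark session and table_path and is an illustration, not part of the gist.

    # Sketch only (not part of the gist): a DeltaLakeTools-style method that executes
    # the incoming SQL with Spark SQL on a temporary view, instead of pattern-matching it.
    async def query_sql(self, sql_query):
        loop = asyncio.get_event_loop()

        def _run():
            df = spark.read.format("delta").load(self.table_path)
            df.createOrReplaceTempView("movies")  # hypothetical view name
            return spark.sql(sql_query).toPandas().to_markdown()

        # Offload the blocking Spark work, mirroring what update_table does.
        return await loop.run_in_executor(None, _run)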
output_delta_lake_analyst.txt
    25/03/26 13:08:12 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.114 instead (on interface en0)
    25/03/26 13:08:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    :: loading settings :: url = jar:file:/Users/aironman/git/python-samples-2025/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /Users/aironman/.ivy2/cache
    The jars for the packages stored in: /Users/aironman/.ivy2/jars
    io.delta#delta-spark_2.12 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522;1.0
    confs: [default]
    found io.delta#delta-spark_2.12;3.3.0 in central
    found io.delta#delta-storage;3.3.0 in central
    found org.antlr#antlr4-runtime;4.9.3 in central
    :: resolution report :: resolve 194ms :: artifacts dl 7ms
    :: modules in use:
    io.delta#delta-spark_2.12;3.3.0 from central in [default]
    io.delta#delta-storage;3.3.0 from central in [default]
    org.antlr#antlr4-runtime;4.9.3 from central in [default]
    ---------------------------------------------------------------------
    | | modules || artifacts |
    | conf | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    | default | 3 | 0 | 0 | 0 || 3 | 0 |
    ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522
    confs: [default]
    0 artifacts copied, 3 already retrieved (0kB/5ms)
    25/03/26 13:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    2025-03-26 13:08:22,842 - INFO - Columnas originales del CSV: ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore']
    25/03/26 13:08:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
    2025-03-26 13:08:33,302 - INFO - Tabla Delta inicializada en ./delta_movies con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv
    Consulta inicial:
    ┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ ┃
    ┃ What is the average rating of movies? ┃
    ┃ ┃
    ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
    ┏━ Response (1.7s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ ┃
    ┃ To find the average rating of movies, I will run the following query: ┃
    ┃ ┃
    ┃ ┃
    ┃ SELECT AVG(Rating) AS Average_Rating FROM default.movies ┃
    ┃ ┃
    ┃ ┃
    ┃ Let's see the result. ┃
    ┃ ┃
    ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

    Actualizando tabla...
    2025-03-26 13:08:36,075 - INFO - Columnas originales del CSV (actualización): ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore']
    2025-03-26 13:08:38,385 - INFO - Tabla actualizada con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv

    Consulta tras actualización:
    ┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ ┃
    ┃ List the top 5 movies by rating. ┃
    ┃ ┃
    ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
    ┏━ Response (1.9s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ ┃
    ┃ ┃
    ┃ SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5 ┃
    ┃ ┃
    ┃ ┃
    ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

    Verificación manual:
    2025-03-26 13:08:40,249 - INFO - Ejecutando consulta: SELECT AVG(Rating) AS Average_Rating FROM default.movies
    2025-03-26 13:08:42,067 - INFO - Resultado de la consulta: | | avg(Rating) |
    |---:|--------------:|
    | 0 | 15.4287 |
    Promedio manual: | | avg(Rating) |
    |---:|--------------:|
    | 0 | 15.4287 |
    2025-03-26 13:08:42,067 - INFO - Ejecutando consulta: SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5
    2025-03-26 13:08:42,450 - INFO - Resultado de la consulta: | | Title | Rating |
    |---:|:--------------------|---------:|
    | 0 | Forushande | 2016 |
    | 1 | Kung Fu Panda 3 | 2016 |
    | 2 | The Interview | 2014 |
    | 3 | Idiocracy | 2006 |
    | 4 | A Cure for Wellness | 146 |
    Top 5 manual: | | Title | Rating |
    |---:|:--------------------|---------:|
    | 0 | Forushande | 2016 |
    | 1 | Kung Fu Panda 3 | 2016 |
    | 2 | The Interview | 2014 |
    | 3 | Idiocracy | 2006 |
    | 4 | A Cure for Wellness | 146 |
    2025-03-26 13:08:42,451 - INFO - Closing down clientserver connection
    2025-03-26 13:08:42,768 - INFO - Closing down clientserver connection
    poetry run python delta_lake_data_analyst.py 3,09s user 1,03s system 11% cpu 34,959 total