{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.types import StructType, StructField, IntegerType, DateType\n", "import pyspark.sql.functions as F\n", "from datetime import date" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark = (\n", " SparkSession.builder\n", " # .config(\"spark.eventLog.gcMetrics.youngGenerationGarbageCollectors\", \"G1 Young Generation\")\n", " # .config(\"spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\", \"G1 Old Generation\")\n", " .getOrCreate()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup\n", "* Given an inventory of items\n", "* That are being used for date ranges\n", "* Task: what is the utilization rate?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "schema = StructType([\n", " StructField(\"id\", IntegerType(), False),\n", " StructField(\"start_date\", DateType(), False),\n", " StructField(\"end_date\", DateType(), False)\n", "])\n", "\n", "df = spark.createDataFrame(\n", " [\n", " (1, date(2024, 1, 1), date(2024, 1, 3)),\n", " (1, date(2024, 1, 8), date(2024, 1, 10)),\n", " (1, date(2024, 1, 13), date(2024, 1, 17)),\n", " (1, date(2024, 1, 29), date(2024, 1, 30)),\n", " (2, date(2024, 1, 2), date(2024, 1, 4)),\n", " (2, date(2024, 1, 10), date(2024, 1, 14)),\n", " (3, date(2024, 1, 14), date(2024, 1, 19)),\n", " (3, date(2024, 1, 27), date(2024, 1, 28)),\n", " ],\n", " schema\n", ")\n", "df.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "min_date = df.select(F.min(\"start_date\")).collect()[0][0]\n", "max_date = df.select(F.max(\"end_date\")).collect()[0][0]\n", "min_date, max_date" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Create DataFrame with sequence of dates\n", "dates_df = spark.range(1).select(\n", " F.explode(\n", " F.sequence(\n", " F.lit(min_date),\n", " F.lit(max_date),\n", " F.expr('interval 1 day')\n", " )\n", " ).alias(\"current_date\")\n", ")\n", "dates_df.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_df = df.crossJoin(dates_df)\n", "joined_df.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "final_df = joined_df.withColumn(\n", " \"used\",\n", " F.when(\n", " (F.col(\"start_date\") <= F.col(\"current_date\"))\n", " & (F.col(\"end_date\") >= F.col(\"current_date\")),\n", " 1,\n", " ).otherwise(0),\n", ")\n", "final_df.toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_pivoted = (\n", " final_df.groupBy(\"current_date\")\n", " .pivot(\"id\")\n", " .agg(F.max(\"used\").alias(\"used\"))\n", " .orderBy(F.col(\"current_date\"))\n", ")\n", "df_pivoted.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wrong_result_df = final_df.groupBy(\"current_date\").agg(\n", " F.sum(\"used\").alias(\"total_used\"),\n", " # Don't count! 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.createOrReplaceTempView(\"df\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sql = \"\"\"\n", "WITH date_bounds AS (\n", "    SELECT MIN(start_date) AS min_date,\n", "           MAX(end_date) AS max_date\n", "    FROM df\n", "),\n", "dates AS (\n", "    -- sequence() over dates steps by 1 day by default; the lowercase alias\n", "    -- avoids shadowing the built-in CURRENT_DATE function\n", "    SELECT EXPLODE(SEQUENCE(min_date, max_date)) AS current_date\n", "    FROM date_bounds\n", ")\n", "SELECT *\n", "FROM df\n", "    CROSS JOIN dates\n", "\"\"\"" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(sql).show()" ] }
], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.2" } }, "nbformat": 4, "nbformat_minor": 2 }