In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
import pyspark.sql.functions as F
from datetime import date

In [None]:
# Get (or create) the Spark session with default settings.
# Optional: to restrict event-log GC metrics to the G1 collectors, chain these
# onto the builder before .getOrCreate():
#   .config("spark.eventLog.gcMetrics.youngGenerationGarbageCollectors", "G1 Young Generation")
#   .config("spark.eventLog.gcMetrics.oldGenerationGarbageCollectors", "G1 Old Generation")
spark = SparkSession.builder.getOrCreate()

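# Setup
* Given an inventory of items, identified by `id`,
* each of which is in use over one or more date ranges (`start_date` through `end_date`, both inclusive).
* Task: what is the utilization rate, i.e. the share of items in use over a period (computed per day below)?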

In [None]:
# Sample bookings: item `id` is in use from `start_date` through `end_date`
# (both ends inclusive, as the `used` flag treats them further down).
booking_rows = [
    (1, date(2024, 1, 1), date(2024, 1, 3)),
    (1, date(2024, 1, 8), date(2024, 1, 10)),
    (1, date(2024, 1, 13), date(2024, 1, 17)),
    (1, date(2024, 1, 29), date(2024, 1, 30)),
    (2, date(2024, 1, 2), date(2024, 1, 4)),
    (2, date(2024, 1, 10), date(2024, 1, 14)),
    (3, date(2024, 1, 14), date(2024, 1, 19)),
    (3, date(2024, 1, 27), date(2024, 1, 28)),
]

# All three columns are declared non-nullable.
schema = StructType(
    [
        StructField("id", IntegerType(), nullable=False),
        StructField("start_date", DateType(), nullable=False),
        StructField("end_date", DateType(), nullable=False),
    ]
)

df = spark.createDataFrame(booking_rows, schema=schema)
df.show()

In [None]:
# Overall date span of the bookings: earliest start and latest end.
# Both bounds come from a single aggregation, i.e. one Spark job instead of
# two separate select/collect round-trips.
date_bounds = df.agg(
    F.min("start_date").alias("min_date"),
    F.max("end_date").alias("max_date"),
).first()
min_date, max_date = date_bounds["min_date"], date_bounds["max_date"]
min_date, max_date

In [None]:

# Calendar of every day from min_date through max_date (stop is included):
# build the date array on a single-row frame, then explode it into one row per day.
# NOTE(review): the column name `current_date` coincides with Spark SQL's built-in
# CURRENT_DATE; it is unambiguous via F.col here, but backquote it in SQL text.
day_step = F.expr('interval 1 day')
day_array = F.sequence(F.lit(min_date), F.lit(max_date), day_step)
dates_df = spark.range(1).select(F.explode(day_array).alias("current_date"))
dates_df.show(5)

In [None]:
# Pair every booking with every calendar day (bookings x days rows); the next
# cell decides, per pair, whether the booking covers that day.
joined_df = df.join(dates_df, how="cross")
joined_df.show(5)

In [None]:
# used = 1 when the booking covers the day (start_date <= current_date <= end_date,
# both ends inclusive), else 0.
final_df = joined_df.withColumn(
    "used",
    F.when(
        F.col("current_date").between(F.col("start_date"), F.col("end_date")),
        1,
    ).otherwise(0),
)
# Bounded preview only: toPandas() on the full bookings x days frame would pull
# every row onto the driver, which does not scale beyond toy data.
final_df.limit(20).toPandas()

In [None]:
# Day x item usage grid: one row per day, one column per item id.
# max() yields 1 when any of that item's bookings covers the day, else 0.
usage_by_day = final_df.groupBy("current_date").pivot("id")
df_pivoted = usage_by_day.agg(F.max(F.col("used")).alias("used")).orderBy("current_date")
df_pivoted.show()

In [None]:
# Counter-example: why count() is the wrong denominator.
# count("used") counts every (booking, day) row in the group, i.e. the number of
# bookings regardless of whether they cover the day -- not the number of items.
# Build the DataFrame first and show it separately: assigning the result of
# .show() would bind the name to None.
wrong_result_df = (
    final_df.groupBy("current_date")
    .agg(
        F.sum("used").alias("total_used"),
        # Don't count! That's wrong!
        F.count("used").alias("count"),
    )
    .orderBy(F.col("current_date"))
)
wrong_result_df.show(5)

In [None]:
# Daily utilization: percentage of distinct items in use on each day.
# NOTE: the denominator only knows items that appear in `df` at least once;
# items that were never booked would need a separate inventory table.
# TODO: what is the base you want? Daily? Weekly? Monthly?
n_items = df.select("id").distinct().count()

result_df = (
    final_df.groupBy("current_date")
    # Count distinct in-use ids rather than summing `used`: a sum would count an
    # item twice on a day covered by two of its own overlapping bookings.
    # countDistinct ignores the NULLs produced for rows where used == 0.
    .agg(F.countDistinct(F.when(F.col("used") == 1, F.col("id"))).alias("total_used"))
    .withColumn("total", F.lit(n_items))
    .withColumn("percentage", F.round(100 * F.col("total_used") / F.col("total"), 2))
    .orderBy(F.col("current_date"))
)
# Keep the DataFrame in `result_df` (not the None returned by .show()).
result_df.show()

In [None]:
# Expose the bookings DataFrame to Spark SQL under the view name "df".
df.createOrReplaceTempView(name="df")

In [None]:
# SQL version of the DataFrame cells above: one row per day from the earliest
# start_date to the latest end_date, then every booking paired with every day.
# The day alias is backquoted and lower-case, so the column matches the
# DataFrame version's `current_date` and is not parsed as the CURRENT_DATE keyword
# (reserved when ANSI reserved-keyword enforcement is on).
sql = """
WITH date_bounds AS (
    SELECT MIN(start_date) AS min_date,
           MAX(end_date)   AS max_date
    FROM df
),
dates AS (
    SELECT col AS `current_date`
    FROM (SELECT EXPLODE(SEQUENCE(min_date, max_date)) FROM date_bounds)
)
SELECT *
FROM df
CROSS JOIN dates
"""

In [None]:
# Run the SQL version and check it produces exactly the same rows as the
# DataFrame-API cross join (`joined_df`); EXCEPT ALL compares columns by position.
sql_df = spark.sql(sql)
assert sql_df.exceptAll(joined_df).count() == 0, "SQL result has rows missing from joined_df"
assert joined_df.exceptAll(sql_df).count() == 0, "joined_df has rows missing from SQL result"
sql_df.show()