{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Selecting duplicated columns after a Spark join\n",
    "\n",
    "The age, height and weight DataFrames below all store their measurement in a column called `value` (height and weight also carry an `imp_value` column). Joining them on `name` therefore produces a result with repeated column names. This notebook compares three ways of addressing those columns:\n",
    "\n",
    "1. selecting through the original DataFrame's column object (e.g. `age_df.value`);\n",
    "2. renaming the clashing columns on each input *before* the join (pre-process);\n",
    "3. renaming every column positionally *after* the join with `toDF` (post-process)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark\n",
    "\n",
    "# findspark.init() has to run before pyspark is imported: it locates the Spark\n",
    "# installation and adds its Python package to sys.path.\n",
    "findspark.init()\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "spark = (\n",
    "    SparkSession.builder.appName(\"TestApp\")\n",
    "    .config(\"spark.driver.host\", \"localhost\")\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Conversion factors from metric to imperial units.\n",
    "M_FEET_FACTOR = 3.280839895  # metres -> feet\n",
    "KG_POUND_FACTOR = 2.20462  # kilograms -> pounds\n",
    "\n",
    "age_df = spark.createDataFrame(\n",
    "    [\n",
    "        (\"bob\", 11),\n",
    "        (\"alice\", 22),\n",
    "        (\"peter\", 33),\n",
    "    ],\n",
    "    [\"name\", \"value\"],\n",
    ")\n",
    "\n",
    "# Height in metres, plus the same height in feet rounded to 2 decimals.\n",
    "height_df = spark.createDataFrame(\n",
    "    [\n",
    "        (\"bob\", 1.10),\n",
    "        (\"alice\", 1.69),\n",
    "        (\"peter\", 1.88),\n",
    "    ],\n",
    "    [\"name\", \"value\"],\n",
    ").withColumn(\"imp_value\", F.round(F.col(\"value\") * M_FEET_FACTOR, 2))\n",
    "\n",
    "# Weight in kilograms, plus the same weight in pounds rounded to 2 decimals.\n",
    "weight_df = spark.createDataFrame(\n",
    "    [\n",
    "        (\"bob\", 36),\n",
    "        (\"alice\", 55),\n",
    "        (\"peter\", 87),\n",
    "    ],\n",
    "    [\"name\", \"value\"],\n",
    ").withColumn(\"imp_value\", F.round(F.col(\"value\") * KG_POUND_FACTOR, 2))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Joining the raw DataFrames\n",
    "\n",
    "The joined result contains `value` three times and `imp_value` twice, so those columns cannot be selected by name alone."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df = age_df.join(height_df, on=\"name\", how=\"left\").join(\n",
    "    weight_df, on=\"name\", how=\"left\"\n",
    ")\n",
    "joined_df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using the original DataFrames\n",
    "\n",
    "A column object taken from an input DataFrame (e.g. `age_df.value`) refers to that specific column of the join result, even though its name is repeated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df.select(age_df.value).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df.select(weight_df.imp_value).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pre-process\n",
    "\n",
    "Rename the clashing columns on each input before joining, so every column of the joined result has a unique name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
"outputs": [],
   "source": [
    "pre_pro_age_df = age_df.withColumnRenamed(\"value\", \"age\")\n",
    "pre_pro_height_df = height_df.withColumnRenamed(\"value\", \"height\").withColumnRenamed(\n",
    "    \"imp_value\", \"height (ft)\"\n",
    ")\n",
    "pre_pro_weight_df = weight_df.withColumnRenamed(\"value\", \"weight\").withColumnRenamed(\n",
    "    \"imp_value\", \"weight (pound)\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df_2 = pre_pro_age_df.join(pre_pro_height_df, on=\"name\", how=\"left\").join(\n",
    "    pre_pro_weight_df, on=\"name\", how=\"left\"\n",
    ")\n",
    "joined_df_2.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df_2.select(\"age\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_df_2.select(F.col(\"height (ft)\")).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Post-process\n",
    "\n",
    "Join the original DataFrames unchanged, then replace all column names at once with `toDF`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Same join as the first case. toDF renames every column by position, so the\n",
    "# names below must follow the join's output order: the join key, then\n",
    "# age_df's columns, then height_df's, then weight_df's.\n",
    "joined_df_3 = (\n",
    "    age_df.join(height_df, on=\"name\", how=\"left\")\n",
    "    .join(weight_df, on=\"name\", how=\"left\")\n",
    "    .toDF(\"name\", \"age\", \"height\", \"height_imp\", \"weight\", \"weight_imp\")\n",
    ")\n",
    "\n",
    "joined_df_3.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}