In [None]:
import findspark

# findspark.init() has to run before the first `pyspark` import: its job is to make
# the local Spark installation's `pyspark` package importable. Calling it after the
# imports (as before) only works when pyspark already happens to be on sys.path.
findspark.init()

from pyspark.sql import SparkSession  # noqa: E402
from pyspark.sql import functions as F  # noqa: E402

# Session shared by every cell below; spark.driver.host is set to "localhost".
spark = (
    SparkSession.builder.appName("TestApp")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import col

# Multipliers applied to the "value" column to derive the "imp_value" column.
M_FEET_FACTOR = 3.280839895
KG_POUND_FACTOR = 2.20462

# The three tables deliberately share the column names "name" and "value";
# height_df and weight_df additionally carry a derived "imp_value" column.
age_df = spark.createDataFrame(
    data=[("bob", 11), ("alice", 22), ("peter", 33)],
    schema=["name", "value"],
)

height_df = spark.createDataFrame(
    data=[("bob", 1.10), ("alice", 1.69), ("peter", 1.88)],
    schema=["name", "value"],
).withColumn(
    "imp_value",
    F.round(F.col("value") * M_FEET_FACTOR, 2),
)

weight_df = spark.createDataFrame(
    data=[("bob", 36), ("alice", 55), ("peter", 87)],
    schema=["name", "value"],
).withColumn(
    "imp_value",
    F.round(F.col("value") * KG_POUND_FACTOR, 2),
)

In [None]:
# Left-join height and weight onto age by "name". Nothing is renamed, so the result
# carries three columns called "value" and two called "imp_value".
joined_df = (
    age_df
    .join(height_df, on="name", how="left")
    .join(weight_df, on="name", how="left")
)
joined_df.show()

## Using the original table

In [None]:
# Pick the age column through the source DataFrame's own column reference, because
# the bare name "value" is shared by several columns of joined_df.
joined_df.select(age_df["value"]).show()

In [None]:
# Same approach for the weight table's derived column: reference it via weight_df,
# since "imp_value" also exists on the height side of the join.
joined_df.select(weight_df["imp_value"]).show()

## Pre-process

In [None]:
# Give every non-key column a table-specific name before joining, so the joined
# result has no repeated column names.
pre_pro_age_df = age_df.withColumnRenamed("value", "age")

pre_pro_height_df = (
    height_df
    .withColumnRenamed("value", "height")
    .withColumnRenamed("imp_value", "height (ft)")
)

pre_pro_weight_df = (
    weight_df
    .withColumnRenamed("value", "weight")
    .withColumnRenamed("imp_value", "weight (pound)")
)

In [None]:
# Same two left joins on "name" as before, but over the renamed tables.
joined_df_2 = (
    pre_pro_age_df
    .join(pre_pro_height_df, on="name", how="left")
    .join(pre_pro_weight_df, on="name", how="left")
)
joined_df_2.show()

In [None]:
# After the renames the column can be addressed by name alone.
joined_df_2.select(F.col("age")).show()

In [None]:
# The renamed imperial height column is likewise selectable by its name.
joined_df_2.select("height (ft)").show()

## Post-process

In [None]:
# Build the same join as the first (un-renamed) case, then rename all six columns
# of the result at once with toDF; the new names are assigned positionally.
joined_df_3 = (
    age_df
    .join(height_df, on="name", how="left")
    .join(weight_df, on="name", how="left")
    .toDF("name", "age", "height", "height_imp", "weight", "weight_imp")
)

joined_df_3.show()