Revisions

  1. @evenv revised this gist Mar 22, 2016. 1 changed file with 21 additions and 2 deletions.
    23 changes: 21 additions & 2 deletions Spark Dataframe Cheat Sheet.py
    @@ -27,23 +27,42 @@
    # filtering
    df.filter('A_in_unix_time > 946684800')

    + # grouping and aggregating
    + df.groupBy("A").agg(
    +     first("B").alias("my first")
    +     , last("B").alias("my last")
    +     , sum("B").alias("my everything")
    + )
    +
    # pivoting
    - unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function
    + df.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

    # inspecting dataframes
    display(df) # table in notebook at least
    df.show() # text table

    ######################################### Date time manipulation ################################
    # Casting to timestamp from string with format 2015-01-01 23:59:59
    - df = df.select( df.start_time.cast("timestamp").alias("start_time") )
    + df.select( df.start_time.cast("timestamp").alias("start_time") )

    # Get all records that have a start_time and end_time in the same day, and the difference between the end_time and start_time is less or equal to 1 hour.
    condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
    df.filter(condition).show()

    + ############### WRITING TO AMAZON REDSHIFT ###############
    +
    + REDSHIFT_JDBC_URL = "jdbc:redshift://%s:5439/%s" % (REDSHIFT_SERVER,DATABASE)
    +
    + df.write \
    +     .format("com.databricks.spark.redshift") \
    +     .option("url", REDSHIFT_JDBC_URL) \
    +     .option("dbtable", TABLE_NAME) \
    +     .option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY,SECRET, S3_BUCKET_PATH)) \
    +     .mode("overwrite") \
    +     .save()


    ######################### REFERENCE #########################

  2. @evenv revised this gist Mar 22, 2016. 4 changed files with 69 additions and 72 deletions.
    36 changes: 0 additions & 36 deletions -- Basic Manipulation.py
    @@ -1,36 +0,0 @@
    # misc import statements
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    from pprint import pprint as pp

    ## creating dataframes

    df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
    df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';',header='true', inferschema='true',mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')

    # adding columns and keeping existing ones
    df.withColumn('zero', lit(0))
    df.withColumn('A_times_two', df.A * 2)

    # selecting columns, and creating new ones
    df.select(
    'A'
    , 'B'
    , col('A').alias('new_name_for_A') # col is nice for referring to columns for alias etc without having to repeat the dataframe name
    , ( col('B') > 0 ).alias('is_B_greater_than_zero')
    , unix_timestamp('A','dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
    )

    # filtering
    df.filter('A_in_unix_time > 946684800')

    # pivoting
    unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

    # inspecting dataframes
    display(df) # table in notebook at least
    df.show() # text table

    17 changes: 0 additions & 17 deletions 1 - functions.py
    @@ -1,17 +0,0 @@
    # aggregate functions
    approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

    # window functions
    cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

    # string functions
    ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

    # misc functions
    array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

    # datetime
    current_date, current_timestamp, trunc, date_format
    datediff, date_add, date_sub, add_months, last_day, next_day, months_between
    year, month, dayofmonth, hour, minute, second
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
    69 changes: 69 additions & 0 deletions Spark Dataframe Cheat Sheet.py
    @@ -0,0 +1,69 @@
    # A simple cheat sheet of Spark Dataframe syntax
    # Current for Spark 1.6.1

    # import statements
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql.functions import *

    #creating dataframes
    df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
    df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';',header='true', inferschema='true',mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')

    # adding columns and keeping existing ones
    df.withColumn('zero', lit(0))
    df.withColumn('A_times_two', df.A * 2)

    # selecting columns, and creating new ones
    df.select(
    'A' # most of the time it's sufficient to just use the column name
    , col('A').alias('new_name_for_A') # in other cases the col method is nice for referring to columns without having to repeat the dataframe name
    , ( col('B') > 0 ).alias('is_B_greater_than_zero')
    , unix_timestamp('A','dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
    )

    # filtering
    df.filter('A_in_unix_time > 946684800')

    # pivoting
    unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

    # inspecting dataframes
    display(df) # table in notebook at least
    df.show() # text table

    ######################################### Date time manipulation ################################
    # Casting to timestamp from string with format 2015-01-01 23:59:59
    df = df.select( df.start_time.cast("timestamp").alias("start_time") )

    # Get all records that have a start_time and end_time in the same day, and the difference between the end_time and start_time is less or equal to 1 hour.
    condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
    df.filter(condition).show()


    ######################### REFERENCE #########################

    # aggregate functions
    approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
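
    # usage sketch (not part of the original gist): aggregate functions go inside
    # df.agg() or groupBy().agg(), as in the grouping example above
    df.agg(countDistinct('B').alias('distinct_Bs'), avg('B').alias('avg_B'))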

    # window functions
    cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
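
    # usage sketch (not part of the original gist): window functions are applied .over() a
    # window spec; rowNumber() is the Spark 1.6 name (row_number() in later versions)
    from pyspark.sql.window import Window
    w = Window.partitionBy('A').orderBy(df.B.desc())
    df.select('A', 'B', rowNumber().over(w).alias('rank_within_A'), lag('B', 1).over(w).alias('previous_B'))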

    # string functions
    ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
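
    # usage sketch (not part of the original gist), assuming A and B hold strings
    df.select(
        concat_ws('-', 'A', 'B').alias('A_dash_B')
        , regexp_replace('A', '\\s+', ' ').alias('A_single_spaced')
        , substring('A', 1, 3).alias('A_first_three_chars')
    )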

    # null and nan functions
    isNaN, isnotnull, isnull
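
    # usage sketch (not part of the original gist)
    df.select(isnull('A').alias('A_is_null'), coalesce(col('A'), lit(0)).alias('A_or_zero'))
    df.fillna(0) # replace nulls in numeric columns with a constant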

    # misc functions
    array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
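
    # usage sketch (not part of the original gist)
    df.select(
        when(col('B') > 0, 'positive').otherwise('non-positive').alias('B_sign')
        , monotonicallyIncreasingId().alias('row_id')
    )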

    # datetime
    current_date, current_timestamp, trunc, date_format
    datediff, date_add, date_sub, add_months, last_day, next_day, months_between
    year, month, dayofmonth, hour, minute, second
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
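
    # usage sketch (not part of the original gist), assuming timestamp columns start_time
    # and end_time as in the timestamp example above
    df.select(
        datediff('end_time', 'start_time').alias('days_between')
        , date_format('start_time', 'yyyy-MM').alias('start_month')
        , year('start_time').alias('start_year')
    )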
    19 changes: 0 additions & 19 deletions Timestamp manipulation - interval and casting.py
    @@ -1,19 +0,0 @@
    # Create a simple DataFrame.
    data = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
    df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
    df = df.select(
    df.start_time.cast("timestamp").alias("start_time"),
    df.end_time.cast("timestamp").alias("end_time"),
    df.id)

    # Get all records that have a start_time and end_time in the
    # same day, and the difference between the end_time and start_time
    # is less or equal to 1 hour.
    condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)

    df.filter(condition).show()
  3. @evenv revised this gist Mar 22, 2016. 2 changed files with 1 addition and 1 deletion.
    File renamed without changes.
    2 changes: 1 addition & 1 deletion 1 - functions.py
    @@ -14,4 +14,4 @@
    current_date, current_timestamp, trunc, date_format
    datediff, date_add, date_sub, add_months, last_day, next_day, months_between
    year, month, dayofmonth, hour, minute, second
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
  4. @evenv renamed this gist Mar 22, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  5. @evenv created this gist Mar 22, 2016.
    17 changes: 17 additions & 0 deletions Dataframe functions.markdown
    @@ -0,0 +1,17 @@
    # aggregate functions
    approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

    # window functions
    cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

    # string functions
    ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

    # misc functions
    array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

    # datetime
    current_date, current_timestamp, trunc, date_format
    datediff, date_add, date_sub, add_months, last_day, next_day, months_between
    year, month, dayofmonth, hour, minute, second
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
    36 changes: 36 additions & 0 deletions Spark Dataframe Tricks.py
    @@ -0,0 +1,36 @@
    # misc import statements
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    from pprint import pprint as pp

    ## creating dataframes

    df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
    df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';',header='true', inferschema='true',mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')

    # adding columns and keeping existing ones
    df.withColumn('zero', lit(0))
    df.withColumn('A_times_two', df.A * 2)

    # selecting columns, and creating new ones
    df.select(
    'A'
    , 'B'
    , col('A').alias('new_name_for_A') # col is nice for referring to columns for alias etc without having to repeat the dataframe name
    , ( col('B') > 0 ).alias('is_B_greater_than_zero')
    , unix_timestamp('A','dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
    )

    # filtering
    df.filter('A_in_unix_time > 946684800')

    # pivoting
    unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

    # inspecting dataframes
    display(df) # table in notebook at least
    df.show() # text table

    19 changes: 19 additions & 0 deletions Timestamp manipulation - interval and casting.py
    @@ -0,0 +1,19 @@
    # Create a simple DataFrame.
    data = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
    df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
    df = df.select(
    df.start_time.cast("timestamp").alias("start_time"),
    df.end_time.cast("timestamp").alias("end_time"),
    df.id)

    # Get all records that have a start_time and end_time in the
    # same day, and the difference between the end_time and start_time
    # is less or equal to 1 hour.
    condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)

    df.filter(condition).show()