Revisions
evenv revised this gist on Mar 22, 2016. 1 changed file with 21 additions and 2 deletions.
The updated section of the cheat sheet:

# filtering
df.filter('A_in_unix_time > 946684800')

# grouping and aggregating
df.groupBy("A").agg(
    first("B").alias("my first")
    , last("B").alias("my last")
    , sum("B").alias("my everything")
)

# pivoting
df.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function (see the sketch below)

# inspecting dataframes
display(df) # table, in notebooks at least
df.show()   # text table

######################################### Date time manipulation ################################

# Casting to timestamp from a string with format 2015-01-01 23:59:59
df.select(df.start_time.cast("timestamp").alias("start_time"))

# Get all records whose start_time and end_time fall on the same day
# and where end_time is at most 1 hour after start_time.
condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()

############### WRITING TO AMAZON REDSHIFT ###############

REDSHIFT_JDBC_URL = "jdbc:redshift://%s:5439/%s" % (REDSHIFT_SERVER, DATABASE)
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", REDSHIFT_JDBC_URL) \
    .option("dbtable", TABLE_NAME) \
    .option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY, SECRET, S3_BUCKET_PATH)) \
    .mode("overwrite") \
    .save()

######################### REFERENCE #########################
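The pivoting one-liner above is dense, so here is a minimal sketch of what it does on tiny illustrative data (the rows and column names are made up; sqlContext and the star import of pyspark.sql.functions are assumed from the file):

unpivoted = sqlContext.createDataFrame(
    [('x', 1, 'red', 10.0), ('x', 1, 'blue', 20.0), ('y', 2, 'red', 30.0)],
    ['A', 'B', 'C', 'D'])
unpivoted.groupBy('A', 'B').pivot('C').agg(first('D')).orderBy(['A', 'B']).show()
# One row per (A, B) pair and one column per distinct value of C:
# (x, 1) gets blue=20.0 and red=10.0; (y, 2) gets blue=null and red=30.0.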
evenv revised this gist on Mar 22, 2016. 4 changed files with 69 additions and 72 deletions.
Two earlier files were deleted and merged into a single new cheat sheet file:

# A simple cheat sheet of Spark Dataframe syntax
# Current for Spark 1.6.1

# import statements
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# creating dataframes
df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';', header='true', inferschema='true', mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')

# adding columns and keeping existing ones
df.withColumn('zero', lit(0))
df.withColumn('A_times_two', df.A * 2)

# selecting columns, and creating new ones
df.select(
    'A' # most of the time it's sufficient to just use the column name
    , col('A').alias('new_name_for_A') # in other cases the col method is nice for referring to columns without having to repeat the dataframe name
    , (col('B') > 0).alias('is_B_greater_than_zero')
    , unix_timestamp('A', 'dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
)

# filtering
df.filter('A_in_unix_time > 946684800')

# pivoting
unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

# inspecting dataframes
display(df) # table, in notebooks at least
df.show()   # text table

######################################### Date time manipulation ################################

# Casting to timestamp from a string with format 2015-01-01 23:59:59
df = df.select(df.start_time.cast("timestamp").alias("start_time"))

# Get all records whose start_time and end_time fall on the same day
# and where end_time is at most 1 hour after start_time.
condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()

######################### REFERENCE #########################

# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

# window functions (see the sketch below)
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

# null and nan functions
isNaN, isnotnull, isnull

# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

# datetime
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
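The window functions in the reference have no worked example in the cheat sheet, so here is a minimal sketch using the Spark 1.6 API (assumes the df with columns A and B from above; the aliases are made up):

from pyspark.sql import Window
from pyspark.sql.functions import lag, rank

w = Window.partitionBy('A').orderBy('B')
df.select(
    'A'
    , 'B'
    , lag('B', 1).over(w).alias('previous_B') # B from the previous row within the same A
    , rank().over(w).alias('rank_in_A')       # rank of each row within its A, ordered by B
)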
evenv revised this gist on Mar 22, 2016. 2 changed files with 1 addition and 1 deletion.
One file was renamed without changes. In the other, the datetime section of the reference was updated to:

current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
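As a hedged sketch of the two timezone helpers on the last line (the timezone name and the start_time column are illustrative assumptions):

from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp

df.select(
    from_utc_timestamp(df.start_time, "Europe/Oslo").alias("local_start") # treat start_time as UTC, convert to local wall-clock time
    , to_utc_timestamp(df.start_time, "Europe/Oslo").alias("utc_start")   # treat start_time as local wall-clock time, convert to UTC
)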
evenv renamed this gist on Mar 22, 2016. 1 changed file with 0 additions and 0 deletions.
File renamed without changes.
evenv created this gist on Mar 22, 2016.
The initial version contained three files: a function reference, a syntax cheat sheet, and a datetime example.

# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

# window functions
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

# datetime
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp

# misc import statements
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pprint import pprint as pp

## creating dataframes
df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';', header='true', inferschema='true', mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')

# adding columns and keeping existing ones
df.withColumn('zero', lit(0))
df.withColumn('A_times_two', df.A * 2)

# selecting columns, and creating new ones
df.select(
    'A'
    , 'B'
    , col('A').alias('new_name_for_A') # col is nice for referring to columns for alias etc. without having to repeat the dataframe name
    , (col('B') > 0).alias('is_B_greater_than_zero')
    , unix_timestamp('A', 'dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
)

# filtering
df.filter('A_in_unix_time > 946684800')

# pivoting
unpivoted.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function

# inspecting dataframes
display(df) # table, in notebooks at least
df.show()   # text table

# Create a simple DataFrame.
from pyspark.sql.functions import expr, to_date # needed for the condition below

data = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
df = df.select(
    df.start_time.cast("timestamp").alias("start_time"),
    df.end_time.cast("timestamp").alias("end_time"),
    df.id)

# Get all records that have a start_time and end_time on the
# same day, where the difference between the end_time and start_time
# is less than or equal to 1 hour.
condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
# Only id 2 passes: id 1 spans midnight, and id 3's gap is 1 hour 1 second.
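An equivalent way to write the one-hour condition, sketched with unix_timestamp from the reference list (assumes the same df and sqlContext as the example above):

from pyspark.sql.functions import unix_timestamp, to_date

same_day = to_date(df.start_time) == to_date(df.end_time)
within_hour = (unix_timestamp(df.end_time) - unix_timestamp(df.start_time)) <= 3600
df.filter(same_day & within_hour).show() # same single row (id 2) as above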