Cheat sheet for Spark Dataframes (using Python)
# misc import statements
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pprint import pprint as pp
## creating dataframes
df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])  # from manual data
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';', header='true', inferSchema='true', mode="FAILFAST") \
    .load('csv_file_name_or_*_reference')
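# in Spark 2.0+ the csv reader is built in; the same load written against
# a SparkSession (assumed to be available as `spark`) would be:
df = spark.read.csv('csv_file_name_or_*_reference',
                    sep=';', header=True, inferSchema=True, mode='FAILFAST')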
# adding columns and keeping existing ones
# (withColumn returns a new dataframe; assign the result to keep it)
df = df.withColumn('zero', lit(0))
df = df.withColumn('A_times_two', df.A * 2)
| # selecting columns, and creating new ones | |
| df.select( | |
| 'A' | |
| , 'B' | |
| , col('A').alias('new_name_for_A') # col is nice for referring to columns for alias etc without having to repeat the dataframe name | |
| , ( col('B') > 0 ).alias('is_B_greater_than_zero') | |
| , unix_timestamp('A','dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text | |
| ) | |
# filtering
df.filter('A_in_unix_time > 946684800')
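# the same filter as a column expression instead of a SQL string
df.filter(col('A_in_unix_time') > 946684800)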
# pivoting: 'unpivoted' is a dataframe in long format; distinct values of C
# become columns, filled from D (first could be any aggregate function)
unpivoted.groupBy('A', 'B').pivot('C').agg(first('D')).orderBy(['A', 'B'])
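# a minimal worked example with made-up data: one output column per distinct
# value of 'kind', holding the first 'value' seen per id
unpivoted = sqlContext.createDataFrame(
    [(1, 'a', 10), (1, 'b', 20), (2, 'a', 30)], ['id', 'kind', 'value'])
unpivoted.groupBy('id').pivot('kind').agg(first('value')).show()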
# inspecting dataframes
display(df)  # renders a rich table in Databricks notebooks
df.show()  # plain-text table
# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
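# e.g. several aggregates in one pass (assuming df has columns A and B)
df.groupBy('A').agg(count('B').alias('n'), avg('B').alias('avg_B'), max('B'))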
# window functions
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
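# window functions need a window spec; e.g. ranking rows within each 'A' partition
from pyspark.sql.window import Window
w = Window.partitionBy('A').orderBy('B')
df.select('A', 'B', rank().over(w).alias('rank_within_A'))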
# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
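# e.g. building and cleaning strings (assuming A and B are string columns)
df.select(concat_ws('-', 'A', 'B'), lower(trim(col('A'))),
          regexp_replace('A', '[0-9]+', '#'))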
# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
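# e.g. conditional values and null handling
df.select(when(col('B') > 0, 'pos').otherwise('non-pos').alias('sign_of_B'),
          coalesce(col('A'), lit(0)).alias('A_or_zero'))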
# datetime functions
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
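# e.g. date arithmetic (assuming a hypothetical timestamp column 'ts')
df.select(date_format('ts', 'yyyy-MM'),
          date_add(to_date('ts'), 7),
          months_between(current_date(), to_date('ts')))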
# Create a simple DataFrame.
data = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
df = df.select(
    df.start_time.cast("timestamp").alias("start_time"),
    df.end_time.cast("timestamp").alias("end_time"),
    df.id)
# Get all records whose start_time and end_time fall on the same day
# and whose end_time is at most one hour after start_time.
condition = \
    (to_date(df.start_time) == to_date(df.end_time)) & \
    (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
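# with the sample data above only id 2 matches: id 1 spans two days,
# and id 3's duration (1 hour 1 second) exceeds the one-hour window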