Skip to content

Instantly share code, notes, and snippets.

@AtlasPilotPuppy
Last active July 28, 2018 10:14
Show Gist options
  • Save AtlasPilotPuppy/c2d24fe340a7f3cf76d2 to your computer and use it in GitHub Desktop.

Revisions

  1. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -33,3 +33,6 @@
    INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date)
    GROUP BY div.stock_symbol LIMIT 10''').collect()

    join_group_agg = sqlContext.sql('''SELECT div.stock_symbol, max(prices.stock_price_close), min(prices.stock_price_close), avg(prices.stock_price_close) FROM dividends div
    INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date)
    GROUP BY div.stock_symbol LIMIT 10''').collect()
  2. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 6 additions and 2 deletions.
    8 changes: 6 additions & 2 deletions SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -25,7 +25,11 @@
    daily_prices_schema.registerAsTable('daily_prices')
    result = sqlContext.sql('SELECT * FROM daily_prices LIMIT 10').collect()

    join = sqlContext.sql('SELECT div.exchange, div.stock_symbol, div.date, div.dividends, prices.stock_price_adj_close, prices.stock_price_close FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) LIMIT 10').collect()
    join = sqlContext.sql('''SELECT div.exchange, div.stock_symbol, div.date, div.dividends, prices.stock_price_adj_close,
    prices.stock_price_close FROM dividends div INNER JOIN daily_prices prices
    ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) LIMIT 10''').collect()

    join_group = sqlContext.sql('SELECT div.stock_symbol, max(prices.stock_price_close) as max_close FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) GROUP BY div.stock_symbol LIMIT 10').collect()
    join_group = sqlContext.sql('''SELECT div.stock_symbol, max(prices.stock_price_close) as max_close FROM dividends div
    INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date)
    GROUP BY div.stock_symbol LIMIT 10''').collect()

  3. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -27,4 +27,5 @@

    join = sqlContext.sql('SELECT div.exchange, div.stock_symbol, div.date, div.dividends, prices.stock_price_adj_close, prices.stock_price_close FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) LIMIT 10').collect()

    join_group = sqlContext.sql('SELECT div.stock_symbol, count(prices.stock_price_close) FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) GROUP BY div.stock_symbol LIMIT 10').collect()
    join_group = sqlContext.sql('SELECT div.stock_symbol, max(prices.stock_price_close) as max_close FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) GROUP BY div.stock_symbol LIMIT 10').collect()

  4. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -19,8 +19,12 @@
    lambda row: {columns[0]: row[0], columns[1]: row[1], columns[2]: row[2],
    columns[3]: float(row[3]), columns[4]: float(row[4]), columns[5]: float(row[5]),
    columns[6]: float(row[6]), columns[7]: float(row[7]), columns[8]: float(row[8])})
    daily_prices_schema = sqlContext.inferSchema(daily_prices_parsed)

    daily_prices_schema = sqlContext.inferSchema(daily_prices_parsed)
    daily_prices_schema.printSchema()
    daily_prices_schema.registerAsTable('daily_prices')
    result = sqlContext.sql('SELECT * FROM daily_prices LIMIT 10').collect()

    join = sqlContext.sql('SELECT div.exchange, div.stock_symbol, div.date, div.dividends, prices.stock_price_adj_close, prices.stock_price_close FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) LIMIT 10').collect()

    join_group = sqlContext.sql('SELECT div.stock_symbol, count(prices.stock_price_close) FROM dividends div INNER JOIN daily_prices prices ON(div.stock_symbol=prices.stock_symbol AND div.date=prices.date) GROUP BY div.stock_symbol LIMIT 10').collect()
  5. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 4 additions and 1 deletion.
    5 changes: 4 additions & 1 deletion SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -16,8 +16,11 @@
    daily_prices = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_daily_prices_A.csv")
    columns = daily_prices.take(1)[0].split(',')
    daily_prices_parsed = daily_prices.filter(lambda r: not r.startswith('exchange')).map(lambda r: r.split(',')).map(
    lambda row: dict(zip(columns, row)))
    lambda row: {columns[0]: row[0], columns[1]: row[1], columns[2]: row[2],
    columns[3]: float(row[3]), columns[4]: float(row[4]), columns[5]: float(row[5]),
    columns[6]: float(row[6]), columns[7]: float(row[7]), columns[8]: float(row[8])})
    daily_prices_schema = sqlContext.inferSchema(daily_prices_parsed)

    daily_prices_schema.printSchema()
    daily_prices_schema.registerAsTable('daily_prices')
    result = sqlContext.sql('SELECT * FROM daily_prices LIMIT 10').collect()
  6. AtlasPilotPuppy revised this gist Oct 22, 2014. 1 changed file with 12 additions and 2 deletions.
    14 changes: 12 additions & 2 deletions SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -3,11 +3,21 @@
    from pyspark.sql import Row, StructField, StructType, StringType, IntegerType

    sc = SparkContext('spark://master:7077', 'Spark SQL Intro')

    sqlContext = SQLContext(sc)
    dividends = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_dividends_A.csv")
    dividends_parsed = dividends.filter(lambda r: not r.startswith('exchange')).map(lambda r: r.split(',')).map(lambda row: {'exchange': row[0], 'stock_symbol': row[1], 'date': row[2], 'dividends': float(row[3])})
    dividends_parsed = dividends.filter(lambda r: not r.startswith('exchange')).map(lambda r: r.split(',')).map(
    lambda row: {'exchange': row[0], 'stock_symbol': row[1], 'date': row[2], 'dividends': float(row[3])})

    dividends_schema = sqlContext.inferSchema(dividends_parsed)
    dividends_schema.printSchema()
    dividends_schema.registerAsTable('dividends')
    result = sqlContext.sql('SELECT * from dividends LIMIT 10').collect()

    daily_prices = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_daily_prices_A.csv")
    columns = daily_prices.take(1)[0].split(',')
    daily_prices_parsed = daily_prices.filter(lambda r: not r.startswith('exchange')).map(lambda r: r.split(',')).map(
    lambda row: dict(zip(columns, row)))
    daily_prices_schema = sqlContext.inferSchema(daily_prices_parsed)
    daily_prices_schema.printSchema()
    daily_prices_schema.registerAsTable('daily_prices')
    result = sqlContext.sql('SELECT * FROM daily_prices LIMIT 10').collect()
  7. AtlasPilotPuppy created this gist Oct 22, 2014.
    13 changes: 13 additions & 0 deletions SparkSqlIntro.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import Row, StructField, StructType, StringType, IntegerType

    sc = SparkContext('spark://master:7077', 'Spark SQL Intro')

    dividends = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_dividends_A.csv")
    dividends_parsed = dividends.filter(lambda r: not r.startswith('exchange')).map(lambda r: r.split(',')).map(lambda row: {'exchange': row[0], 'stock_symbol': row[1], 'date': row[2], 'dividends': float(row[3])})

    dividends_schema = sqlContext.inferSchema(dividends_parsed)
    dividends_schema.printSchema()
    dividends_schema.registerAsTable('dividends')