@pdmack
Last active December 20, 2017 00:04

Revisions

  1. Pete MacKinnon revised this gist Dec 20, 2017. 1 changed file with 13 additions and 2 deletions.
    15 changes: 13 additions & 2 deletions spongodb.py
    @@ -16,7 +16,12 @@
     # limitations under the License.
     #
    +
    +import sys
     import pyspark
     from pyspark.sql import SparkSession
     from pyspark.sql import SQLContext
    +
    +mongo_jars = ["/root/spark2.2/mongo-spark-connector_2.11-2.2.1.jar","/root/spark2.2/mongo-java-driver-3.5.0.jar"]
    +
     spark = SparkSession \
         .builder \
    @@ -25,12 +30,18 @@
         .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.coll") \
         .getOrCreate()
     
    +# get the SparkContext singleton from the JVM (not the pyspark API)
    +context = spark._jvm.org.apache.spark.SparkContext.getOrCreate()
     # get the MutableURLClassLoader from the JVM
     loader = spark._jvm.Thread.currentThread().getContextClassLoader()
    -url = spark._jvm.java.net.URL("file:/root/spark2.2/mongo-spark-connector_2.11-2.2.1.jar")
    +
    +# load jars for our driver AND executors
    +url = spark._jvm.java.net.URL("file:"+mongo_jars[0])
     loader.addURL(url)
    -url = spark._jvm.java.net.URL("file:/root/spark2.2/mongo-java-driver-3.5.0.jar")
    +context.addJar(mongo_jars[0])
    +url = spark._jvm.java.net.URL("file:"+mongo_jars[1])
     loader.addURL(url)
    +context.addJar(mongo_jars[1])
     urls = loader.getURLs()
     for p in urls:
         print(p)
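
    A quick way to sanity-check the revised approach (a sketch, not part of the gist; it reuses the loader and context variables from the script above and assumes the connector's usual DefaultSource entry point) is to ask the driver's class loader for the connector class and to list the jars registered for the executors:

    # Hypothetical check: after the addURL() calls, the driver's context
    # classloader should be able to resolve the connector's DataSource class.
    klass = loader.loadClass("com.mongodb.spark.sql.DefaultSource")
    print(klass.getName())

    # SparkContext.addJar() only registers jars for executors to fetch;
    # listJars() (Spark 2.x) shows what they will download.
    print(context.listJars())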
  2. Pete MacKinnon revised this gist Dec 5, 2017. 1 changed file with 0 additions and 2 deletions.
    2 changes: 0 additions & 2 deletions spongodb.py
    @@ -16,9 +16,7 @@
     # limitations under the License.
     #
    -
     import pyspark
     from pyspark.sql import SparkSession
     from pyspark.sql import SQLContext
    -
     spark = SparkSession \
         .builder \
  3. Pete MacKinnon created this gist Dec 5, 2017.
    58 changes: 58 additions & 0 deletions spongodb.py
    @@ -0,0 +1,58 @@
    from __future__ import print_function
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext

    spark = SparkSession \
        .builder \
        .appName("PySpark dynamic jar loading example") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.coll") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.coll") \
        .getOrCreate()

    # get the MutableURLClassLoader from the JVM
    loader = spark._jvm.Thread.currentThread().getContextClassLoader()
    url = spark._jvm.java.net.URL("file:/root/spark2.2/mongo-spark-connector_2.11-2.2.1.jar")
    loader.addURL(url)
    url = spark._jvm.java.net.URL("file:/root/spark2.2/mongo-java-driver-3.5.0.jar")
    loader.addURL(url)
    urls = loader.getURLs()
    for p in urls:
        print(p)

    logger = spark._jvm.org.apache.log4j
    logger.LogManager.getRootLogger().setLevel(logger.Level.FATAL)

    # Save some data
    characters = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])
    characters.write.format("com.mongodb.spark.sql").mode("overwrite").save()

    # print the schema
    print("Schema:")
    characters.printSchema()

    # read from MongoDB collection
    df = spark.read.format("com.mongodb.spark.sql").load()

    # SQL
    df.registerTempTable("temp")
    centenarians = spark.sql("SELECT name, age FROM temp WHERE age >= 100")
    print("Centenarians:")
    centenarians.show()
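
    The class-loader and addJar() gymnastics above exist because the connector jars were not on the driver's classpath when the JVM started. A more conventional route (not part of the gist, and assuming the machine can reach Maven Central) is to let Spark resolve the connector itself through spark.jars.packages before the session is created, for example:

    from pyspark.sql import SparkSession

    # Sketch only: spark.jars.packages asks Spark to download the connector
    # (and its mongo-java-driver dependency) and put it on the driver and
    # executor classpaths. The coordinate matches the Spark 2.2 / Scala 2.11
    # jars used in the gist.
    spark = SparkSession.builder \
        .appName("PySpark dynamic jar loading example") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.1") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.coll") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.coll") \
        .getOrCreate()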