@kenhui521
Forked from dapangmao/blog.md
Last active August 29, 2015 14:23
Revisions

  1. @dapangmao revised this gist Mar 18, 2015. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions blog.md
    @@ -6,8 +6,9 @@ import os
     
     rdd = sc.textFile('C:/Users/chao.huang.ctr/spark-playground//class.txt')
     def transform(x):
    -    y = x.split()
    -    return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])
    +    args = x.split()
    +    funcs = [str, str, int, float, float]
    +    return [z(y) for z, y in zip(funcs, args)]
     
     varnames = Row("name", "sex", "age", "height", "weight")
     df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()
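    A quick illustration of what the revised transform() returns for one input line (a minimal sketch; the sample record is hypothetical, since class.txt itself is not included in the gist):
    ```python
    def transform(x):
        args = x.split()
        funcs = [str, str, int, float, float]
        # cast each whitespace-separated field with its matching type
        return [z(y) for z, y in zip(funcs, args)]

    print(transform("Alfred M 14 69.0 112.5"))   # hypothetical sample line
    # -> ['Alfred', 'M', 14, 69.0, 112.5]
    ```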
  2. @dapangmao revised this gist Mar 18, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion blog.md
    @@ -4,7 +4,7 @@
     from pyspark.sql import Row
     import os
     
    -rdd = sc.textFile("C:/Users/chao.huang.ctr/spark-playground//class.txt')
    +rdd = sc.textFile('C:/Users/chao.huang.ctr/spark-playground//class.txt')
     def transform(x):
         y = x.split()
         return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])
  3. @dapangmao revised this gist Mar 18, 2015. 1 changed file with 1 addition and 7 deletions.
    8 changes: 1 addition & 7 deletions blog.md
    @@ -4,18 +4,12 @@
     from pyspark.sql import Row
     import os
     
    -current_path = os.getcwd()
    -
    -rdd = sc.textFile("current_path" + '//class.txt')
    -
    +rdd = sc.textFile("C:/Users/chao.huang.ctr/spark-playground//class.txt')
     def transform(x):
         y = x.split()
         return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])
     
    -
     varnames = Row("name", "sex", "age", "height", "weight")
    -
    -
     df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()
     
     for x in df.collect():
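    The removed line concatenated the literal string "current_path" instead of the variable, so Spark would look for a file under a directory literally named current_path. A hedged one-line alternative that keeps the working-directory idea (assuming class.txt sits in the current directory and sc comes from the pyspark shell) would be:
    ```python
    import os
    rdd = sc.textFile(os.path.join(os.getcwd(), 'class.txt'))
    ```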
  4. @dapangmao revised this gist Mar 18, 2015. 1 changed file with 40 additions and 43 deletions.
    83 changes: 40 additions & 43 deletions simple_text_classification_pipeline.md
    @@ -1,11 +1,5 @@
     [link](https://github.com/apache/spark/blob/master/examples/src/main/python/ml/simple_text_classification_pipeline.py)
     ```python
    -from pyspark import SparkContext
    -from pyspark.ml import Pipeline
    -from pyspark.ml.classification import LogisticRegression
    -from pyspark.ml.feature import HashingTF, Tokenizer
    -from pyspark.sql import Row, SQLContext
    -
     
     """
     A simple text classification pipeline that recognizes "spark" from
    @@ -14,42 +8,45 @@ pipeline in Python. Run with:
     bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
     """
    +from pyspark import SparkContext
    +from pyspark.ml import Pipeline
    +from pyspark.ml.classification import LogisticRegression
    +from pyspark.ml.feature import HashingTF, Tokenizer
    +from pyspark.sql import Row, SQLContext
     
     
    -if __name__ == "__main__":
    -    sc = SparkContext(appName="SimpleTextClassificationPipeline")
    -    sqlCtx = SQLContext(sc)
    -
    -    # Prepare training documents, which are labeled.
    -    LabeledDocument = Row("id", "text", "label")
    -    training = sc.parallelize([(0L, "a b c d e spark", 1.0),
    -                               (1L, "b d", 0.0),
    -                               (2L, "spark f g h", 1.0),
    -                               (3L, "hadoop mapreduce", 0.0)]) \
    -        .map(lambda x: LabeledDocument(*x)).toDF()
    -
    -    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    -    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    -    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    -    lr = LogisticRegression(maxIter=10, regParam=0.01)
    -    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    -
    -    # Fit the pipeline to training documents.
    -    model = pipeline.fit(training)
    -
    -    # Prepare test documents, which are unlabeled.
    -    Document = Row("id", "text")
    -    test = sc.parallelize([(4L, "spark i j k"),
    -                           (5L, "l m n"),
    -                           (6L, "mapreduce spark"),
    -                           (7L, "apache hadoop")]) \
    -        .map(lambda x: Document(*x)).toDF()
    -
    -    # Make predictions on test documents and print columns of interest.
    -    prediction = model.transform(test)
    -    selected = prediction.select("id", "text", "prediction")
    -    for row in selected.collect():
    -        print row
    -
    -    sc.stop()
    +sc = SparkContext(appName="SimpleTextClassificationPipeline")
    +sqlCtx = SQLContext(sc)
    +
    +# Prepare training documents, which are labeled.
    +LabeledDocument = Row("id", "text", "label")
    +training = sc.parallelize([(0L, "a b c d e spark", 1.0),
    +                           (1L, "b d", 0.0),
    +                           (2L, "spark f g h", 1.0),
    +                           (3L, "hadoop mapreduce", 0.0)]) \
    +    .map(lambda x: LabeledDocument(*x)).toDF()
    +
    +# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    +tokenizer = Tokenizer(inputCol="text", outputCol="words")
    +hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    +lr = LogisticRegression(maxIter=10, regParam=0.01)
    +pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    +
    +# Fit the pipeline to training documents.
    +model = pipeline.fit(training)
    +
    +# Prepare test documents, which are unlabeled.
    +Document = Row("id", "text")
    +test = sc.parallelize([(4L, "spark i j k"),
    +                       (5L, "l m n"),
    +                       (6L, "mapreduce spark"),
    +                       (7L, "apache hadoop")]) \
    +    .map(lambda x: Document(*x)).toDF()
    +
    +# Make predictions on test documents and print columns of interest.
    +prediction = model.transform(test)
    +selected = prediction.select("id", "text", "prediction")
    +for row in selected.collect():
    +    print row
    +
    +sc.stop()
     ```
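    A short follow-up sketch (not part of the gist) for inspecting the intermediate columns the fitted pipeline adds, which makes it clearer what the LogisticRegression stage actually consumes; it assumes the model and test objects defined above:
    ```python
    full = model.transform(test)
    full.printSchema()  # id and text, plus the generated words, features, ... and prediction columns
    for row in full.select("text", "words", "features").collect():
        print(row)
    ```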
  5. @dapangmao revised this gist Mar 18, 2015. 1 changed file with 21 additions and 0 deletions.
    21 changes: 21 additions & 0 deletions blog.md
    @@ -1,2 +1,23 @@
     ###Transform RDD to DataFrame in Spark
     
    +```python
    +from pyspark.sql import Row
    +import os
    +
    +current_path = os.getcwd()
    +
    +rdd = sc.textFile("current_path" + '//class.txt')
    +
    +def transform(x):
    +    y = x.split()
    +    return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])
    +
    +
    +varnames = Row("name", "sex", "age", "height", "weight")
    +
    +
    +df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()
    +
    +for x in df.collect():
    +    print x
    +```
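    As created, the snippet assumes it runs inside the pyspark shell, where sc already exists and a SQLContext has been set up so that toDF() works on RDDs of Rows. A self-contained sketch of the same flow for spark-submit (Spark 1.3-era API; the class.txt location is an assumption) might look like:
    ```python
    import os
    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext

    sc = SparkContext(appName="RddToDataFrame")
    sqlCtx = SQLContext(sc)  # instantiating a SQLContext makes toDF() available on RDDs

    rdd = sc.textFile(os.path.join(os.getcwd(), 'class.txt'))

    def transform(x):
        y = x.split()
        return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])

    varnames = Row("name", "sex", "age", "height", "weight")
    df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()

    for x in df.collect():
        print(x)

    sc.stop()
    ```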
  6. @dapangmao revised this gist Mar 17, 2015. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions blog.md
    @@ -0,0 +1,2 @@
    +###Transform RDD to DataFrame in Spark
    +
  7. @dapangmao created this gist Mar 17, 2015.
    55 changes: 55 additions & 0 deletions simple_text_classification_pipeline.md
    @@ -0,0 +1,55 @@
    +[link](https://github.com/apache/spark/blob/master/examples/src/main/python/ml/simple_text_classification_pipeline.py)
    +```python
    +from pyspark import SparkContext
    +from pyspark.ml import Pipeline
    +from pyspark.ml.classification import LogisticRegression
    +from pyspark.ml.feature import HashingTF, Tokenizer
    +from pyspark.sql import Row, SQLContext
    +
    +
    +"""
    +A simple text classification pipeline that recognizes "spark" from
    +input text. This is to show how to create and configure a Spark ML
    +pipeline in Python. Run with:
    +bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
    +"""
    +
    +
    +if __name__ == "__main__":
    +    sc = SparkContext(appName="SimpleTextClassificationPipeline")
    +    sqlCtx = SQLContext(sc)
    +
    +    # Prepare training documents, which are labeled.
    +    LabeledDocument = Row("id", "text", "label")
    +    training = sc.parallelize([(0L, "a b c d e spark", 1.0),
    +                               (1L, "b d", 0.0),
    +                               (2L, "spark f g h", 1.0),
    +                               (3L, "hadoop mapreduce", 0.0)]) \
    +        .map(lambda x: LabeledDocument(*x)).toDF()
    +
    +    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    +    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    +    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    +    lr = LogisticRegression(maxIter=10, regParam=0.01)
    +    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    +
    +    # Fit the pipeline to training documents.
    +    model = pipeline.fit(training)
    +
    +    # Prepare test documents, which are unlabeled.
    +    Document = Row("id", "text")
    +    test = sc.parallelize([(4L, "spark i j k"),
    +                           (5L, "l m n"),
    +                           (6L, "mapreduce spark"),
    +                           (7L, "apache hadoop")]) \
    +        .map(lambda x: Document(*x)).toDF()
    +
    +    # Make predictions on test documents and print columns of interest.
    +    prediction = model.transform(test)
    +    selected = prediction.select("id", "text", "prediction")
    +    for row in selected.collect():
    +        print row
    +
    +    sc.stop()
    +```