Revisions
dapangmao revised this gist
Mar 18, 2015. 1 changed file with 3 additions and 2 deletions.
```python
import os

rdd = sc.textFile('C:/Users/chao.huang.ctr/spark-playground//class.txt')

def transform(x):
    args = x.split()
    funcs = [str, str, int, float, float]
    return [z(y) for z, y in zip(funcs, args)]

varnames = Row("name", "sex", "age", "height", "weight")
df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()
```
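As a quick illustration of the funcs/zip parsing used in `transform` above, here is a minimal, Spark-free sketch; the sample record is made up purely to mimic one line of class.txt.

```python
# Minimal, Spark-free sketch of the funcs/zip parsing pattern used in transform.
# The sample record below is hypothetical; it mimics one line of class.txt.
def transform(line):
    funcs = [str, str, int, float, float]   # target type for each column
    return [f(value) for f, value in zip(funcs, line.split())]

print(transform("Alfred M 14 69.0 112.5"))
# ['Alfred', 'M', 14, 69.0, 112.5]
```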
dapangmao revised this gist
Mar 18, 2015. 1 changed file with 1 addition and 1 deletion.
```python
from pyspark.sql import Row
import os

rdd = sc.textFile('C:/Users/chao.huang.ctr/spark-playground//class.txt')

def transform(x):
    y = x.split()
    return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])
```
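For comparison, the same RDD-to-DataFrame conversion could presumably also be done with an explicit schema instead of relying on `Row` and type inference. The sketch below assumes the same `sc` and class.txt layout used in the gist; the path is hypothetical.

```python
# Sketch: build the DataFrame from an explicit schema instead of a Row wrapper.
# Assumes the gist's SparkContext (sc) and the same five-column class.txt layout.
from pyspark.sql import SQLContext
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

sqlCtx = SQLContext(sc)
schema = StructType([
    StructField("name", StringType(), False),
    StructField("sex", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("height", DoubleType(), False),
    StructField("weight", DoubleType(), False),
])

rdd = sc.textFile('class.txt')  # hypothetical path
parsed = rdd.map(lambda line: line.split()) \
            .map(lambda y: (y[0], y[1], int(y[2]), float(y[3]), float(y[4])))
df = sqlCtx.createDataFrame(parsed, schema)
```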
dapangmao revised this gist
Mar 18, 2015. 1 changed file with 1 addition and 7 deletions.
```python
from pyspark.sql import Row
import os

rdd = sc.textFile("C:/Users/chao.huang.ctr/spark-playground//class.txt')

def transform(x):
    y = x.split()
    return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])

varnames = Row("name", "sex", "age", "height", "weight")
df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()

for x in df.collect():
    print x
```

(The mismatched quotes in the `sc.textFile` call are fixed in the next revision.)
dapangmao revised this gist
Mar 18, 2015. 1 changed file with 40 additions and 43 deletions.
[link](https://github.com/apache/spark/blob/master/examples/src/main/python/ml/simple_text_classification_pipeline.py)

```python
"""
A simple text classification pipeline that recognizes "spark" from
input text. This is to show how to create and configure a Spark ML
pipeline in Python. Run with:

  bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
"""

from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

sc = SparkContext(appName="SimpleTextClassificationPipeline")
sqlCtx = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
training = sc.parallelize([(0L, "a b c d e spark", 1.0),
                           (1L, "b d", 0.0),
                           (2L, "spark f g h", 1.0),
                           (3L, "hadoop mapreduce", 0.0)]) \
    .map(lambda x: LabeledDocument(*x)).toDF()

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
test = sc.parallelize([(4L, "spark i j k"),
                       (5L, "l m n"),
                       (6L, "mapreduce spark"),
                       (7L, "apache hadoop")]) \
    .map(lambda x: Document(*x)).toDF()

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print row

sc.stop()
```
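The example above is Python 2 era Spark (note the `0L` long literals and the `print` statement), and `SQLContext` has since been superseded by `SparkSession`. A rough port to current PySpark, offered as an illustrative sketch rather than as part of the gist, might look like this:

```python
# Illustrative sketch: the same pipeline on modern PySpark / Python 3.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleTextClassificationPipeline").getOrCreate()

# Labeled training documents.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

# Three-stage pipeline: tokenizer -> hashingTF -> logistic regression.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

# Unlabeled test documents.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")], ["id", "text"])

for row in model.transform(test).select("id", "text", "prediction").collect():
    print(row)

spark.stop()
```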
dapangmao revised this gist
Mar 18, 2015. 1 changed file with 21 additions and 0 deletions.
### Transform RDD to DataFrame in Spark

```python
from pyspark.sql import Row
import os

current_path = os.getcwd()
rdd = sc.textFile("current_path" + '//class.txt')

def transform(x):
    y = x.split()
    return str(y[0]), str(y[1]), int(y[2]), float(y[3]), float(y[4])

varnames = Row("name", "sex", "age", "height", "weight")
df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()

for x in df.collect():
    print x
```
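Note that `sc.textFile("current_path" + '//class.txt')` concatenates the literal string `"current_path"` rather than the variable defined just above; the later revisions sidestep this by hard-coding the Windows path. A sketch of the presumably intended dynamic path, assuming the gist's `sc` and a class.txt in the working directory:

```python
import os

# Presumably intended: build the path from the working directory,
# not from the literal string "current_path".  Assumes `sc` exists as in the gist.
current_path = os.getcwd()
rdd = sc.textFile(os.path.join(current_path, 'class.txt'))
```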
dapangmao revised this gist
Mar 17, 2015. 1 changed file with 2 additions and 0 deletions.
### Transform RDD to DataFrame in Spark
dapangmao created this gist
Mar 17, 2015.
[link](https://github.com/apache/spark/blob/master/examples/src/main/python/ml/simple_text_classification_pipeline.py)

```python
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

"""
A simple text classification pipeline that recognizes "spark" from
input text. This is to show how to create and configure a Spark ML
pipeline in Python. Run with:

  bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
"""

if __name__ == "__main__":
    sc = SparkContext(appName="SimpleTextClassificationPipeline")
    sqlCtx = SQLContext(sc)

    # Prepare training documents, which are labeled.
    LabeledDocument = Row("id", "text", "label")
    training = sc.parallelize([(0L, "a b c d e spark", 1.0),
                               (1L, "b d", 0.0),
                               (2L, "spark f g h", 1.0),
                               (3L, "hadoop mapreduce", 0.0)]) \
        .map(lambda x: LabeledDocument(*x)).toDF()

    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # Fit the pipeline to training documents.
    model = pipeline.fit(training)

    # Prepare test documents, which are unlabeled.
    Document = Row("id", "text")
    test = sc.parallelize([(4L, "spark i j k"),
                           (5L, "l m n"),
                           (6L, "mapreduce spark"),
                           (7L, "apache hadoop")]) \
        .map(lambda x: Document(*x)).toDF()

    # Make predictions on test documents and print columns of interest.
    prediction = model.transform(test)
    selected = prediction.select("id", "text", "prediction")
    for row in selected.collect():
        print row

    sc.stop()
```