@quydx
Forked from lucidfrontier45/sklearn-spark.py
Created October 12, 2020 03:13
Use a trained sklearn model with PySpark
from pyspark import SparkContext
import numpy as np
from sklearn import ensemble


def batch(xs):
    """Collect an entire partition into one list so the model can
    predict on it in a single vectorized call."""
    yield list(xs)


# Train a model locally on random data.
N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.5, N)
model = ensemble.RandomForestClassifier(n_estimators=10).fit(train_x, train_y)

# Data to score on the cluster.
test_x = np.random.randn(N * 100, 10)

sc = SparkContext()
n_partitions = 10

# Pair each row with a global index so predictions can be matched
# back to their inputs; zipWithIndex yields (element, index) pairs.
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Ship the trained model to every executor once instead of
# serializing it with each task.
b_model = sc.broadcast(model)

result = rdd.mapPartitions(batch) \
    .map(lambda xs: ([x[0] for x in xs], [x[1] for x in xs])) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

print(result.take(100))
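The partition-batching idea above can be checked locally without a Spark cluster. The sketch below emulates `zipWithIndex` and `mapPartitions` with plain Python lists; `predict` is a hypothetical stand-in for `b_model.value.predict` (it just thresholds the first feature) so the control flow can be followed without sklearn or Spark.

```python
def predict(rows):
    # Stand-in for b_model.value.predict: label 1 if the first feature is positive.
    return [1 if r[0] > 0 else 0 for r in rows]


def partitioned(data, n_partitions):
    # Like rdd.zipWithIndex(): pair each element with its global index,
    # then split the indexed rows into contiguous partitions.
    indexed = [(x, i) for i, x in enumerate(data)]
    size = (len(indexed) + n_partitions - 1) // n_partitions
    return [indexed[p * size:(p + 1) * size] for p in range(n_partitions)]


def score(partitions):
    out = []
    for part in partitions:                 # mapPartitions(batch): one list per partition
        xs = [x for x, _ in part]           # the feature rows
        idx = [i for _, i in part]          # their global indices
        out.extend(zip(idx, predict(xs)))   # flatMap over (index, prediction) pairs
    return out


data = [(0.5,), (-1.2,), (2.0,), (-0.1,)]
print(score(partitioned(data, 2)))
# -> [(0, 1), (1, 0), (2, 1), (3, 0)]
```

Batching per partition matters because one `predict` call on a whole partition is far cheaper than a call per row, at the cost of holding the partition in memory at once.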