def toRedshift(time, rdd):
    """Write one Spark Streaming micro-batch of activity records to Redshift.

    Intended as a ``foreachRDD`` callback: Spark invokes it once per batch.

    Parameters
    ----------
    time :
        Batch timestamp supplied by Spark Streaming's ``foreachRDD``
        (required by the callback signature; unused here).
    rdd :
        RDD of rows matching the ``activity_log`` schema below.
    """
    sqlContext = getSqlContextInstance(rdd.context)
    # Explicit schema: avoids per-batch schema inference and fails fast on
    # ill-typed records instead of producing a mistyped DataFrame.
    schema = StructType([
        StructField('user_id', StringType(), True),
        StructField('device_id', StringType(), True),
        StructField('steps', IntegerType(), True),
        StructField('battery_level', IntegerType(), True),
        StructField('calories_spent', IntegerType(), True),
        StructField('distance', FloatType(), True),
        StructField('current_time', IntegerType(), True),
    ])
    df = sqlContext.createDataFrame(rdd, schema)
    # NOTE(review): this temp table is never queried in this chunk — confirm a
    # downstream consumer exists, otherwise the registration can be dropped.
    df.registerTempTable("activity_log")
    # SECURITY: credentials are hard-coded in the JDBC URL. Move them to
    # configuration (or IAM-based auth) before deploying; they also end up in
    # the Spark UI and logs as part of the writer options.
    # The spark-redshift connector stages data via the S3 tempdir, then issues
    # a Redshift COPY; mode("append") adds rows without touching existing data.
    df.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
        .option("dbtable", "activity_log") \
        .option("tempdir", "s3n://spark-temp-data/") \
        .mode("append") \
        .save()
    # The original wrapped this body in `try: ... except Exception as e:
    # raise(e)`, which only re-raised the exception — and under Python 2
    # `raise e` discards the original traceback. Removing the no-op handler
    # lets failures propagate unchanged with their full traceback; add
    # logging here if per-batch error reporting is wanted.


# NOTE(review): `process` is not defined anywhere in this chunk. If it does not
# exist elsewhere in the file, this was almost certainly meant to be
# `py_rdd.foreachRDD(toRedshift)` — as written, `toRedshift` is never used.
py_rdd.foreachRDD(process)