-
-
Save kartben/614fea74e9c67df0aae0 to your computer and use it in GitHub Desktop.
| # | |
| # Licensed to the Apache Software Foundation (ASF) under one or more | |
| # contributor license agreements. See the NOTICE file distributed with | |
| # this work for additional information regarding copyright ownership. | |
| # The ASF licenses this file to You under the Apache License, Version 2.0 | |
| # (the "License"); you may not use this file except in compliance with | |
| # the License. You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # | |
| """ | |
| This example will consume temperature data (or any other numerical values, really) | |
| from an MQTT broker, and consolidate/graph this data on a 15-second sliding window. | |
| This work is based on the original mqtt_wordcount.py sample from the Apache Spark codebase. | |
| Running the example: | |
| `$ bin/spark-submit --jars \ | |
| external/mqtt-assembly/target/spark-streaming-mqtt-assembly_*.jar \ | |
| mqtt_spark_streaming.py` | |
| """ | |
def is_number(s):
    """Return True when *s* can be parsed as a finite floating-point number.

    Args:
        s: Candidate value; the pipeline in this file passes it each
            message received from the MQTT stream.

    Returns:
        True if ``float(s)`` succeeds and the result is finite, else False.
    """
    # Local import: this function is defined ahead of the file's import block.
    import math

    try:
        value = float(s)
    except (TypeError, ValueError, OverflowError):
        # TypeError: payload is not a string/number at all (e.g. None).
        # OverflowError: an integer too large to be represented as a float.
        return False
    # "nan" and "inf" parse as floats but are not usable numeric readings.
    return math.isfinite(value)
| import sys | |
| import operator | |
| from pyspark import SparkContext | |
| from pyspark.streaming import StreamingContext | |
| from pyspark.streaming.mqtt import MQTTUtils | |
# Spark entry points: a streaming context with a batch interval of 1, and
# checkpointing enabled (the windowed reduction below uses an inverse
# function, which the Spark documentation says requires checkpointing).
sc = SparkContext(appName="TemperatureHistory")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

# broker URI
brokerUrl = "tcp://192.168.2.26:1883"  # "tcp://iot.eclipse.org:1883"
# topic or topic pattern where temperature data is being sent
topic = "+/+/sensors/temperature"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

# Histogram pipeline:
#   1. drop payloads that are not numeric;
#   2. snap each reading to the nearest 0.5 and pair it with a count of 1;
#   3. sum the counts per bin over a window of 15 that slides by 1
#      (operator.sub subtracts the counts that leave the window);
#   4. sort the bins by value so the histogram prints in ascending order.
counts = (
    mqttStream
    .filter(lambda message: is_number(message))
    .map(lambda message: (round(float(message) * 2, 0) / 2, 1))
    .reduceByKeyAndWindow(
        operator.add,
        operator.sub,
        15,
        1,
        # Without a filter, bins whose count has dropped back to 0 are kept
        # in the stream and would be printed as empty bars.
        filterFunc=lambda bin_and_count: bin_and_count[1] > 0,
    )
    .transform(lambda rdd: rdd.sortByKey())
)
def printHistogram(time, rdd):
    """Print the collected records of one RDD as an ASCII histogram.

    Args:
        time: Value shown in the header through ``"Time: %s" % time``.
        rdd: Object whose ``collect()`` returns indexable records; element 0
            is printed as the label and element 1 is the number of ``#``
            characters drawn for it.
    """
    rule = "-------------------------------------------"
    rows = rdd.collect()

    # Header: the time framed by two rules.
    print(rule)
    print("Time: %s" % time)
    print(rule)

    # One bar per record.
    for row in rows:
        label = row[0]
        hits = row[1]
        print(''.join([str(label), ': ', '#' * hits]))

    # Blank line separating this histogram from the next one.
    print("")
| # Hand every RDD of the windowed stream to printHistogram. | |
| counts.foreachRDD(printHistogram) | |
| # Start the streaming computation, then block until it terminates. | |
| ssc.start() | |
| ssc.awaitTermination() |
Hi,
I had the same problem, but running the code using the Apache Bahir package worked for me. However, I made some changes to the code.
My Apache Spark version is 2.4.5.
First I started pyspark with this command:
pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.4.0
When the console is ready, I can run the code example after changing
`from pyspark.streaming.mqtt import MQTTUtils` to `from mqtt import MQTTUtils`.
I also had to change the MQTT connection to:
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic, username=None, password=None)
@massimocallisto When I run the command `pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:3.3.1`, I get the following error:
module not found: org.apache.bahir#spark-streaming-mqtt_2.11;3.3.1
Please let me know if you have a solution.
This code is really old so I doubt it still works, unfortunately.
@kartben, it is not working. Is there any other way to do Spark streaming with the MQTT protocol?
@BennisonDevadoss Have you tried the latest version of the official samples that originally inspired this code?
https://github.com/apache/bahir/tree/master/streaming-mqtt/examples
Is there any way to retrieve the actual topic name from the stream when using wildcard characters like '#' or '+' to subscribe to multiple topics? `MQTTUtils` also has a `createPairedStream` method, but it does not allow username/password authentication. I need help.
Hello! I am facing an issue while running this: it says "ModuleNotFoundError: No module named 'pyspark.streaming.mqtt'". What could be the solution?