Sto cercando di consumare mqtt come stream in apache spark la lib utilizzata è apache bahir spark-sql-streaming-mqtt. Questa libreria usa la libreria di mqm paho.
Sto usando la lib come segue:
val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]
val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
E ottengo questo errore: "nome utente o password errati".
Ma in un altro progetto akka / scala uso la lib paho-mqtt sullo stesso broker, con lo stesso user / psw e funziona bene.
quindi sono confuso con questo errore
risposte:
0 per risposta № 1La soluzione :
Usa la versione 1.1.0 di paho-mqtt per avere il metodo autoReconnect:
"org.eclipse.paho"% "org.eclipse.paho.client.mqttv3"% "1.1.0"
crea il tuo bahir spark-sql-streaming-mqtt dall'origine github perché l'autenticazione non è presente sulla versione esistente. https://github.com/apache/bahir