/ / spark-sql-streaming-mqtt cattivo utente o password - apache-spark, spark-streaming, mqtt, paho

spark-sql-streaming-mqtt cattivo utente o password - apache-spark, spark-streaming, mqtt, paho

Sto cercando di consumare mqtt come stream in apache spark la lib utilizzata è apache bahir spark-sql-streaming-mqtt. Questa libreria usa la libreria di mqm paho.

Sto usando la lib come segue:

val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()

import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]

val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()



query.awaitTermination()

E ottengo questo errore: "nome utente o password errati".

Ma in un altro progetto akka / scala uso la lib paho-mqtt sullo stesso broker, con lo stesso user / psw e funziona bene.

quindi sono confuso con questo errore

risposte:

0 per risposta № 1

La soluzione :

  1. Usa la versione 1.1.0 di paho-mqtt per avere il metodo autoReconnect:

    "org.eclipse.paho"% "org.eclipse.paho.client.mqttv3"% "1.1.0"

  2. crea il tuo bahir spark-sql-streaming-mqtt dall'origine github perché l'autenticazione non è presente sulla versione esistente. https://github.com/apache/bahir