/ / Tratando de obtener chispas para leer el flujo de datos del sitio web, ¿qué es el socket? - hadoop, apache-spark, spark-streaming, rdd

Intentando obtener una transmisión de chispa para leer el flujo de datos del sitio web, ¿qué es el socket? - hadoop, apache-spark, spark-streaming, rdd

Estoy tratando de obtener estos datos http://stream.meetup.com/2/rsvps en la corriente de chispa

Son objetos JSON, sé que las líneas serán cadenas, solo quiero que funcione antes de probar JSON.

No estoy seguro de qué poner como puerto, asumo que ese es el problema.

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("http://stream.meetup.com/2/rsvps", 80);


lines.print();

jssc.start();
jssc.awaitTermination();

Aqui esta mi error

java.net.UnknownHostException: http://stream.meetup.com/2/rsvps
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.<init>(Socket.java:425)
at java.net.Socket.<init>(Socket.java:208)

Respuestas

2 para la respuesta № 1

El socketTextStream no está diseñado para funcionar comocliente http Como se habrá dado cuenta, deberá crear un receptor personalizado, un lugar potencial para comenzar se basa en el receptor creado como parte de la fuente de datos de transmisión de datos de la reunión (consulte https://github.com/actions/meetup-stream/blob/master/src/main/scala/receiver/MeetupReceiver.scala )


0 para la respuesta № 2

Aquí es una costumbre UrlReceptor que sigue la documentación de la chispa en los receptores personalizados:

class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

override def onStart() = {
new Thread("Url Receiver") {
override def run() = {
val urlConnection: URLConnection = new URL(urlStr).openConnection
val bufferedReader: BufferedReader = new BufferedReader(
new InputStreamReader(urlConnection.getInputStream)
)
var msg = bufferedReader.readLine
while (msg != null) {
if (!msg.isEmpty) {
store(msg)
}
msg = bufferedReader.readLine
}
bufferedReader.close()
}
}.start()
}

override def onStop() = {
// nothing to do
}
}

Entonces úsalo así:

val lines = sc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))