/ / Apache Bahir, invia materiale ad ActorReceiver - scala, apache-spark, spark-streaming, apache-bahir

Apache Bahir, invia materiale ad ActorReceiver - scala, apache-spark, spark-streaming, apache-bahir

Sto cercando di impostare un processo semplice con Spark Streaming, utilizzando Apache Bahir per connettersi ad Akka. Ho provato a seguire il loro esempio insieme a questo uno più vecchio. Ho un semplice attore spedizioniere

class ForwarderActor extends ActorReceiver {
def receive = {
case data: MyData => store(data)
}
}

e creo un flusso con

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)

la configurazione è simile a questa:

akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 7777
}
}
}

e il mio problema è: come posso inviare messaggi all'attore dello spedizioniere? Forse non capisco come si usa Akka Remote in questo caso: quando l'app inizia, vedo un log

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]

e più tardi vedo

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]

Che sembra ricordare la descrizione nel scaladoc:

 /**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/

Tutto sommato non sono sicuro di come dovrei inviare messaggi all'attore dello spedizioniere. Grazie per qualsiasi aiuto!

risposte:

0 per risposta № 1

Gli attori Akka possono inviare messaggi ad altri attori Akka che girano su una JVM remota. Quindi ... quando l'attore mittente ha bisogno di conoscere l'indirizzo dell'attore del destinatario desiderato.

AkkaUtil (Bahir) ti consente di creare un flusso di scintilla dai messaggi che a ReceiverActor riceve. Ma da dove riceveranno i messaggi? Bene ... un attore remoto. E per inviare messaggi questo attore remoto avrà bisogno dell'indirizzo del tuo ReceiverActor che sta funzionando nella tua applicazione scintilla.

In generale, non si può essere troppo sicuri dell'ipche eseguirà la tua applicazione scintilla. Quindi, faremo in modo che l'attore che corre con la scintilla possa dire al produttore attore il suo riferimento e chiedergli di inviare le sue cose.

Assicurati che entrambe le applicazioni siano scritte usando la stessa versione di Scala e che eseguano lo stesso JRE.

Ora ... prima scriviamo l'attore che sarà la fonte dei dati,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class MyActor extends Actor with ActorLogging {

val theListOfMyStrings = List("one", "two", "three")

override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}

object MyApplication extends App {

val config = ConfigFactory.parseString(
"""
|akka{
|  actor {
|    provider = remote
|  }
|  remote {
|    enabled-transports = ["akka.remote.netty.tcp"]
|    untrusted-mode = off
|    netty.tcp {
|      hostname="my-ip-address"
|      port=18000
|    }
|  }
|}
""".stripMargin
)

val actorSystem = ActorSystem("my-actor-system", config)

var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

Ora ... lascia scrivere la nostra semplice app scintilla,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}

override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)

myActorSelection ! SendMeYourStringsRequest(self)
}
}

object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")

if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}

val ssc = new StreamingContext(sparkConf, Seconds(2))

val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)

stringStream.foreach(println)

}
}

Nota: basta prendersi cura di my-ip-address. Se ci sono altri problemi, per favore fatemelo sapere nei commenti.