/ / Apache Bahir, wyślij rzeczy do ActorReceiver - scala, apasz-iskra, iskrzenie strumieniowe, apache-bahir

Apache Bahir, wyślij rzeczy do ActorReceiver - scala, apache-iskierka, iskra-streaming, apache-bahir

Próbuję skonfigurować prosty proces za pomocą Spark Streaming, używając Apache Bahir do połączenia z Akka. Próbowałem podążać ich przykład razem z tym starszy. Mam prostego aktora spedytora

class ForwarderActor extends ActorReceiver {
def receive = {
case data: MyData => store(data)
}
}

i tworzę strumień za pomocą

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)

konfiguracja wygląda następująco:

akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 7777
}
}
}

a moim problemem jest: jak wysłać wiadomości do aktora Forwardera? Może nie rozumiem, w jaki sposób Akka Remote jest używany w tym przypadku. Gdy aplikacja się uruchomi, widzę dziennik

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]

a później widzę

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]

Co wydaje się przypominać opis w ScalaDoc:

 /**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/

W sumie nie jestem pewien, jak mam wysyłać wiadomości do aktora Forwardera. Dzięki za pomoc!

Odpowiedzi:

0 dla odpowiedzi № 1

Aktorzy Akka mogą wysyłać wiadomości do innych aktorów Akka działających na zdalnym JVM. Więc ... kiedy aktor nadawcy musi znać adres zamierzonego odbiorcy.

AkkaUtil (Bahir) umożliwia tworzenie iskiernika z wiadomości, które: ReceiverActor odbiera. Ale skąd będą otrzymywać wiadomości? Cóż ... jakiś zdalny aktor. Aby wysyłać wiadomości, ten zdalny aktor będzie potrzebował adresu twojego ReceiverActor który działa w aplikacji iskry.

Ogólnie rzecz biorąc, nie możesz być zbyt pewny co do IPktóry uruchomi aplikację iskry. Zrobimy to tak, aby aktor działający z iskrą poinformował producenta o referencjach i poprosił o przesłanie rzeczy.

Tylko upewnij się, że obie aplikacje są napisane przy użyciu tej samej wersji Scali i używają tego samego środowiska JRE.

Teraz ... najpierw napiszmy aktora, który będzie źródłem danych,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class MyActor extends Actor with ActorLogging {

val theListOfMyStrings = List("one", "two", "three")

override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}

object MyApplication extends App {

val config = ConfigFactory.parseString(
"""
|akka{
|  actor {
|    provider = remote
|  }
|  remote {
|    enabled-transports = ["akka.remote.netty.tcp"]
|    untrusted-mode = off
|    netty.tcp {
|      hostname="my-ip-address"
|      port=18000
|    }
|  }
|}
""".stripMargin
)

val actorSystem = ActorSystem("my-actor-system", config)

var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

Teraz ... napiszmy naszą prostą aplikację,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}

override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)

myActorSelection ! SendMeYourStringsRequest(self)
}
}

object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")

if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}

val ssc = new StreamingContext(sparkConf, Seconds(2))

val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)

stringStream.foreach(println)

}
}

Uwaga: po prostu zajmij się my-ip-address. Jeśli są jakieś inne problemy, daj nam znać w komentarzach.