Próbuję skonfigurować prosty proces za pomocą Spark Streaming, używając Apache Bahir do połączenia z Akka. Próbowałem podążać ich przykład razem z tym starszy. Mam prostego aktora spedytora
class ForwarderActor extends ActorReceiver {
def receive = {
case data: MyData => store(data)
}
}
i tworzę strumień za pomocą
val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)
konfiguracja wygląda następująco:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 7777
}
}
}
a moim problemem jest: jak wysłać wiadomości do aktora Forwardera? Może nie rozumiem, w jaki sposób Akka Remote jest używany w tym przypadku. Gdy aplikacja się uruchomi, widzę dziennik
[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]
a później widzę
[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]
Co wydaje się przypominać opis w ScalaDoc:
/**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/
W sumie nie jestem pewien, jak mam wysyłać wiadomości do aktora Forwardera. Dzięki za pomoc!
Odpowiedzi:
0 dla odpowiedzi № 1Aktorzy Akka mogą wysyłać wiadomości do innych aktorów Akka działających na zdalnym JVM. Więc ... kiedy aktor nadawcy musi znać adres zamierzonego odbiorcy.
AkkaUtil (Bahir) umożliwia tworzenie iskiernika z wiadomości, które: ReceiverActor
odbiera. Ale skąd będą otrzymywać wiadomości? Cóż ... jakiś zdalny aktor. Aby wysyłać wiadomości, ten zdalny aktor będzie potrzebował adresu twojego ReceiverActor
który działa w aplikacji iskry.
Ogólnie rzecz biorąc, nie możesz być zbyt pewny co do IPktóry uruchomi aplikację iskry. Zrobimy to tak, aby aktor działający z iskrą poinformował producenta o referencjach i poprosił o przesłanie rzeczy.
Tylko upewnij się, że obie aplikacje są napisane przy użyciu tej samej wersji Scali i używają tego samego środowiska JRE.
Teraz ... najpierw napiszmy aktora, który będzie źródłem danych,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class MyActor extends Actor with ActorLogging {
val theListOfMyStrings = List("one", "two", "three")
override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="my-ip-address"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
Teraz ... napiszmy naszą prostą aplikację,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}
override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)
myActorSelection ! SendMeYourStringsRequest(self)
}
}
object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)
stringStream.foreach(println)
}
}
Uwaga: po prostu zajmij się my-ip-address
. Jeśli są jakieś inne problemy, daj nam znać w komentarzach.