/ / Abfrage auf RxJava beobachtenOn Scheduler Thread - Java, RX-Java, vert.x, RX-Java2

Abfrage auf RxJava beobachtenOn Scheduler-Thread - Java, RX-Java, vert.x, RX-Java2

Ich muss in eine Datei basierend auf dem eingehenden schreibenAnfragen. Da mehrere Anfragen gleichzeitig eingehen können, möchte ich nicht, dass mehrere Threads versuchen, den Dateiinhalt zusammen zu überschreiben, was dazu führen kann, dass Daten verloren gehen.

Daher habe ich versucht, alle Anfragen "Daten mit einer Instanzvariable von PublishSubject. Ich habe abonniert publishSubject während init und dieses Abonnement wird bleibenwährend des gesamten Lebenszyklus der Anwendung. Außerdem beobachte ich dieselbe Instanz in einem separaten Thread (bereitgestellt von der Vertx-Ereignisschleife), der die für das Schreiben der Datei verantwortliche Methode aufruft.

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

Später während der Bearbeitung der Anfrage rufe ich an onNext wie nachstehend:

handleRequest() {
//do some task
publishSubject.onNext(fileData);
}

Ich verstehe das, wenn ich anrufe onNext, werden die Daten in die Warteschlange gestellt, um von dem spezifischen Thread, der zugewiesen wurde, in die Datei geschrieben zu werden observeOn Operator. Was ich jedoch zu verstehen versuche, ist

  1. ob dieser Thread nur im WARTEN-Status blockiert wird Aufgabe? Oder,
  2. Wird es auch für andere Aktivitäten verwendet werden, wenn nicht?Datei Schreiben passiert? Ich möchte nicht mit einem Thread aus der vertx Event-Schleife enden, der im Wartezustand für diesen Ansatz verschwendet wird. Bitte schlagen Sie auch einen besseren Ansatz vor, falls verfügbar.

Danke im Voraus.

Antworten:

1 für die Antwort № 1

Eigentlich wird RxJava das per Definition tun onNext() Emissionen werden seriell wirken:

Beobachter müssen Benachrichtigungen an Beobachter ausgebenseriell (nicht parallel). Sie können diese Benachrichtigungen aus verschiedenen Threads ausgeben, aber zwischen den Benachrichtigungen muss eine formale "happen-before" -Beziehung bestehen. (Beobachtbarer Vertrag)

So lange wie Sie blockierende Anrufe in der onNext() Bei dem Subscriber (und die Arbeit nicht manuell zu einem anderen Thread abzweigen) wird es dir gut gehen, und es werden keine parallelen Schreibvorgänge stattfinden.

Eigentlich sollten Sie sich Sorgen aus der entgegengesetzten Richtung machen - Gegendruck.
Sie sollten Ihre Gegendruckstrategie wählenhier, als ob die Anfragen schneller kommen würden, als Sie sie verarbeiten (in Datei schreiben), könnten Sie den Puffer überlaufen und in Schwierigkeiten geraten. (Erwägen Sie die Verwendung von Flowable und wählen Sie eine Rückstau-Strategie nach Ihren Bedürfnissen.

Bei Ihren Fragen hängt das vom Scheduler ab, den Sie verwenden RxHelper.blockingScheduler(vertx) das scheint Ihr benutzerdefinierter Code zu sein, so kann ich nicht sagen, ob der Scheduler geteilten Thread in der Arbeitswarteschlangenmode verwendet, dann wird er nicht im Leerlauf bleiben.
Wie auch immer, Rx wird dies nicht für Sie bestimmen, die Aufgabe des Schedulers ist es, die Arbeit einem Thread entsprechend seiner Logik zuzuweisen.