/ / RxJava observeOnスケジューラスレッドのクエリ - java、rx-java、vert.x、rx-java2

RxJavaのobserveOnスケジューラスレッドのクエリ - java、rx-java、vert.x、rx-java2

私は着信に基づいてファイルに書き込む必要がありますリクエスト。複数のリクエストが同時に発生する可能性があるため、複数のスレッドがファイルの内容を一緒に上書きしようとすると、データが失われる可能性があります。

したがって、私はすべてのリクエストを収集しようとしました "インスタンス変数を使用してデータ PublishSubject。私は購読しました publishSubject initの間にこのサブスクリプションは残りますアプリケーションのライフサイクルを通じてまた、ファイルを書き込むメソッドを呼び出す別のスレッド(Vertexイベントループによって提供される)で同じインスタンスを監視しています。

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

後でリクエスト処理中に、私は onNext 以下のように:

handleRequest() {
//do some task
publishSubject.onNext(fileData);
}

私が理解すると、私が電話すると onNext、データはキューに入れられ、割り当てられた特定のスレッドによってファイルに書き込まれます。 observeOn オペレーター。しかし、私が理解しようとしていることは、

  1. このスレッドがこれだけのために待機状態でブロックされるかどうか 仕事?または、
  2. 他の活動にも使用されます。ファイル 書き込みは起こるか? このアプローチでは、待機状態で無駄になっていたvertexイベントループから1つのスレッドで終了したいとは思っていません。

前もって感謝します。

回答:

回答№1は1

実際にはRxJavaはあなたのためにそれを行います onNext() 排出物は連続的に作用する。

Observablesはオブザーバーに通知を出さなければなりません直列に(並列ではない)。彼らは異なるスレッドからこれらの通知を発行するかもしれませんが、通知の間に正式の先験的関係がなければなりません。 (観察可能な契約

だから、あなたがブロッキング呼び出しを実行する限り onNext() (そして、別のスレッドに手動で作業をフォークしない)あなたはうまくいくでしょう、そして、パラレルな書き込みは起こりません。

実際、あなたは逆の方向から来るべきか心配しています - 背圧。
背圧戦略を選択する必要がありますここでは、リクエストがより速く来るように、バッファをオーバーフローさせて問題に遭遇するかもしれない(ファイルへの書き込み)それらを処理します。 (Flowableの使用を検討し、必要に応じて背圧戦略を選択してください。

あなたの質問に関しては、スケジューラに依存しています。 RxHelper.blockingScheduler(vertx) あなたのカスタムコードのように思えるので、スケジューラが共有キューを作業キューのファッションで使用している場合、スケジューラはアイドル状態を維持できません。
とにかく、Rxはこれを決定しません。スケジューラの責任は、そのロジックに従ってあるスレッドに作業を割り当てることです。