/ / Scala Futures-組み込みのタイムアウト? -scala、並行性

Scala Futures - タイムアウト時に組み込まれていますか? - スカラ、並行性

公式チュートリアルrefから正確に理解できない先物の側面があります。 http://docs.scala-lang.org/overviews/core/futures.html

Scalaの先物にはタイムアウトが組み込まれていますか何らかのメカニズム?たとえば、次の例が5ギガバイトのテキストファイルだったとしましょう。「Implicits.global」の暗黙のスコープは、最終的にonFailureを非ブロッキング方式で起動させるのですか、それとも定義できますか?そして、デフォルトのタイムアウトなしでなんらかの「成功」も「失敗」も起こらないということを「それはほのめかさない」のでしょうか?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
case t => println("Could not process file: " + t.getMessage)
}

回答:

回答№1の63

ブロッキングを使用して結果を取得する場合にのみ、タイムアウト動作が発生します。 Future。ノンブロッキングコールバックを使用する場合 onComplete, onSuccess または onFailure、その後、独自のタイムアウト処理をロールする必要があります。 Akkaには、要求/応答のタイムアウト処理が組み込まれています(?)アクター間のメッセージング。ただし、Akkaの使用を開始するかどうかは不明です。 AkkaのFWIWは、タイムアウト処理のために2つを構成します Futures 一緒に Future.firstCompletedOf、1つは実際の非同期タスクを表し、もう1つはタイムアウトを表します。タイムアウトタイマー( HashedWheelTimer)最初にポップすると、非同期コールバックでエラーが発生します。

独自のローリングの非常に単純化された例は、次のようになります。まず、タイムアウトをスケジュールするためのオブジェクト:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}

次に、Futureを取得してタイムアウト動作を追加する関数:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}

なお、 HashedWheelTimer 私がここで使用しているのはNettyからです。


回答№2の21

私はちょうど作成しました TimeoutFuture 同僚のクラス:

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

val prom = promise[A]

// timeout logic
Akka.system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}

// business logic
Future {
prom success block
}

prom.future
}
}

使用法

val future = TimeoutFuture(10 seconds) {
// do stuff here
}

future onComplete {
case Success(stuff) => // use "stuff"
case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

ノート:

  • プレイを想定!フレームワーク(ただし、簡単に適応できます)
  • すべてのコードが同じで実行されます ExecutionContext これは理想的ではないかもしれません。

回答№3の12

これらの答えにはすべて、追加の依存関係が必要です。私はjava.util.Timerを使用してバージョンを作成することにしました。これは将来、関数を実行する効率的な方法です。この場合、タイムアウトをトリガーします。

詳細を記載したブログ投稿はこちら

これをScalaのPromiseで使用すると、次のようにタイムアウトのあるFutureを作成できます。

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

// All Future"s that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down

val timer: Timer = new Timer(true)

/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future"s outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]

// and a Timer task to handle timing out

val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}

// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)

future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}

p.future
}

}

答え№4のための5

PlayフレームワークにはPromise.timeoutが含まれているため、次のようなコードを記述できます

private def get(): Future[Option[Boolean]] = {
val timeoutFuture = Promise.timeout(None, Duration("1s"))
val mayBeHaveData = Future{
// do something
Some(true)
}

// if timeout occurred then None will be result of method
Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

答え№5の場合は3

将来の待機時にタイムアウトを指定できます。

ために scala.concurrent.Futureresult メソッドでは、タイムアウトを指定できます。

ために scala.actors.Future, Futures.awaitAll タイムアウトを指定できます。

Futureの実行にタイムアウトが組み込まれているとは思わない。


回答№6の場合は3

誰も言及していません akka-streams、まだ。フローには簡単な completionTimeout メソッド、およびそれを単一ソースストリームに適用すると、Futureのように機能します。

ただし、akka-streamsもキャンセルを行うため、ソースの実行を実際に終了できます。つまり、ソースにタイムアウトを通知します。


回答№7の3

ライター(約束保持者)をタイムアウトロジックを制御するものにしたい場合は、 akka.pattern.after次のようにします。

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

このように、約束の完了ロジックが実行されない場合、呼び出し元の未来はある時点で失敗してまだ完了しています。


答え№8の2

これがScalaの標準ではないことに驚いています。私のバージョンは短く、依存関係はありません。

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

import scala.concurrent.ExecutionContext.Implicits.global

implicit class FutureTimeoutLike[T](f: Future[T]) {
def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
Thread.sleep(ms)
throw new TimeoutException
}))

lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
}

}

使用例

import FutureTimeout._
Future { /* do smth */ } withTimeout

答え№9の場合は1

モニックス Task タイムアウトがあります サポート

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)

回答№10は0

私は、Akkaシステムディスパッチャーを使用するこのバージョン(上記のPlayの例に基づく)を使用しています。

object TimeoutFuture {
def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
implicit val executionContext = system.dispatcher

val prom = Promise[A]

// timeout logic
system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}

// business logic
Future {
try {
prom success block
} catch {
case t: Throwable => prom tryFailure t
}
}

prom.future
}
}