jest aspekt przyszłości, którego nie rozumiem dokładnie z oficjalnego samouczka. http://docs.scala-lang.org/overviews/core/futures.html
Czy kontrakty futures na scali mają wbudowany limit czasujakiś mechanizm? Powiedzmy, że poniższy przykład był 5-gigabajtowym plikiem tekstowym ... czy domyślny zakres „Implicits.global” ostatecznie powoduje, że onFailure uruchamia się w sposób nieblokujący, czy można to zdefiniować? I bez domyślnego limitu czasu pewnego rodzaju, czyż nie oznaczałoby to, że ani sukces, ani porażka nigdy nie wystrzelą?
import scala.concurrent._
import ExecutionContext.Implicits.global
val firstOccurence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
case t => println("Could not process file: " + t.getMessage)
}
Odpowiedzi:
63 dla odpowiedzi № 1Limit czasu pojawia się tylko wtedy, gdy używasz blokowania, aby uzyskać wyniki Future
. Jeśli chcesz użyć nieblokujących wywołań zwrotnych onComplete
, onSuccess
lub onFailure
, wtedy musisz wprowadzić własne limity czasu. Akka ma wbudowaną obsługę limitów czasu dla żądania / odpowiedzi (?
) przesyłanie wiadomości między aktorami, ale nie wiadomo, czy chcesz zacząć korzystać z Akka. FWIW, w Akka, do obsługi przekroczenia limitu czasu, składają się z dwóch Futures
razem przez Future.firstCompletedOf
, który reprezentuje faktyczne zadanie asynchroniczne i ten, który reprezentuje limit czasu. Jeśli upłynął limit czasu (za pomocą HashedWheelTimer
) wyskakuje pierwszy, pojawia się błąd przy wywołaniu zwrotnym asynchronicznym.
Bardzo uproszczony przykład tworzenia własnych może wyglądać mniej więcej tak. Po pierwsze, obiekt do planowania limitów czasu:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}
Następnie funkcja wzięcia przyszłości i dodania do niej zachowania związanego z przekroczeniem limitu czasu:
import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration
def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}
Zauważ, że HashedWheelTimer
Używam tutaj jest z Netty.
21 dla odpowiedzi nr 2
Właśnie stworzyłem TimeoutFuture
klasa dla współpracownika:
TimeoutFuture
package model
import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._
object TimeoutFuture {
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val prom = promise[A]
// timeout logic
Akka.system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
prom success block
}
prom.future
}
}
Stosowanie
val future = TimeoutFuture(10 seconds) {
// do stuff here
}
future onComplete {
case Success(stuff) => // use "stuff"
case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}
Uwagi:
- Zakłada się grać! framework (ale łatwo go dostosować)
- Każdy fragment kodu działa w ten sam sposób
ExecutionContext
co może nie być idealne.
12 dla odpowiedzi nr 3
Wszystkie te odpowiedzi wymagają dodatkowych zależności. Postanowiłem napisać wersję przy użyciu java.util.Timer, która jest wydajnym sposobem na uruchomienie funkcji w przyszłości, w tym przypadku na wyzwolenie limitu czasu.
Wpis na blogu z dalszymi szczegółami tutaj
Używając tego z Obietnicą Scali, możemy stworzyć Przyszłość z limitem czasu w następujący sposób:
package justinhj.concurrency
import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
object FutureUtil {
// All Future"s that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down
val timer: Timer = new Timer(true)
/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future"s outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]
// and a Timer task to handle timing out
val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}
// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)
future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}
p.future
}
}
5 dla odpowiedzi № 4
Struktura Play zawiera Promise.timeout, dzięki czemu możesz pisać kod w następujący sposób
private def get(): Future[Option[Boolean]] = {
val timeoutFuture = Promise.timeout(None, Duration("1s"))
val mayBeHaveData = Future{
// do something
Some(true)
}
// if timeout occurred then None will be result of method
Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
3 dla odpowiedzi № 5
Możesz określić limit czasu oczekiwania na przyszłość:
Dla scala.concurrent.Future
, result
Metoda pozwala określić limit czasu.
Dla scala.actors.Future
, Futures.awaitAll
pozwala określić limit czasu.
Nie wydaje mi się, żeby w Future miał wbudowany limit czasu.
3 dla odpowiedzi № 6
Nikt nie jest wspomniany akka-streams
jeszcze Przepływy są łatwe completionTimeout
i zastosowanie tego w strumieniu z jednego źródła działa jak przyszłość.
Ale akka-strumienie również anuluje, więc może faktycznie zakończyć działanie źródła, tj. Sygnalizuje przekroczenie limitu czasu dla źródła.
3 dla odpowiedzi № 7
Jeśli chcesz, aby pisarz (posiadacz obietnicy) był tym, który kontroluje logikę limitu czasu, użyj akka.pattern.after, w następujący sposób:
val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))
W ten sposób, jeśli logika realizacji obietnicy nigdy się nie wydarzy, przyszłość dzwoniącego nadal będzie w pewnym momencie zakończona niepowodzeniem.
2 dla odpowiedzi № 8
Jestem dość zaskoczony, że nie jest to standard w Scali. Moje wersje są krótkie i nie ma zależności
import scala.concurrent.Future
sealed class TimeoutException extends RuntimeException
object FutureTimeout {
import scala.concurrent.ExecutionContext.Implicits.global
implicit class FutureTimeoutLike[T](f: Future[T]) {
def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
Thread.sleep(ms)
throw new TimeoutException
}))
lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
}
}
Przykład użycia
import FutureTimeout._
Future { /* do smth */ } withTimeout
1 dla odpowiedzi № 9
Monix Task
ma limit czasu wsparcie
import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException
val source = Task("Hello!").delayExecution(10.seconds)
// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)
timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)
0 dla odpowiedzi № 10
Korzystam z tej wersji (na podstawie powyższego przykładu Play), która korzysta z dyspozytora systemu Akka:
object TimeoutFuture {
def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
implicit val executionContext = system.dispatcher
val prom = Promise[A]
// timeout logic
system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
try {
prom success block
} catch {
case t: Throwable => prom tryFailure t
}
}
prom.future
}
}