/ / Scala Futures - wbudowany limit czasu? - Scala, współbieżność

Scala Futures - wbudowany limit czasu? - scala, współbieżność

jest aspekt przyszłości, którego nie rozumiem dokładnie z oficjalnego samouczka. http://docs.scala-lang.org/overviews/core/futures.html

Czy kontrakty futures na scali mają wbudowany limit czasujakiś mechanizm? Powiedzmy, że poniższy przykład był 5-gigabajtowym plikiem tekstowym ... czy domyślny zakres „Implicits.global” ostatecznie powoduje, że onFailure uruchamia się w sposób nieblokujący, czy można to zdefiniować? I bez domyślnego limitu czasu pewnego rodzaju, czyż nie oznaczałoby to, że ani sukces, ani porażka nigdy nie wystrzelą?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
case t => println("Could not process file: " + t.getMessage)
}

Odpowiedzi:

63 dla odpowiedzi № 1

Limit czasu pojawia się tylko wtedy, gdy używasz blokowania, aby uzyskać wyniki Future. Jeśli chcesz użyć nieblokujących wywołań zwrotnych onComplete, onSuccess lub onFailure, wtedy musisz wprowadzić własne limity czasu. Akka ma wbudowaną obsługę limitów czasu dla żądania / odpowiedzi (?) przesyłanie wiadomości między aktorami, ale nie wiadomo, czy chcesz zacząć korzystać z Akka. FWIW, w Akka, do obsługi przekroczenia limitu czasu, składają się z dwóch Futures razem przez Future.firstCompletedOf, który reprezentuje faktyczne zadanie asynchroniczne i ten, który reprezentuje limit czasu. Jeśli upłynął limit czasu (za pomocą HashedWheelTimer) wyskakuje pierwszy, pojawia się błąd przy wywołaniu zwrotnym asynchronicznym.

Bardzo uproszczony przykład tworzenia własnych może wyglądać mniej więcej tak. Po pierwsze, obiekt do planowania limitów czasu:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}

Następnie funkcja wzięcia przyszłości i dodania do niej zachowania związanego z przekroczeniem limitu czasu:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}

Zauważ, że HashedWheelTimer Używam tutaj jest z Netty.


21 dla odpowiedzi nr 2

Właśnie stworzyłem TimeoutFuture klasa dla współpracownika:

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

val prom = promise[A]

// timeout logic
Akka.system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}

// business logic
Future {
prom success block
}

prom.future
}
}

Stosowanie

val future = TimeoutFuture(10 seconds) {
// do stuff here
}

future onComplete {
case Success(stuff) => // use "stuff"
case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Uwagi:

  • Zakłada się grać! framework (ale łatwo go dostosować)
  • Każdy fragment kodu działa w ten sam sposób ExecutionContext co może nie być idealne.

12 dla odpowiedzi nr 3

Wszystkie te odpowiedzi wymagają dodatkowych zależności. Postanowiłem napisać wersję przy użyciu java.util.Timer, która jest wydajnym sposobem na uruchomienie funkcji w przyszłości, w tym przypadku na wyzwolenie limitu czasu.

Wpis na blogu z dalszymi szczegółami tutaj

Używając tego z Obietnicą Scali, możemy stworzyć Przyszłość z limitem czasu w następujący sposób:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

// All Future"s that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down

val timer: Timer = new Timer(true)

/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future"s outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]

// and a Timer task to handle timing out

val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}

// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)

future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}

p.future
}

}

5 dla odpowiedzi № 4

Struktura Play zawiera Promise.timeout, dzięki czemu możesz pisać kod w następujący sposób

private def get(): Future[Option[Boolean]] = {
val timeoutFuture = Promise.timeout(None, Duration("1s"))
val mayBeHaveData = Future{
// do something
Some(true)
}

// if timeout occurred then None will be result of method
Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

3 dla odpowiedzi № 5

Możesz określić limit czasu oczekiwania na przyszłość:

Dla scala.concurrent.Future, result Metoda pozwala określić limit czasu.

Dla scala.actors.Future, Futures.awaitAll pozwala określić limit czasu.

Nie wydaje mi się, żeby w Future miał wbudowany limit czasu.


3 dla odpowiedzi № 6

Nikt nie jest wspomniany akka-streamsjeszcze Przepływy są łatwe completionTimeout i zastosowanie tego w strumieniu z jednego źródła działa jak przyszłość.

Ale akka-strumienie również anuluje, więc może faktycznie zakończyć działanie źródła, tj. Sygnalizuje przekroczenie limitu czasu dla źródła.


3 dla odpowiedzi № 7

Jeśli chcesz, aby pisarz (posiadacz obietnicy) był tym, który kontroluje logikę limitu czasu, użyj akka.pattern.after, w następujący sposób:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

W ten sposób, jeśli logika realizacji obietnicy nigdy się nie wydarzy, przyszłość dzwoniącego nadal będzie w pewnym momencie zakończona niepowodzeniem.


2 dla odpowiedzi № 8

Jestem dość zaskoczony, że nie jest to standard w Scali. Moje wersje są krótkie i nie ma zależności

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

import scala.concurrent.ExecutionContext.Implicits.global

implicit class FutureTimeoutLike[T](f: Future[T]) {
def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
Thread.sleep(ms)
throw new TimeoutException
}))

lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
}

}

Przykład użycia

import FutureTimeout._
Future { /* do smth */ } withTimeout

1 dla odpowiedzi № 9

Monix Task ma limit czasu wsparcie

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)

0 dla odpowiedzi № 10

Korzystam z tej wersji (na podstawie powyższego przykładu Play), która korzysta z dyspozytora systemu Akka:

object TimeoutFuture {
def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
implicit val executionContext = system.dispatcher

val prom = Promise[A]

// timeout logic
system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}

// business logic
Future {
try {
prom success block
} catch {
case t: Throwable => prom tryFailure t
}
}

prom.future
}
}