Code

Future e promise in Scala

25 Agosto 2015 - 6 minuti di lettura

In questo primo articolo Luca Marcato spiega i costrutti future e promise per Scala, un linguaggio funzionale a oggetti che  permette di scrivere codice in maniera molto concisa, pulita e davvero intuitiva.

Costrutto future

Future è un costrutto usato per recuperare il risultato di un’operazione concorrente e che quindi potrebbe non essere ancora disponibile quando si cerca di accedervi; in pratica è una sorta di “placeholder”/segnaposto, creato per  un valore(solitamente il risultato di una computazione) che potrebbe diventare disponibile. A questo risultato ci si può accedere in maniera sincrona, e quindi bloccante (da evitare il più possibile), o asincrona e quindi non bloccante.

Si dice completato(“completed”) se ha prodotto un risultato che sia un valore o un’eccezione, altrimenti si dice non completato(“not completed”);  se è completato con un valore il future è completato con successo(“successfully completed”), se invece è completato con un’eccezione generata dalla computazione allora si dice fallito(“failed”)

Un future può essere assegnato una volta sola, dal momento in cui gli viene assegnato un valore o un’eccezione, diventa di fatto immutabile e non potrà essere sovrascritto.

Per creare un future possiamo utilizzare il metodo apply del “companion-object” che richiede due argomenti come input: body (il codice da eseguire nel future) e l’ ExecutionContext che è implicito:

object Future {
  def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}

Execution Context

L’ ExecutionContext è il Thread-Pool dove sarà eseguito il processo; possiamo usare quello globale importando scala.concurrent.ExecutionContext.Implicits.global oppure usarne uno specifico.
Un ExecutionContext “ad hoc” è consigliato nel caso si debba eseguire del codice che richiede lunghi tempi di computazione o nel caso operazioni di I/O bloccanti.

Di seguito un esempio:

import java.util.concurrent.{Executors, ExecutorService}
import scala.concurrent.{Future, ExecutionContext, ExecutionContextExecutor}
object CustomExec {
  val threadNum = 4
  val threadPool:ExecutorService = Executors.newFixedThreadPool(threadNum)
  val execctx:ExecutionContextExecutor = ExecutionContext.fromExecutor(threadPool)
  val f = Future("Hello World")(execctx)
}

Accedere al risultato

Callback (asincrona)

import scala.concurrent.Future
import scala.util.{Random, Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
val f = Future(lungaOperazione())
f.onComplete {
  case Success(x) => println(x)
  case Failure(e) => e.printStackTrace()
}
println("1");Thread.sleep(20)
println("2");Thread.sleep(20)
println("3");Thread.sleep(20)
println("4");Thread.sleep(20)
println("5");Thread.sleep(20)
Thread.sleep(120)
def lungaOperazione():Int = { Thread.sleep(new Random().nextInt(100)) 42 }
/**
* risultato:
*
* 1
* 2
* 42
* 3
* 4
* 5
*/

Nell’esempio ci siamo registrati alla callback utilizzando il metodo onComplete

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit

Che riceve in ingresso una funzione del tipo Try[T] => U .
La callback viene applicata a un valore di tipo Success[T] se il future si completa con successo, altrimenti a un valore di tipo Failure[T].

Composizione di future

Supponiamo di dover applicare una funzione al risultato di più future in cascata (tratto dalla documentazione Scala):

val rateQuote = Future {
  connection.getCurrentValue(USD)
}
rateQuote onSuccess {
  quote =>
    val purchase = Future {
      if (isProfitable(quote)) connection.buy(amount, quote)
      else throw new Exception("not profitable")
    }
  purchase onSuccess {
    case _ => println("Purchased " + amount + " USD")
  }
}

Nell’esempio precedente si è creato il future rateQuote che restituisce il tasso di conversione corrente. Quando il server ha restituito il  tasso di conversione in dollari e il future si è completato con successo, applicando la callback si è recuperato il valore e si è applicata la logica per decidere se acquistare o meno utilizzando ancora una volta un future. A questo si è riapplicata di nuovo una callback per scrivere a monitor se è stato fatto l’acquisto.

…si può fare, ma non è proprio bello e comodo, ci sono però i combinators come map, flatMap, filter ecc che ci permettono riscrivere l’esempio precedente in maniera molto più elegante:

val rateQuote = Future {
  connection.getCurrentValue(USD)
}
val purchase = rateQuote map {
  case _ => 
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
}
purchase onSuccess {
  case _ => println("Purchased " + amount + " USD")
}

recover / recoverWith / fallbackTo

Tra gli altri combinators, particolare attenzione per recover, recoverWith e fallbackTo

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U]

Esempio:  
    future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
    future (6 / 0) recover { case e: NotFoundException   => 0 } // result: exception
    future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
  • recoverWith crea un nuovo future che gestisce ogni throwable del primo future che matcha (ArithmeticExpression nell’esempio); se non c’è match o il secondo future fallisce allora non farà nulla.
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U]

Esempio: 
    val primoFuture = future (6 / 0)
    val secondoFuture = future { Int.MaxValue }
    primoFuture recoverWith { case e: ArithmeticException => secondoFuture } // result: Int.MaxValue
  • recoverWith crea un nuovo future che gestisce ogni throwable del primo future che matcha (ArithmeticExpression nell’esempio), assegnandogli il valore di un altro future; se non c’è match o il secondo future fallisce allora non farà nulla.
def fallbackTo[U >: T](that: Future[U]): Future[U]

Esempio: 
    primoFuture.fallbackTo(secondoFuture)
  • fallbackTo restituisce un future con il valore di primoFuture se si completa con successo; se primoFuture fallisce restituisce un future con il valore di secondoFuture; se anche secondoFuture dovesse fallire, fallbackTo restituirebbe un future con il throwable di primoFuture.

Projections

Abbiamo visto che si possono usare operazioni monadi, come map o flatMap, e come vengano applicate solo nel caso di successo del future; se invece volessimo utilizzare le stesse funzioni nel caso in cui qualcosa vada storto e venga sollevata un’eccezione, possiamo farlo chiamando il metodo failed del future che restituisce un Future[Throwable]:

def failed: Future[Throwable] = {
  implicit val ec = internalExecutor
  val p = Promise[Throwable]()
  onComplete {
    case Failure(t) => p success t
    case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
  }
  p.future
}

Blocking (sincrona)

Nonostante i future tendano, per loro natura, a essere asincroni, per migliorare le performance e per evitare deadlock, a volte, è comunque necessario bloccare l’esecuzione del thread; per fare ciò, possiamo bloccare il future dall’esterno oppure possiamo fargli eseguire del codice che lo blocchi dall’interno.

Supponiamo di dover fare un’operazione IO bloccante, per esempio la read con un socket Java:

val is = socket.getInputStream.()
Future{ is.read() }

In questo esempio viene notificato al thread pool che il future contiene delle operazioni potenzialmente lunghe o bloccanti e il thread pool fa in modo di creare temporaneamente nuovi thread di lavoro (workers) per fare in modo che non tutti i thread siano bloccati.

Un altro modo per bloccare l’esecuzione è usare l’oggetto Await; riprendiamo l’esempio usato per la composizione di future:

import scala.concurrent._
import scala.concurrent.duration._
val rateQuote = Future {
  connection.getCurrentValue(USD)
}
val purchase = rateQuote map {
  quote =>
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
}
 purchase onSuccess {
  case _ => println("Purchased " + amount + " USD")
} //questa volta non usiamo l'onSuccess
Await.result(purchase, 10 seconds)

L’oggetto Await ci permette di assicurarci che purchase verrà eseguito dal thread chiamante obbligandolo ad aspettare che entrambi i future siano completati.

Il secondo parametro di result è il timeout; quando scade viene lanciata una TimeoutException.

Costrutto promise

A differenza del future, che è in sola lettura, la promise è un costrutto che permette di completare un future con un valore, usando il metodo success, o con un’eccezione, usando il metodo failure.

A ogni promise è associato un future a cui si può accedere chiamando il metodo future.

import scala.concurrent._
val promise = Promise[String]() //creiamo la promise specificandone il tipo
val future:Future[String] = promise.future //recuperiamo il future associato alla promise appena creata
promise.success("hurray!") //completiamo la promise con il valore "hurray!"

Completando la promise con il success, abbiamo anche completato con successo il future associato e la promise non è più scrivibile. A questo punto viene anche chiamato l’ onSuccess/onComplete associato al future.

Un esempio più completo:

import scala.concurrent._
import scala.util.{Random, Failure, Success}
def race[T](one: Future[T], two: Future[T]): Future[T] = {
  val p = Promise[T]()
  one onComplete { p tryComplete }
  two onComplete { p tryComplete }
  p.future
}
def getFuture(n:Int):Future[Int] = Future{
  Thread.sleep(new Random().nextInt(50))
  n/new Random().nextInt(2)
}
race[Int](getFuture(1),getFuture(2)).onComplete {
  case Success(x) => println(x)
  case Failure(e) => println(e.getMessage)
}
Thread.sleep(120)
Articolo scritto da