In questo primo articolo Luca Marcato spiega i costrutti future e promise per Scala, un linguaggio funzionale a oggetti che permette di scrivere codice in maniera molto concisa, pulita e davvero intuitiva.
Costrutto future
Future è un costrutto usato per recuperare il risultato di un’operazione concorrente e che quindi potrebbe non essere ancora disponibile quando si cerca di accedervi; in pratica è una sorta di “placeholder”/segnaposto, creato per un valore(solitamente il risultato di una computazione) che potrebbe diventare disponibile. A questo risultato ci si può accedere in maniera sincrona, e quindi bloccante (da evitare il più possibile), o asincrona e quindi non bloccante.
Si dice completato(“completed”) se ha prodotto un risultato che sia un valore o un’eccezione, altrimenti si dice non completato(“not completed”); se è completato con un valore il future è completato con successo(“successfully completed”), se invece è completato con un’eccezione generata dalla computazione allora si dice fallito(“failed”)
Un future può essere assegnato una volta sola, dal momento in cui gli viene assegnato un valore o un’eccezione, diventa di fatto immutabile e non potrà essere sovrascritto.
Per creare un future possiamo utilizzare il metodo apply
del “companion-object” che richiede due argomenti come input: body
(il codice da eseguire nel future) e l’ ExecutionContext
che è implicito:
object Future { def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T] }
Execution Context
L’ ExecutionContext è il Thread-Pool dove sarà eseguito il processo; possiamo usare quello globale importando scala.concurrent.ExecutionContext.Implicits.global
oppure usarne uno specifico.
Un ExecutionContext
“ad hoc” è consigliato nel caso si debba eseguire del codice che richiede lunghi tempi di computazione o nel caso operazioni di I/O bloccanti.
Di seguito un esempio:
import java.util.concurrent.{Executors, ExecutorService} import scala.concurrent.{Future, ExecutionContext, ExecutionContextExecutor} object CustomExec { val threadNum = 4 val threadPool:ExecutorService = Executors.newFixedThreadPool(threadNum) val execctx:ExecutionContextExecutor = ExecutionContext.fromExecutor(threadPool) val f = Future("Hello World")(execctx) }
Accedere al risultato
Callback (asincrona)
import scala.concurrent.Future import scala.util.{Random, Failure, Success} import scala.concurrent.ExecutionContext.Implicits.global val f = Future(lungaOperazione()) f.onComplete { case Success(x) => println(x) case Failure(e) => e.printStackTrace() } println("1");Thread.sleep(20) println("2");Thread.sleep(20) println("3");Thread.sleep(20) println("4");Thread.sleep(20) println("5");Thread.sleep(20) Thread.sleep(120) def lungaOperazione():Int = { Thread.sleep(new Random().nextInt(100)) 42 } /** * risultato: * * 1 * 2 * 42 * 3 * 4 * 5 */
Nell’esempio ci siamo registrati alla callback utilizzando il metodo onComplete
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit
Che riceve in ingresso una funzione del tipo Try[T] => U
.
La callback viene applicata a un valore di tipo Success[T]
se il future si completa con successo, altrimenti a un valore di tipo Failure[T]
.
Composizione di future
Supponiamo di dover applicare una funzione al risultato di più future in cascata (tratto dalla documentazione Scala):
val rateQuote = Future { connection.getCurrentValue(USD) } rateQuote onSuccess { quote => val purchase = Future { if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") } purchase onSuccess { case _ => println("Purchased " + amount + " USD") } }
Nell’esempio precedente si è creato il future rateQuote
che restituisce il tasso di conversione corrente. Quando il server ha restituito il tasso di conversione in dollari e il future si è completato con successo, applicando la callback si è recuperato il valore e si è applicata la logica per decidere se acquistare o meno utilizzando ancora una volta un future. A questo si è riapplicata di nuovo una callback per scrivere a monitor se è stato fatto l’acquisto.
…si può fare, ma non è proprio bello e comodo, ci sono però i combinators come map
, flatMap
, filter
ecc che ci permettono riscrivere l’esempio precedente in maniera molto più elegante:
val rateQuote = Future { connection.getCurrentValue(USD) } val purchase = rateQuote map { case _ => if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") } purchase onSuccess { case _ => println("Purchased " + amount + " USD") }
recover / recoverWith / fallbackTo
Tra gli altri combinators, particolare attenzione per recover, recoverWith
e fallbackTo
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] Esempio: future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0 future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
recoverWith
crea un nuovo future che gestisce ognithrowable
del primo future che matcha (ArithmeticExpression nell’esempio); se non c’è match o il secondo future fallisce allora non farà nulla.
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] Esempio: val primoFuture = future (6 / 0) val secondoFuture = future { Int.MaxValue } primoFuture recoverWith { case e: ArithmeticException => secondoFuture } // result: Int.MaxValue
recoverWith
crea un nuovo future che gestisce ognithrowable
del primo future che matcha (ArithmeticExpression nell’esempio), assegnandogli il valore di un altro future; se non c’è match o il secondo future fallisce allora non farà nulla.
def fallbackTo[U >: T](that: Future[U]): Future[U] Esempio: primoFuture.fallbackTo(secondoFuture)
fallbackTo
restituisce un future con il valore diprimoFuture
se si completa con successo; seprimoFuture
fallisce restituisce un future con il valore disecondoFuture
; se anchesecondoFuture
dovesse fallire, fallbackTo restituirebbe un future con ilthrowable
diprimoFuture
.
Projections
Abbiamo visto che si possono usare operazioni monadi, come map
o flatMap
, e come vengano applicate solo nel caso di successo del future; se invece volessimo utilizzare le stesse funzioni nel caso in cui qualcosa vada storto e venga sollevata un’eccezione, possiamo farlo chiamando il metodo failed
del future che restituisce un Future[Throwable]
:
def failed: Future[Throwable] = { implicit val ec = internalExecutor val p = Promise[Throwable]() onComplete { case Failure(t) => p success t case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) } p.future }
Blocking (sincrona)
Nonostante i future tendano, per loro natura, a essere asincroni, per migliorare le performance e per evitare deadlock, a volte, è comunque necessario bloccare l’esecuzione del thread; per fare ciò, possiamo bloccare il future dall’esterno oppure possiamo fargli eseguire del codice che lo blocchi dall’interno.
Supponiamo di dover fare un’operazione IO bloccante, per esempio la read con un socket Java:
val is = socket.getInputStream.() Future{ is.read() }
In questo esempio viene notificato al thread pool che il future contiene delle operazioni potenzialmente lunghe o bloccanti e il thread pool fa in modo di creare temporaneamente nuovi thread di lavoro (workers) per fare in modo che non tutti i thread siano bloccati.
Un altro modo per bloccare l’esecuzione è usare l’oggetto Await
; riprendiamo l’esempio usato per la composizione di future:
import scala.concurrent._ import scala.concurrent.duration._ val rateQuote = Future { connection.getCurrentValue(USD) } val purchase = rateQuote map { quote => if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") }purchase onSuccess { case _ => println("Purchased " + amount + " USD") }//questa volta non usiamo l'onSuccess Await.result(purchase, 10 seconds)
L’oggetto Await
ci permette di assicurarci che purchase verrà eseguito dal thread chiamante obbligandolo ad aspettare che entrambi i future siano completati.
Il secondo parametro di result è il timeout; quando scade viene lanciata una TimeoutException
.
Costrutto promise
A differenza del future, che è in sola lettura, la promise è un costrutto che permette di completare un future con un valore, usando il metodo success
, o con un’eccezione, usando il metodo failure
.
A ogni promise è associato un future a cui si può accedere chiamando il metodo future
.
import scala.concurrent._ val promise = Promise[String]() //creiamo la promise specificandone il tipo val future:Future[String] = promise.future //recuperiamo il future associato alla promise appena creata promise.success("hurray!") //completiamo la promise con il valore "hurray!"
Completando la promise con il success
, abbiamo anche completato con successo il future associato e la promise non è più scrivibile. A questo punto viene anche chiamato l’ onSuccess
/onComplete
associato al future.
Un esempio più completo:
import scala.concurrent._ import scala.util.{Random, Failure, Success} def race[T](one: Future[T], two: Future[T]): Future[T] = { val p = Promise[T]() one onComplete { p tryComplete } two onComplete { p tryComplete } p.future } def getFuture(n:Int):Future[Int] = Future{ Thread.sleep(new Random().nextInt(50)) n/new Random().nextInt(2) } race[Int](getFuture(1),getFuture(2)).onComplete { case Success(x) => println(x) case Failure(e) => println(e.getMessage) } Thread.sleep(120)