Learn

Reactive Programming: dalle origini alle librerie Reactor e RxJava

3 Dicembre 2020 - 7 minuti di lettura
Reactive programming [1] è un paradigma di programmazione che promuove un approccio asincrono, non bloccante ed event-driven all’elaborazione dei dati.
E’ basato sul modello publish/subscribe di stream di dati, con due principali protagonisti: il Publisher per la pubblicazione di dati e il Subscriber per la loro elaborazione in modo asincrono.
Potremmo quindi sintetizzare il tutto come  programmazione con data stream asincroni.
Inizieremo l’articolo spiegandovi il significato di programmazione asincrona per poi passare più nel dettaglio agli stream asincroni, per finire con un approfondimento della libreria WebFlux.

Programmazione asincrona

Dal monotasking al multitasking

Qualcuno probabilmente si ricorderà degli Intel 8088, processori monotasking che potevano eseguire solamente un processo alla volta. Non erano sicuramente i più evoluti all’epoca ma erano decisamente i più abbordabili.
Con il termine monotasking si intende l’esistenza di un solo processo attivo sulla CPU ed eventuali altri processi in attesa che il primo termini prima di poter partire. Questo implica che se il primo processo deve attendere operazioni di I/O, in genere notevolmente più lente, la CPU è di fatto inutilizzata anche se altri processi sono pronti per essere eseguiti.
Gli 8088 vennero soppiantati ben presto dai 286 che introdussero il concetto di multitasking, ovvero la modalità nella quale se un processo è in attesa un altro processo può essere eseguito. Arrivarono poi i thread e i core iniziarono a moltiplicarsi… il resto è storia.
In estrema sintesi il multitasking gestisce l’avvicendamento di più processi sulla stessa CPU e il multithreading gestisce l’avvicendarsi dei thread nel singolo processo. Il discorso sarebbe ben più complesso, ma per ora ci basta questo.

Il multithreading

Il problema è sempre stato quello di sfruttare a pieno la capacità di calcolo e quindi fare in modo che, quando un’operazione è bloccata in attesa di I/O, possa essere eseguita un’altra operazione.
In molti casi il multithreading è più che sufficiente, ma il numero di thread che si possono creare è comunque limitato e la gestione di molti thread può diventare costosa in termini di risorse, in particolare di memoria. Nel caso ad esempio di un web server, un modello molto comune prevede di assegnare ogni richiesta ad un thread preso da un pool, ed il thread è reso di nuovo disponibile per gestirne di nuove quando la richiesta che sta servendo è terminata. Tipicamente gran parte del tempo speso dal thread per rispondere è speso in realtà in attesa di operazioni di I/O. Un server ad elevato throughput, che deve gestire migliaia di richieste contemporanee ha quindi il problema legato alle risorse: il numero dedicato alla gestione dei thread potrebbe limitare significativamente le capacità.
Se i thread diventano una risorsa scarsa, allora si può pensare di sfruttare la stessa idea, vale a dire permettere a più operazioni di alternarsi sul singolo thread. L’aynchronous programming si basa appunto sull’idea di non bloccare un thread quando un’operazione ha bisogno di sospendersi in attesa di I/O, ed eseguire sullo stesso thread altre operazioni.

Asynchronous programming

Dal punto di vista della programmazione il modello asincrono può risultare più complicato del modello multithread. Nel modello multithread è responsabilità del sistema operativo gestire il context switching, ovvero l’avvicendamento dei thread. Il programmatore può considerare i diversi thread come eseguiti in parallelo e le operazioni, eseguite in un singolo thread, come eseguite sequenzialmente dove ogni istruzione è bloccante ed attende che la precedente sia terminata per essere eseguita.
Nel modello asincrono invece le operazioni possono essere non-bloccanti, vale a dire che ritornano immediatamente il controllo anche se non sono terminate e quindi l’istruzione successiva può essere eseguita anche se la precedente non ha ancora prodotto un risultato. Nel modello asincrono è responsabilità anche del programmatore gestire la sospensione e l’avvicendamento delle operazioni e tener conto del fatto che il risultato di un’operazione arriverà in un secondo tempo mentre l’esecuzione nel frattempo può andare avanti.
Fortunatamente stanno emergendo sempre più framework e strumenti che semplificano l’implementazione del modello asincrono.

Stream asincroni

Gli stream reactive definiscono un Publisher, che pubblica i dati ed un Subscriber, che si sottoscrive li riceve ed elabora non appena vengono pubblicati.
Rispetto ai normali stream, quelli asincroni possono pubblicare i dati mano a mano questi si rendano disponibili e non ci sono limiti alle pubblicazioni. La ricezione di un dato da parte del Subscriber è quindi un evento asincrono.
Il Subscriber ha comunque la possibilità di limitare la ricezione dei dati nel caso siano pubblicati con una frequenza troppo elevata applicando la funzionalità cosiddetta di back pressure.

Reactor e RxJava

Le due librerie più aggiornate che implementano il concetto di reactive-stream sono:
  • RxJava
  • Reactor

RxJava [2] si basa, per la sua implementazione, su concetti (Design Pattern) quali Observer, Subject e Scheduler mentre Reactor, anche perché più recente, implementa la specifica degli stream presente nella versione 9 del JDK [3].

Entrambe le librerie implementano il “Reactor Manifesto” che prevede i seguenti quattro princìpi.

Reattivo (Response)

Il sistema risponde tempestivamente, se possibile. La reattività è la pietra angolare dell’usabilità e dell’utilità ma soprattutto sta ad indicare che i problemi possono essere rilevati rapidamente e trattati in modo efficace. I sistemi reattivi si concentrano sulla fornitura di tempi di risposta rapidi e coerenti, stabilendo limiti superiori affidabili in modo da fornire una qualità costante del servizio. Questo comportamento coerente a sua volta semplifica la gestione degli errori, aumenta la fiducia dell’utente finale e incoraggia ulteriori interazioni.

Resiliente (Resilient)

Il sistema rimane reattivo di fronte ai guasti. Ciò si applica non solo ai sistemi altamente disponibili e di importanza critica: qualsiasi sistema non resiliente non risponderà dopo un guasto. La resilienza è ottenuta mediante replica, contenimento, isolamento e delega. Gli errori sono contenuti all’interno di ciascun componente, isolando i componenti l’uno dall’altro e garantendo così che parti del sistema possano guastarsi e ripristinarsi senza compromettere il sistema nel suo insieme. Il recupero di ciascun componente è delegato ad un altro componente (esterno) e l’alta disponibilità è garantita dalla replica ove necessario. Il client di un componente non è gravato dalla gestione dei suoi errori.

Elastico (Elastic)

Il sistema rimane reattivo con carichi di lavoro variabili. I sistemi reattivi possono reagire alle variazioni della velocità di input aumentando o diminuendo le risorse allocate per servire questi input. Ciò implica progetti che non hanno punti di contesa o colli di bottiglia centrali, con la possibilità di frammentare o replicare componenti e distribuire input tra di loro. I sistemi reattivi supportano algoritmi di ridimensionamento predittivi e reattivi fornendo misure di performance dal vivo pertinenti. Raggiungono l’elasticità in modo economico su piattaforme hardware e software di largo consumo.

Messaggi (Message Driven)

I sistemi reattivi si basano sul passaggio di messaggi asincroni per stabilire un confine tra i componenti che garantisce accoppiamento libero, isolamento e trasparenza della posizione. Questo limite fornisce anche i mezzi per delegare gli errori come messaggi. L’impiego del passaggio esplicito dei messaggi consente la gestione del carico, l’elasticità e il controllo del flusso modellando e monitorando le code dei messaggi nel sistema e applicando la contropressione quando necessario. La messaggistica trasparente della posizione come mezzo di comunicazione consente alla gestione del fallimento di funzionare con gli stessi costrutti e la stessa semantica in un cluster o all’interno di un singolo host. La comunicazione non bloccante consente ai destinatari di consumare risorse solo quando sono attivi, riducendo il sovraccarico del sistema.

Libreria Reactor

Alla base di questa libreria [4] ci sono i Publisher che producono dei dati e i Subscriber che li consumano. I primi non emettono nulla fino a quando un Subscriber non si sottoscrive, come mostrato nell’immagine seguente.
Reactor aggiunge il concetto di operatori, che sono concatenati per descrivere quale elaborazione applicare in ogni fase ai dati. L’applicazione di un operatore restituisce un nuovo Publisher (in effetti può essere considerato sia un Subscriber per l’operatore a monte che un Publisher per l’operatore a valle). La forma finale dei dati finisce nel Subscriber finale che definisce cosa fare dal punto di vista dell’utente.

Come iniziare a utilizzare Reactor

Innanzitutto bisogna includerlo tra le dipendenze del nostro progetto. Di seguito un esempio per Maven:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.5.RELEASE</version>
</dependency>

Mono & Flux

Reactor offre due diversi tipi di dato: Mono e Flux.
Flux può emettere da 0 a N elementi mentre Mono può emettere uno stream che va da 0 ad 1 elemento.

Un esempio di inizializzazione di Flux è:

Flux just = Flux.just("1", "2", "3");
In modo analogo l’inizializzazione di Mono è:
Mono<String> just = Mono.just("foo");
Per emettere gli elementi è necessario sottoscriversi allo stream.
Per poter raccogliere tutti gli elementi in uno stream:
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(elements::add);

Panoramica metodi

Reactor mette a disposizione alcuni metodi per poter lavorare con Flux e Mono.
Vediamoli con alcuni esempi a corredo.

Input

Flux<Integer> evenNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 == 0); // i.e. 2, 4

Flux<Integer> oddNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 > 0);  // ie. 1, 3, 5

concat()

Questo metodo produce un nuovo Flux che avrà inizialmente tutti gli elementi del primo Flux e poi quelli del secondo Flux.
Flux<Integer> fluxOfIntegers = Flux.concat(
      evenNumbers,
      oddNumbers);

combineLatest()

Questo metodo aggrega due Flux associando ad ogni elemento di un Flux l’ultimo elemento disponibile dell’altro Flux.
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
  evenNumbers,
    oddNumbers,
    (a, b) -> a + b);

merge()

Il metodo merge unisce i dati di due Flux nell’ordine in cui vengono emessi.
Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers, 
      oddNumbers);

zip()

Questo metodo aggrega due o più Flux associando ad ogni elemento del primo Flux il rispettivo del secondo Flux.
Flux<Integer> fluxOfIntegers = Flux.zip(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);

Articolo scritto da