Code

A reactive programming example

29 Agosto 2018 - 3 minuti di lettura

As part of our guild, we spent a few hours to explore reactive programming and we would like to share with you a simple example we developed as an exercise.

Reactive programming is a paradigm focused on simplifying asincronicity through the observer pattern with some functional flavour. By representing data as streams, it can process in a convenient and uniform way both plain data structures and streams of asynchronous events.

In this example we will build a simple REPL in reactive style, in Kotlin with RxKotlin library (which in turn works on top of RxJava, providing convenient extension functions). You can find the full code on github: https://github.com/intresrl/RxKotlin-REPL.

The REPL is an Observable object implemented as a simple loop that waits for commands to be issued and then notifies them to every observer subscribed.

private val emitterFunction = { emitter: Emitter<String> ->
    var shouldAskForCommand = true
    while (shouldAskForCommand) {
        print(PROMPT_STRING)
        val command = readLine()!!
        when (command) {
            "exit" -> shouldAskForCommand = false
            else -> emitter.onNext(command)
        }
    }
    emitter.onComplete()
}

The whole snippet is wrapped into a function that has the emitter as parameter: the emitter is a Rx object used to notify every observer of a new event through the onNext method.

All the observers are wrapped into an Interpreter class, registered to the observable with its observe method

fun main(args: Array<String>) {
    val observable = Observable.create(emitterFunction).publish()
    Interpreter().observe(observable)
}

An Interpreter registers some command handlers that will observe the command stream and react when needed.

class Interpreter {
    fun observe(observable: ConnectableObservable<String>) {
       
       // register observers here

        observable.connect()
    }
}

The command stream can be modified and filtered in a functional way for each subscription so that only the relevant messages are dispatched to the subscriber.

The first handler just echoes back every non blank command issued

// echo everything
observable
        .filter { it.isNotBlank() }
        .subscribe(::echo, ::error, ::end)

Subscribe method accepts functions to be executed on every event, on error and when the stream ends.

private fun echo(it: String?) {
    println(" & $it")
}

private fun error(it: Throwable) {
    it.printStackTrace()
}

private fun end() {
    println("Done!")
}

A second handler keeps an history of issued commands, filtering some of them and removing duplicates

// censored history (everything ending with "ck" is a bad word)
observable
        .filter { !it.toLowerCase().endsWith("ck") }
        .distinctUntilChanged()
        .subscribe(historian(), ::error)
private fun historian(): (String) -> Unit {
    val history = mutableListOf<String>()
    return {
        when (it) {
            "history", "hist" -> println(" Previous commands: $history")
            else -> history.add(it)
        }
    }
}

The last observer filters arithmetic operations and prints out the result.

// execute arithmetic operations
val jsEngine = ScriptEngineManager().getEngineByName("js")
observable
        .filter { it.matches(Regex("(\\s*[-+]?\\d+\\s*[-+*/])*\\s*[-+]?\\d+\\s*")) }
        .map { jsEngine.eval(it) }
        .subscribe { println(" = $it") }

REPL in action

Here is a simple demo of the REPL in action:

Closing remarks

We think that one advantage of this approach is that it decouples how and when data is provided from how it should be elaborated. In this way we could reuse our business logic, i.e. the Interpreter, in another context, for example while reading a list of commands to be executed from a script file as in the FileRunner example:

fun main(args: Array<String>) {
    File("commands.txt").useLines {
        sequence -> Interpreter().observe(sequence.toObservable().publish())
    }
}

In conclusion I believe it would be hard to rewrite this code without using the reactive approach and still managing both synchronous and asynchronous scenarios in such a simple way. And I must say that I find very expressive yet concise the functional stream manipulation syntax.

Articolo scritto da