Multi-Threading Like a Boss in Android With RxJava 2
Whether you’re new to RxJava or have been using it for a while, there is always something new to learn.
By Aritra Roy
We at GO-JEK need to perform a lot of asynchronous operations in our apps, and we can’t afford to compromise on the speed and fluidity of the UI.
Writing heavily multi-threaded Android apps can be quite difficult and overwhelming, since there are a lot of moving parts to take care of. This, along with many other reasons, convinced us to make significant use of RxJava in our Android apps.
In this article, we will talk about how we leveraged the multi-threading capabilities of RxJava to make complex app development simple, easy and fun again. We will be focussing on RxJava 2 throughout this article, but the concepts should also apply to any other flavor of Reactive Extensions.
Why Reactive Programming?
Every article ever written on RxJava starts with an obligatory “why reactive programming” section, and we won’t break the trend either. There are several benefits to taking the reactive approach when building apps in Android, but let’s talk a bit about the ones you really should care about.
No More Callback Hell
Even if you’ve been doing Android development for only a short time, you might have noticed how quickly things get out of hand with nested callbacks.
This happens when you are performing several asynchronous operations in a sequence and want each one to depend on the result of the previous operation. In almost no time, the code becomes super-ugly and difficult to manage.
Simple Error Handling
In the real world, when you’re performing a lot of complex, asynchronous operations, errors can happen at any place. This means you end up writing a lot of patchwork code to handle unexpected failures, which quickly becomes repetitive and cumbersome.
Super Easy Multi-Threading
We all know (and secretly admit) how difficult multi-threading can get in Java. Executing a piece of code in a background thread and getting results back in the UI thread might sound easy, but in reality, there are a lot of tricky situations to take care of.
RxJava makes it insanely easy to perform several complex operations in any thread of your choice, maintaining proper synchronization and letting you switch threads seamlessly.
The benefits of RxJava are endless. We could talk about them for hours and bore you. Instead, let’s dig deeper and start exploring the true multi-threading prowess it brings to the table.
RxJava is NOT Multi-Threaded by Default
Yes, you read it right. RxJava, by default, is not multi-threaded in any way. The definition given for RxJava on their official website is as follows:
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.
From the word “asynchronous”, many tend to form a grave misconception that RxJava is multi-threaded by default. Yes, it does support multi-threading and offers a lot of powerful features to perform asynchronous operations with ease, but that doesn’t necessarily mean the default behavior is multi-threaded.
Assuming you have worked a bit on RxJava, you would know what the basic construct is -
- A source Observable, followed by,
- One or more Operators, followed by,
- A target Subscriber
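As a minimal sketch of that construct (not the original snippet; the class name, the values, and the log format are assumptions made for illustration), the chain below has a source, a doOnNext() side effect, one operator, and a subscriber, with no Scheduler anywhere. It records the thread each stage ran on.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

final class DefaultThreadingDemo {

    // Builds source -> operator -> subscriber without any Scheduler and
    // records which thread each stage ran on.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just(1, 2, 3)                                           // source Observable
                .doOnNext(i -> log.add("emit " + i + " on " + thread()))     // side effect: logging only
                .map(i -> i * 2)                                            // operator
                .subscribe(i -> log.add("receive " + i + " on " + thread())); // target subscriber
        return log;
    }

    private static String thread() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every logged line should end with the name of the thread that called run(), which is the main thread when the class is launched through main().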
If you run the snippet above, you can clearly see that everything executes on the main thread of the application (keep an eye on the thread names printed in the logs). This shows that, by default, RxJava is blocking: the whole chain runs on whichever thread the calling code runs on.
Bonus: Wondering what this doOnNext() thingy is? It’s nothing but a side-effect operator that lets you move out of an observable chain and perform impure operations. You can read more about it here.
Let’s Do Some Simple Multi-Threading
If you want to do some basic multi-threading in Android using RxJava, all you need to do is have a bit of familiarity with the Schedulers and the observeOn/subscribeOn operators and you are good to go.
Now, let us have a look at one of the simplest multi-threading use cases first. Suppose we want to fetch a list of Book objects from the network and show them as a list in the UI thread of our application. A pretty common and straightforward use case to start with.
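A rough sketch of this use case could look like the following. The Book class, the body of getBooks(), and the book titles are hypothetical. Because AndroidSchedulers.mainThread() only works on an Android device, this JVM-runnable version swaps in a single-threaded executor named "ui-stand-in" as an assumption; on Android you would pass AndroidSchedulers.mainThread() to observeOn() instead.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BooksDemo {

    // Hypothetical model class, just enough for the example.
    static final class Book {
        final String title;
        Book(String title) { this.title = title; }
    }

    // Hypothetical stand-in for a real network call. fromCallable defers
    // the work until someone subscribes.
    static Observable<List<Book>> getBooks() {
        return Observable.fromCallable(() -> Arrays.asList(new Book("Dune"), new Book("Emma")));
    }

    // Returns { thread the books were fetched on, thread they were consumed on }.
    static String[] run() {
        // Assumption: replaces AndroidSchedulers.mainThread() outside Android.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-stand-in"));
        Scheduler uiScheduler = Schedulers.from(ui);
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        try {
            getBooks()
                    .subscribeOn(Schedulers.io())            // fetch on an I/O thread
                    .doOnNext(books -> threads[0] = thread()) // still on the I/O thread
                    .observeOn(uiScheduler)                  // hand the result to the "UI" thread
                    .doFinally(done::countDown)              // demo plumbing: signal completion
                    .subscribe(books -> threads[1] = thread());
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ui.shutdown();
        }
        return threads;
    }

    private static String thread() {
        return Thread.currentThread().getName();
    }
}
```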
Here, we have a getBooks() method that makes a network call and fetches a list of books for us. Network calls take time (a few milliseconds to a few seconds), which is why we are using subscribeOn() and specifying the Schedulers.io() Scheduler to perform the operation in the I/O thread.
We are also using the observeOn() operator along with the AndroidSchedulers.mainThread() Scheduler to consume the result in the main thread and to populate the books in the UI of our application. This is something you might already know or have done before.
Don’t worry, we’ll get into more advanced stuff soon. This was just to ensure we’re all on the same page and have the basic idea fleshed out before diving deeper.
Befriending The Schedulers
The main threading game in RxJava starts with the powerful set of Schedulers it provides. In RxJava, you cannot directly access or manipulate threads. If you want to play with threads, you have to do it through the in-built Schedulers.
You can think of Schedulers as threads or thread pools (a collection of threads) to perform different kinds of jobs.
In simple words, if you need to execute a task in a particular thread, you need to pick the right Scheduler for it, which will then take a thread from its pool of available threads and get the task executed.
There are several types of Schedulers available in the RxJava framework, but the tricky part is to choose the right Scheduler for the right kind of job. Your task will never run optimally if you don’t pick the right one. So, let’s try and understand each of them -
Schedulers.io()
It’s backed by an unbounded thread pool and is used for non-CPU intensive I/O work like accessing the file system, performing network calls, accessing the database, etc. This Scheduler is uncapped and the size of its thread pool can grow as needed.
Schedulers.computation()
This Scheduler is used for performing CPU-intensive work like processing large data sets, image handling, etc. It is backed by a bounded thread pool with size up to the number of processors available.
As this Scheduler is suitable for only CPU intensive tasks, we want to limit the number of threads so they don’t fight among each other over CPU time and starve themselves.
Schedulers.newThread()
This Scheduler creates a completely new thread to perform a unit of work every time it’s used. It doesn’t benefit from any thread pool. Threads are expensive to create and tear down, so you should be careful not to spawn them excessively, which can lead to severe system slowdowns and out-of-memory errors.
Ideally, you would use this Scheduler quite rarely, mostly for kicking off long-running, isolated units of work in a completely separate thread.
Schedulers.single()
This Scheduler is newly introduced in RxJava 2 and is backed by a single thread which can only be used to perform tasks in a sequential manner. This can be highly useful when you have a set of background jobs to perform from different places in your app but can’t afford to execute more than one at a time.
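To illustrate the sequential behaviour, here is a small sketch in which three independent jobs are all subscribed on Schedulers.single(). The job names and the 20 ms sleep are made up for the example. Because every job shares the one backing thread, each job should finish before the next one starts.

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SingleSchedulerDemo {

    // Starts three background jobs "from different places" and returns the
    // order in which their start/end events were observed.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Completable.mergeArray(
                job("syncContacts", log),
                job("uploadLogs", log),
                job("refreshToken", log))
                .blockingAwait(); // wait for all three jobs to finish
        return log;
    }

    // Hypothetical background job that always runs on the single Scheduler.
    private static Completable job(String name, List<String> log) {
        return Completable.fromAction(() -> {
                    log.add(name + " start on " + Thread.currentThread().getName());
                    Thread.sleep(20); // pretend to do some work
                    log.add(name + " end");
                })
                .subscribeOn(Schedulers.single());
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```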
Schedulers.from(Executor executor)
You can use this to create a custom Scheduler backed by your own Executor. There can be several use cases where you might need a custom Scheduler to perform specific tasks in your app that demand custom threading logic.
Suppose you want to limit the number of parallel network calls happening in your app. You can then create a custom Scheduler backed by an executor with a fixed thread pool size, Schedulers.from(Executors.newFixedThreadPool(n)), and use it on all network-related Observables in your code.
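The sketch below shows one way this could look. The fake network call, its 50 ms delay, and the counters are assumptions added only to make the limit observable. Each call is subscribed on the same executor-backed Scheduler, so no more than poolSize calls should ever be in flight together.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class LimitedNetworkSchedulerDemo {

    // Runs `calls` fake network requests on a Scheduler backed by a fixed pool
    // of `poolSize` threads and returns the highest number seen in flight at once.
    static int peakParallelCalls(int poolSize, int calls) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Scheduler networkScheduler = Schedulers.from(executor);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            Observable.range(1, calls)
                    .flatMap(id -> fakeNetworkCall(id, inFlight, peak)
                            .subscribeOn(networkScheduler)) // every call shares the bounded pool
                    .blockingSubscribe();                    // wait until all calls have finished
        } finally {
            executor.shutdown(); // a custom executor is ours to clean up
        }
        return peak.get();
    }

    // Hypothetical request: counts itself as in flight, sleeps, then returns its id.
    private static Observable<Integer> fakeNetworkCall(int id, AtomicInteger inFlight, AtomicInteger peak) {
        return Observable.fromCallable(() -> {
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            Thread.sleep(50); // pretend network latency
            inFlight.decrementAndGet();
            return id;
        });
    }

    public static void main(String[] args) {
        System.out.println("peak parallel calls: " + peakParallelCalls(2, 6));
    }
}
```

Unlike the built-in Schedulers, a Scheduler created this way does not manage the executor's lifecycle, so the sketch shuts the executor down itself.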
AndroidSchedulers.mainThread()
This is a special Scheduler that’s not available in the core RxJava library. You need to use the RxAndroid extension library to make use of it. This scheduler is specifically useful for Android apps to perform UI based tasks in the main thread of the application.
By default, it enqueues tasks in the looper associated with the main thread of the application, but there are other variations of it that allow us to use any Looper we want via APIs like AndroidSchedulers.from(Looper looper).
Note: Be careful while using Schedulers backed by unbounded thread pools like Schedulers.io(), as there always lies a risk of infinitely growing the thread pool and flooding the system with too many threads.
Understanding subscribeOn() & observeOn()
Now that you have a clear understanding of the different types of Schedulers and when to use which, let us move ahead and explore the subscribeOn() and the observeOn() operators in detail.
You should have a deep understanding of how these two operators work individually and when combined together, to master the true multi-threading capabilities of RxJava.
subscribeOn()
In simple words, this operator tells which thread the source observable can emit its items on. You should understand the importance of the word “source” observable here. When you have a chain of observables, the source observable is always at the root or top of the chain from where the emissions originate.
As you have already seen, if we don’t use subscribeOn(), all the emissions happen directly on the thread the code is executed on (in our case, the main thread).
Now let us direct all the emissions to the computation thread using subscribeOn() along with the Schedulers.computation() Scheduler. Once you run the code snippet below, you will notice all the emissions taking place in one of the computation threads available in the thread pool, RxComputationThreadPool-1.
For brevity purposes, we have not used the full DisposableSubscriber as we don’t need to handle the onError() and onComplete() every time in these simple scenarios. To handle just the onNext(), a single consumer is enough.
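A minimal sketch of such a snippet is shown here (the class name and values are assumptions). The doFinally() call and the latch are only demo plumbing that keeps the calling thread waiting until the chain has finished; they are not needed in an Android app.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SubscribeOnDemo {

    // Returns one log line per item, each naming the thread it was received on.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just(1, 2, 3, 4, 5, 6)
                .subscribeOn(Schedulers.computation()) // the source now emits on a computation thread
                .doFinally(done::countDown)            // demo plumbing: signal that the chain ended
                .subscribe(i -> log.add(i + " on " + Thread.currentThread().getName())); // single onNext consumer

        awaitQuietly(done);
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```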
It doesn’t really matter where you put the subscribeOn() method in the chain. It only acts on the source observable and controls which thread it emits its items on.
In the example below, you will notice that there are other observables in the chain created by the map() and the filter() operators, and subscribeOn() has been placed at the bottom of the chain. But once you run the snippet below, you will notice that it only affects the source observable. This will become clearer once we mix the chain with observeOn() as well. Even if we place subscribeOn() below observeOn(), it will only affect the source observable.
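Here is a sketch of such a chain, with assumed values and predicates. subscribeOn() is the last operator before subscribe(), yet the log should show the source itself emitting on a computation thread. As before, doFinally() and the latch are demo plumbing only.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SubscribeOnPlacementDemo {

    // Logs the thread used by the source and by the subscriber.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just(1, 2, 3, 4, 5, 6)
                .doOnNext(i -> log.add("source emits " + i + " on " + thread()))
                .map(i -> i * 10)                        // 10, 20, ..., 60
                .filter(i -> i % 20 == 0)                // keeps 20, 40, 60
                .doFinally(done::countDown)              // demo plumbing
                .subscribeOn(Schedulers.computation())   // placed at the bottom of the chain
                .subscribe(i -> log.add("subscriber gets " + i + " on " + thread()));

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    private static String thread() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```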
It is also important to understand that you cannot use subscribeOn() multiple times in your chain. Technically, you can do so, but that won’t have any additional effect. In the snippet below we are chaining three different Schedulers, but can you guess which Scheduler the source observable will emit its items on?
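A sketch of that experiment follows. The order of the three Schedulers is inferred from the discussion around it (Schedulers.io() nearest the source, Schedulers.newThread() last); the choice of Schedulers.computation() for the middle one is an assumption.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class MultipleSubscribeOnDemo {

    // Returns the name of the thread each item was received on.
    static List<String> run() {
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just(1, 2, 3)
                .subscribeOn(Schedulers.io())          // closest to the source
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.newThread())   // applied last
                .doFinally(done::countDown)            // demo plumbing
                .subscribe(i -> threads.add(Thread.currentThread().getName()));

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In RxJava 2, threads owned by Schedulers.io() carry the RxCachedThreadScheduler prefix, which is the name to look for in the output.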
If your guess is Schedulers.io(), then bingo! That was a tough guess though, and for that, you will get a five-minute hug the next time we meet. :-P
Even if you put multiple subscribeOn() operators in your chain, only the one closest to the source observable will take effect and nothing else.
Under The Hood
It’s worth spending some time understanding the above scenario in depth. Why does the Schedulers.io() Scheduler take effect and not the other ones? Normally, you would think that Schedulers.newThread() should have taken effect as it was applied last in the chain.
You have to understand that in RxJava, subscriptions are always made with upstream observable instances. The snippet below is very similar to what we have already seen before, just a bit more verbose.
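A verbose version along those lines might look like this. It is a sketch: the names o1, o2 and o3 come from the discussion around it, while the emitted values are assumptions.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

final class UpstreamSubscriptionDemo {

    // Builds the chain one observable at a time and returns what the
    // final subscriber received.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5); // source observable
        Observable<Integer> o2 = o1.filter(i -> i % 2 == 0);      // lets only even numbers pass
        Observable<Integer> o3 = o2.map(i -> i * 10);             // multiplies each number by 10

        // Subscribing to o3 makes o3 subscribe to o2, which subscribes to o1.
        o3.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```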
Let’s understand this starting from the last line of the snippet. Here, the target subscriber (or the most downstream observer in the chain) invokes the subscribe() method on observable o3, which then makes an implicit subscribe() call to its immediate upstream observable o2. The observer implementation provided by o3 multiplies the emitted numbers by 10.
The process repeats, and o2 implicitly calls subscribe() on o1, passing an observer implementation that lets only even numbers pass. Now we have reached the root, where the source observable o1 doesn’t have any upstream observable to call subscribe() on. This completes the observable chain and makes the source observable emit its items.
This should clarify how subscriptions work in RxJava. By now, you should have a mental model of how observable chains are formed and how events are propagated down the chain, starting from the source observable.
observeOn()
As we’ve seen, subscribeOn() instructs the source observable to emit its items on a particular thread, and this is the thread responsible for pushing the items all the way down to the sink Subscriber. Thus, by default, the subscriber will also consume items on that particular thread only.
But this might not be the behaviour you want from your application all the time. Suppose you want to fetch some data from the network and show it in the UI of your app.
You essentially have to accomplish two things here -
- Make the network call in a non-blocking I/O thread
- Consume the results in the main (or UI thread) of the application
You will have an observable that makes a network call in the I/O thread and passes the emissions down to the target subscriber. If you just use subscribeOn() with Schedulers.io(), the final subscriber will also operate in the I/O thread. And as we can’t touch UI components in any thread except the main thread, we are out of luck.
Now, we are in dire need of switching threads, and this is exactly where the observeOn() operator comes into play. Whenever the emissions encounter an observeOn() somewhere in the observable chain, they are immediately switched to the thread specified by it.
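The following sketch shows the switch in action. The integers stand in for data fetched from the network, and a single-threaded executor named "main-stand-in" replaces AndroidSchedulers.mainThread() so the code can run on a plain JVM; both are assumptions made for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ObserveOnDemo {

    // Logs where each item was emitted and where it was consumed.
    static List<String> run() {
        // Assumption: replaces AndroidSchedulers.mainThread() outside Android.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Observable.just(1, 2, 3)                       // pretend these come from the network
                    .doOnNext(i -> log.add("emit " + i + " on " + thread()))
                    .subscribeOn(Schedulers.io())          // emit on an I/O thread
                    .observeOn(Schedulers.from(main))      // switch threads for everything below
                    .doFinally(done::countDown)            // demo plumbing
                    .subscribe(i -> log.add("consume " + i + " on " + thread()));
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            main.shutdown();
        }
        return log;
    }

    private static String thread() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```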
In this contrived example, we have an observable that emits a stream of integers fetched from the network. In real-world use cases, this can be any other asynchronous operation like reading a large file, or fetching data from the database, etc. You can try the snippet and see the results for yourself. Just keep an eye on the thread names in the logs.
Now let us have a look at a slightly more complicated example, where we will be using multiple observeOn() operators to switch threads multiple times in our observable chain.
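One possible shape for such a chain is sketched here. The values, the map and filter logic, and the "main-stand-in" executor (used in place of AndroidSchedulers.mainThread() so the code runs on a plain JVM) are assumptions. The method records which thread each stage last ran on.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MultipleObserveOnDemo {

    // Returns a map from stage name to the thread that stage ran on.
    static Map<String, String> run() {
        // Assumption: replaces AndroidSchedulers.mainThread() outside Android.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        Map<String, String> stageThreads = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Observable.just(1, 2, 3, 4, 5, 6)
                    .doOnNext(i -> stageThreads.put("source", thread()))
                    .subscribeOn(Schedulers.io())              // source emits on an I/O thread
                    .observeOn(Schedulers.computation())       // switch before map()
                    .map(i -> {
                        stageThreads.put("map", thread());
                        return i * 10;
                    })
                    .observeOn(Schedulers.newThread())         // switch before filter()
                    .filter(i -> {
                        stageThreads.put("filter", thread());
                        return i % 20 == 0;
                    })
                    .observeOn(Schedulers.from(main))          // switch before the subscriber
                    .doFinally(done::countDown)                // demo plumbing
                    .subscribe(i -> stageThreads.put("subscriber", thread()));
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            main.shutdown();
        }
        return stageThreads;
    }

    private static String thread() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach((stage, name) -> System.out.println(stage + " -> " + name));
    }
}
```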
In the snippet above, the source observable emits its items on the I/O thread because we have used subscribeOn() with Schedulers.io(). Now we want to transform each item using the map() operator but want to do so in the computation thread. For that, we can use observeOn() with Schedulers.computation() just before the map() operator to switch threads and pass the emissions to the computation thread.
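Next, we want to filter the items, but for some reason, we want to perform that operation on a completely new thread. We can use observeOn() again with Schedulers.newThread() before the filter() operator to switch to a freshly created thread. Note that observeOn() obtains a single worker from the Scheduler per subscription, so every item in this chain is filtered on that same new thread rather than on a separate thread per item.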
In the end, we want our subscriber to consume the final processed items and show the results in the UI. To do that, we need to switch threads again, but this time to the main thread, using observeOn() with the AndroidSchedulers.mainThread() Scheduler.
But what happens if we use observeOn() multiple times consecutively? In the snippet below, which thread will the final subscriber consume the results on? Will the first observeOn() come into play or the last one? Any guesses?
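A sketch of that experiment is given here. Only the last Scheduler, Schedulers.computation(), is taken from the discussion that follows; the first two Schedulers are assumptions.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ConsecutiveObserveOnDemo {

    // Returns the name of the thread each item was consumed on.
    static List<String> run() {
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just(1, 2, 3)
                .observeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())   // the last switch before the subscriber
                .doFinally(done::countDown)            // demo plumbing
                .subscribe(i -> threads.add(Thread.currentThread().getName()));

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```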
If you run this snippet, you will see for yourself that all the items are consumed in the RxComputationThreadPool-1 thread, which means that the last observeOn() with Schedulers.computation() took effect. But why?
Under The Hood
You might have already guessed why the last observeOn() took effect and not the other ones. As we already know, subscriptions always happen upstream. Emissions, on the other hand, always happen downstream. They originate from the source observable and flow down the chain to the sink subscriber.
The observeOn() operator always works on the downstream observable instances, which is why the last observeOn() with Schedulers.computation() overrides all the other observeOn() operators specified earlier. So, whenever you want to switch threads for any particular observable, all you need to do is specify the observeOn() operator just above it. The handoff of items from one thread to the next is synchronized for you under the hood, although any mutable state you share across those threads yourself is still your responsibility.
By now, you should have a pretty good understanding of how to make proper use of RxJava to write heavily multi-threaded apps ensuring a fast and smooth user experience in your apps.
It’s okay if all the things haven’t clicked in your head yet. Don’t be shy of reading the article a few more times and playing with all the code samples by yourself to have a better understanding. It’s quite a lot of stuff to consume at once, so take your time.
If you like the article, consider sharing it with your friends via social media.
Oh, and one last thing. We’re hiring! For any roles in Engineering, Design or Product Management, or anything at all, visit http://www.gojek.io/careers