A Guide to Antipatterns, Cold vs Hot Observables, and Mastering Backpressure
Reactive programming has emerged as a powerful paradigm for building responsive and scalable software systems, allowing developers to create applications that can gracefully handle asynchronous events. However, like any programming paradigm, it comes with its own set of challenges. In this comprehensive guide, we delve into the world of reactive programming, shedding light on the often-overlooked pitfalls that developers may encounter. Additionally, the distinction between cold and hot observables, along with strategies for mastering backpressure, will be explored to equip developers with the knowledge needed to harness the full potential of reactive programming paradigms.
Cold vs Hot Observables
When working in a reactive environment, we will most often encounter cold observables when dealing with data-driven streams. Cold observables start emitting data only upon subscription. Imagine it as a movie-on-demand service where the viewer initiates the streaming of a movie. Each subscriber receives an independent sequence of data, isolated from others, allowing individual control such as pausing or fast-forwarding without affecting other subscribers. This lazy-loading characteristic defines cold observables: they emit data only when the observer explicitly requests it. Examples abound in scenarios such as web requests, database queries, or file retrievals, where up-to-date data is paramount and each subscriber triggers a fresh emission, resulting in unique sets of data.
Consider a cold observable created using the range() function, which emits the numbers 1 to 5. If we subscribe to this observable several times, each subscriber gets its own separate set of data; the data is not shared among subscribers.
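Since the original listing is not reproduced here, a minimal sketch of such a cold observable (class and method names are illustrative, using RxJava 3) could look like this:

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.ArrayList;
import java.util.List;

public class ColdObservableExample {

    // Subscribing to a cold Observable restarts the sequence from scratch.
    static List<Integer> subscribeAndCollect(Observable<Integer> source) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        Observable<Integer> numbers = Observable.range(1, 5);

        // Two independent subscribers: each one receives its own 1..5 sequence.
        List<Integer> first = subscribeAndCollect(numbers);
        List<Integer> second = subscribeAndCollect(numbers);

        System.out.println("first subscriber:  " + first);  // [1, 2, 3, 4, 5]
        System.out.println("second subscriber: " + second); // [1, 2, 3, 4, 5]
    }
}
```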
Conversely, hot observables adopt a different approach, emitting items irrespective of whether subscribers are present. In this paradigm, there is a single source of emission, and depending on when observers subscribe, they may miss some emitted items. Unlike cold observables, subscribers in hot observables join a sequence in progress, and the emission mechanism resembles a push rather than pull. Consider UI events as a prime example — multiple mouse clicks occurring independently of subscribers, where missing some clicks may not be critical as the focus is often on the most recent interactions. The absence of control over data generation distinguishes hot observables, making them suitable for scenarios where immediate item emission is crucial, such as real-time event handling.
In the hot observable example, we use a PublishSubject to create a hot observable representing button click events. Multiple event handlers can subscribe to the same button and respond to clicks. We need to remember that new subscribers can miss data that was emitted before they subscribed.
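A minimal sketch of that idea follows; the "button" is simulated by calling onNext() directly rather than wiring up a real UI:

```java
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class HotObservableExample {
    static final List<String> firstHandler = new ArrayList<>();
    static final List<String> secondHandler = new ArrayList<>();

    public static void main(String[] args) {
        // PublishSubject is hot: it pushes each event only to the
        // subscribers that are present at the moment the event is emitted.
        PublishSubject<String> buttonClicks = PublishSubject.create();

        buttonClicks.subscribe(firstHandler::add);
        buttonClicks.onNext("click 1");             // only the first handler sees this

        buttonClicks.subscribe(secondHandler::add); // late subscriber: "click 1" is missed
        buttonClicks.onNext("click 2");             // both handlers see this

        System.out.println("first handler:  " + firstHandler);  // [click 1, click 2]
        System.out.println("second handler: " + secondHandler); // [click 2]
    }
}
```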
I also wanted to touch on the topic of Multicasting, as it is often overlooked. Multicasting comes into play when we want to share data emitted by an observable with multiple subscribers. This stands in contrast to the default behaviour of a cold observable.
To achieve multicasting, we transform a cold observable into a hot one using operators like publish() or publishReplay() in RxJava. Afterward, we activate the source observable by explicitly calling connect() on the connectable observable, making it multicast to all subscribers. It’s crucial to note that calling subscribe() on a ConnectableObservable won’t trigger emission; only connect() will.
With connectable observables, we gain control over when the source observable starts emitting data. This sets them apart from a normal hot observable. This control can be particularly useful, for instance, when caching data for shared access. In the provided example, chat messages serve as the source, and two users subscribe to it. However, only the first user receives messages, because connect() triggers the emission before the second user subscribes.
If we change the order and call connect() only after both users have subscribed, each new message is emitted to both users without the need to send it to each user individually.
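A sketch of the first variant, with the chat simplified to a fixed, synchronous source (names are illustrative):

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;

public class MulticastExample {
    static final List<String> user1 = new ArrayList<>();
    static final List<String> user2 = new ArrayList<>();

    public static void main(String[] args) {
        // publish() turns the cold source into a ConnectableObservable;
        // nothing is emitted until connect() is called.
        ConnectableObservable<String> messages =
                Observable.just("Hi!", "How are you?").publish();

        messages.subscribe(user1::add);
        messages.connect();             // the synchronous source emits right here
        messages.subscribe(user2::add); // too late: the sequence has already completed

        System.out.println("user1: " + user1); // [Hi!, How are you?]
        System.out.println("user2: " + user2); // []
    }
}
```

Moving the connect() call below both subscribe() calls is the reordering that makes every message reach both users.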
This illustrates the flexibility and control that multicasting provides in managing shared data streams.
Now that we’ve covered the basics of cold and hot observables, let’s delve into the concept of backpressure. Imagine a scenario where a producer is emitting a thousand items per second or more, but the consumer can only process a hundred per second. This is where backpressure comes into play — it’s a mechanism to manage this situation.
There are various ways to address it. One approach is to control the frequency at which the producer emits data, essentially slowing it down. Another method involves buffering data until the consumer can catch up. In RxJava, we can even specify what action to take if the buffer is full, such as stopping the emission of data, deleting the youngest or oldest element, or simply dropping non-processed data and cancelling the subscription. Let’s take a closer look at the code.
In our example, the data stream consists of newly created insurance policies. We observe whenever a new policy is created, and in the consumer we process and issue these policies. However, our producer is exceptionally fast, generating new values as quickly as possible, while the consumer is relatively slow. The Observable class has an unbounded buffer, meaning it will buffer everything, even up to a million items, and push it all to the subscriber. Running this, I encountered an OutOfMemoryError: the producer pushed too much data, overwhelming the consumer’s capacity. Since RxJava 2, we have a new type called Flowable. It is a backpressure-enabled variant of Observable, designed to handle exactly these situations.
We can simply convert an Observable to a Flowable using the toFlowable() operator, or create one in the same way as an Observable, but with a backpressure strategy as a second parameter. The strategy determines how data should be handled when the consumer can’t keep up with the producer.
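As a sketch, with the policy objects simplified to integers, both ways of obtaining a Flowable might look like this:

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;

public class FlowableCreationExample {
    public static void main(String[] args) {
        Observable<Integer> policies = Observable.range(1, 1000);

        // Option 1: convert an existing Observable, choosing a strategy.
        Flowable<Integer> buffered = policies.toFlowable(BackpressureStrategy.BUFFER);

        // Option 2: create a Flowable directly; the second parameter is the
        // backpressure strategy (BUFFER, DROP, LATEST, ERROR or MISSING).
        Flowable<Integer> dropping = Flowable.create(emitter -> {
            for (int i = 1; i <= 1000; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.DROP);

        System.out.println("buffered items: " + buffered.count().blockingGet());
    }
}
```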
Buffering is a common backpressure strategy where incoming items are stored in a buffer until the consumer is ready to process them. I reduced the number of policies the producer emits to 10,000, and the policies were issued; however, if I increase the number of items, we may again encounter an OOM, or the application may start to freeze. This is because Flowable has a default buffer size of 128 elements, but the buffer grows as needed.
However, we can control the buffer size. We can override the default globally by setting the system property ‘rx3.buffer-size’, or specify it locally using the onBackpressureBuffer operator; here the buffer size is 100. Since the buffer size is now fixed, we get an error when the buffer fills up, as the console logs show. Not a single policy was issued.
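A sketch of that failure mode, using a producer that ignores backpressure and a consumer that has requested only one item so far:

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.Observable;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicReference;

public class BoundedBufferExample {

    // Returns the error the stream terminated with, or null if it succeeded.
    static Throwable runWithBoundedBuffer() {
        AtomicReference<Throwable> error = new AtomicReference<>();

        Observable.range(1, 1000)
                .toFlowable(BackpressureStrategy.MISSING) // producer ignores demand
                .onBackpressureBuffer(100)                // fixed buffer of 100 elements
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override public void onSubscribe(Subscription s) { s.request(1); }
                    @Override public void onNext(Integer policy) { /* slow consumer */ }
                    @Override public void onError(Throwable t) { error.set(t); }
                    @Override public void onComplete() { }
                });

        return error.get();
    }

    public static void main(String[] args) {
        // The buffer overflows and the stream fails with a MissingBackpressureException.
        System.out.println("stream failed with: " + runWithBoundedBuffer());
    }
}
```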
The onBackpressureBuffer operator can also take a third parameter, a BackpressureOverflowStrategy. This allows us to specify what should happen when the buffer becomes full. In this case, we choose to drop the latest values from the buffer. When the buffer reaches its limit, a warning message, “Buffer overflow!”, is emitted, and the most recent items are discarded. We can observe that a significant amount of data has been dropped, and only the older values are delivered once the downstream requests them. This can be valuable when we can safely disregard values from a source, such as current GPS location signals, assuming that more up-to-date values will be available later. Alternatively, we can drop the oldest element in the buffer and replace it with the current value.
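A sketch of the drop-latest variant. A small synchronous source stands in for the fast producer, and the downstream deliberately requests nothing until the end, so the bounded buffer overflows:

```java
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.Observable;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class OverflowStrategyExample {

    static List<Integer> survivingItems() {
        List<Integer> received = new ArrayList<>();
        AtomicReference<Subscription> subscription = new AtomicReference<>();

        Observable.range(1, 15)
                .toFlowable(BackpressureStrategy.MISSING)
                .onBackpressureBuffer(
                        10,                                           // bounded buffer
                        () -> System.out.println("Buffer overflow!"), // runs on each overflow
                        BackpressureOverflowStrategy.DROP_LATEST)     // discard newest values
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override public void onSubscribe(Subscription s) { subscription.set(s); }
                    @Override public void onNext(Integer i) { received.add(i); }
                    @Override public void onError(Throwable t) { }
                    @Override public void onComplete() { }
                });

        subscription.get().request(100); // the downstream finally asks for data
        return received;
    }

    public static void main(String[] args) {
        System.out.println("delivered: " + survivingItems());
    }
}
```

Only the ten buffered values survive. Swapping in BackpressureOverflowStrategy.DROP_OLDEST evicts the head of the buffer instead, so the most recent values are the ones that get through.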
When dealing with cold observables, there’s an additional aspect we can consider. Some operators, such as Flowable.range(), Flowable.generate(), Flowable.fromIterable(), and Flowable.fromArray(), come with built-in backpressure support. These operators use a pull-based reactive model. But what does that mean?
It means that these operators can generate data on-demand, precisely when requested. The observeOn() operator, for instance, can instruct the range to generate only as many values as the observeOn() buffer can accommodate without overflowing. In this example, the application behaves smoothly. We can observe that the producer emits a number of items equal to what the consumer has requested, and then the consumer processes them, creating a harmonious flow between the two.
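A sketch of this pull-based flow. Even with a very large range, memory stays bounded, because observeOn() requests items from range() in batches rather than letting the producer run ahead:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

public class PullModelExample {

    static int processAll(int count) {
        AtomicInteger processed = new AtomicInteger();
        // range() generates values on demand: observeOn() requests a batch
        // (128 elements by default), range() produces exactly that many, then
        // waits for the next request instead of flooding the consumer.
        Flowable.range(1, count)
                .observeOn(Schedulers.computation())
                .blockingSubscribe(policy -> processed.incrementAndGet());
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + processAll(1_000_000));
    }
}
```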
- When implementing Observables, leverage factory methods like range, generate, and fromArray in RxJava’s Flowable to effortlessly incorporate backpressure support.
- Evaluate if it’s feasible to skip or batch incoming events to alleviate the overall load.
- Consider the efficiency of DROP and LATEST strategies; while they can be effective, they may also result in data wastage. These strategies prove beneficial in scenarios where not every update needs to be visible. LATEST is often a safer choice, consistently providing a final result.
Now let’s discuss some key anti-patterns. These are mistakes that I made at the beginning of my own reactive programming journey, or that I have seen many times on projects.
Mixing blocking code with reactive code
By mixing the two, we lose the essence of reactive programming: if we introduce blocking operations within a reactive sequence, we create bottlenecks.
Another common behavior I’ve frequently noticed is when developers call a blocking operation to retrieve an object, perform some actions on it, and then wrap it within a reactive stream. For instance, pulling a user using the block() operator and then wrapping it again with Mono for a single observable item. Instead, it’s advisable to handle every operation, including logging, within the chain and return the reactive type, such as Mono.
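A sketch of the difference, using Project Reactor’s Mono; findUser() is a hypothetical stand-in for a real repository call:

```java
import reactor.core.publisher.Mono;

public class BlockingAntipattern {

    static Mono<String> findUser(String id) {
        return Mono.just("user-" + id); // stands in for a real repository lookup
    }

    // Antipattern: block to get the value, do some work, then re-wrap it.
    static Mono<String> greetBlocking(String id) {
        String user = findUser(id).block();   // blocks the calling thread
        System.out.println("loaded " + user); // side effect outside the chain
        return Mono.just("Hello, " + user);   // wrapped back into a Mono
    }

    // Better: keep every operation, including logging, inside the chain.
    static Mono<String> greetReactive(String id) {
        return findUser(id)
                .doOnNext(user -> System.out.println("loaded " + user))
                .map(user -> "Hello, " + user);
    }

    public static void main(String[] args) {
        System.out.println(greetReactive("42").block());
    }
}
```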
Excessive side effects in chains
In this example, we encounter a scenario with a data pipeline containing extensive logging and an external service call within the chain.
We can improve this flow by making a few adjustments. For logging, using doOnNext() or doOnError() allows us to log intermediate results without disrupting the data flow. When handling external service calls, it is advisable to consider using flatMap() or another asynchronous operator. By tweaking the flow, the code becomes cleaner.
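A sketch of the cleaned-up pipeline in RxJava; enrich() is a hypothetical external service call:

```java
import io.reactivex.rxjava3.core.Observable;

public class SideEffectPipeline {

    // Hypothetical external service call, modeled as its own Observable.
    static Observable<String> enrich(String order) {
        return Observable.just(order + ":enriched");
    }

    static Observable<String> pipeline(Observable<String> orders) {
        return orders
                .doOnNext(order -> System.out.println("received " + order)) // log without disrupting the flow
                .flatMap(SideEffectPipeline::enrich)                        // async call stays inside the chain
                .doOnError(err -> System.err.println("pipeline failed: " + err));
    }

    public static void main(String[] args) {
        pipeline(Observable.just("order-1", "order-2"))
                .subscribe(System.out::println);
    }
}
```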
Overuse of reactive chains
As with Java Streams, if we overuse this pattern and create overly long or nested chains, we end up with code that is difficult to understand and maintain.
Overuse of reactive chains in simple synchronous operations
We need to remember that using reactive is not always a good choice. In simple, synchronous operations we don’t benefit from the reactive nature of the library. For simple operations, it’s often better to use standard Java code without introducing the complexity of reactive streams. Reactive programming shines when dealing with asynchronous and event-driven operations.
Using multiple Observable.subscribe()
Why should we avoid this?
The first reason is code readability: it becomes challenging to follow the code’s flow. Nesting subscribe() calls can also lead to unexpected behaviour, especially in terms of asynchrony and error handling. In this example, we assume that when displaying a user profile, the profile image data is available. However, if image retrieval is delayed, we might end up displaying user data without the profile image. Instead, we should use operators: in RxJava, zip() or flatMap() can ensure that all data is available before proceeding. Additionally, by avoiding nested subscribe() calls, we retain the benefits of the reactive approach, enabling us to better manage asynchronous workflows.
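A sketch of both versions; loadUser() and loadImage() are hypothetical data sources:

```java
import io.reactivex.rxjava3.core.Observable;

public class NestedSubscribeExample {

    record Profile(String user, String image) { }

    static Observable<String> loadUser()  { return Observable.just("Paulina"); }
    static Observable<String> loadImage() { return Observable.just("avatar.png"); }

    // Antipattern: a subscribe() inside a subscribe(); with real async sources
    // the profile could be rendered before the image has arrived.
    static void renderNested() {
        loadUser().subscribe(user ->
                loadImage().subscribe(image ->
                        System.out.println(new Profile(user, image))));
    }

    // Better: zip() waits until both values are available before proceeding.
    static Observable<Profile> renderZipped() {
        return Observable.zip(loadUser(), loadImage(), Profile::new);
    }

    public static void main(String[] args) {
        renderZipped().subscribe(System.out::println);
    }
}
```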
As we wrap up this guide, I hope that the insights provided empower you in navigating the challenges of reactive programming. Remember, avoiding common pitfalls, understanding the differences between cold and hot observables, and implementing effective backpressure mechanisms are key to building responsive and efficient applications. I trust that you found this guide insightful and enjoyable. Happy coding!
Words by Paulina Ksienżyk, Altimetrik Poland