An introduction to RxJS: things you need to know in 2021
June 11, 2021

Filip Guerrero-Mengana
Front-end developer

RxJS is the JavaScript implementation of ReactiveX, a library for working with asynchronous, event-based code. ReactiveX (and therefore RxJS) is based on the Observer pattern.

Pull vs. Push

RxJS is a push-based library, so to understand it fully, it helps to know the difference between push and pull systems. The difference lies in how the data Producer and the data Consumer interact.

Pull-based behavior

Every JavaScript function is a Pull system: first the function is called, and only then do we receive data from it. The code that calls the function is the Consumer, and the function is the data Producer.

In this case, the Consumer pulls data from the Producer, and it’s the Consumer that determines when it receives the data. The Consumer is the active element, while the Producer remains dormant until data is pulled from it.

Push-based behavior

The most common example of a push system is a Promise. A Promise (the data Producer) delivers its resolved value to the registered callback functions (the data Consumers). The Promise determines when the callback functions receive the data; in other words, it is responsible for pushing the data. The callback functions stay inactive until they receive the data. The interaction between an RxJS Observable (Producer) and an Observer (Consumer) follows the same behavior.
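A minimal TypeScript sketch of push behavior with a Promise. The names producer and received are illustrative only. Both callbacks are registered up front, but neither receives anything until the Promise decides to resolve.

```typescript
// The Promise is the Producer; the then() callbacks are the Consumers.
const received: string[] = [];

const producer = new Promise<string>((resolve) => {
  // The Producer decides when to deliver: here, on a later timer tick.
  setTimeout(() => resolve('data'), 0);
});

// Registering callbacks does not pull anything; they wait passively.
producer.then((value) => received.push(`first consumer got ${value}`));
producer.then((value) => received.push(`second consumer got ${value}`));

// Nothing has been pushed yet at this point.
console.log(received.length); // 0
```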

Single- and multiple-value Producers

In a Pull system, a single-value Producer is a JavaScript function, since it can only return a single value per invocation. In a Push system, a single-value Producer is represented by a Promise: the resolved value can only be delivered once.

Multiple-value Producers can return n values over time, and they exist in both systems. In a Pull system, a multiple-value Producer is a JavaScript Generator function. Unlike a plain function, a Generator function can yield anywhere from zero to (potentially) infinitely many values on iteration. However, it is still controlled by the Consumer: the generator’s next() method is called from the ‘external’ code, and each call resumes the Generator function’s execution.
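Here is a small sketch of a Generator acting as a multiple-value Pull Producer. The countTo function is a hypothetical example; the point is that each value is produced only when the Consumer calls next().

```typescript
// A Generator is a multiple-value Pull Producer.
function* countTo(limit: number) {
  for (let i = 1; i <= limit; i++) {
    yield i; // pauses here until the Consumer asks for the next value
  }
}

const iterator = countTo(3);
const pulled: number[] = [];

// The Consumer drives execution: every next() call resumes the Generator.
let result = iterator.next();
while (!result.done) {
  pulled.push(result.value);
  result = iterator.next();
}

console.log(pulled); // [1, 2, 3]
```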

In a Push system, an example of a multiple-value Producer is an event listener. Each user interaction with the ‘targeted’ DOM element triggers another call of the callback function. But in contrast to the Generator function, the data Consumer (the callback function) remains passive, while the data Producer (the events triggered by the user’s interaction) decides when the Consumer receives data.
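To keep the push behavior runnable outside a browser, the sketch below uses a hypothetical FakeButton class as a stand-in for a DOM element. In a browser you would call addEventListener('click', ...) on a real element instead. The Consumer only registers itself; the Producer decides when to call it.

```typescript
type Listener = (event: string) => void;

// Hypothetical stand-in for a DOM element: it keeps its listeners and
// calls them whenever it decides an event has happened.
class FakeButton {
  private listeners: Listener[] = [];

  addEventListener(listener: Listener): void {
    this.listeners.push(listener);
  }

  // Simulates a user click: the Producer pushes data to every listener.
  click(): void {
    this.listeners.forEach((listener) => listener('click'));
  }
}

const button = new FakeButton();
const events: string[] = [];

// The Consumer registers once and never asks for data.
button.addEventListener((event) => events.push(event));

button.click();
button.click();
console.log(events); // ['click', 'click']
```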

To summarise: in Pull systems, the Consumer determines when it receives data from the data Producer, while in Push systems, the Producer determines when to send data to the Consumer.

Basic RxJS concepts

What is Stream?

A Stream is a sequence of values delivered over a period of time. It could be a simple sequence of numbers generated within 6 seconds, events from various elements on a form, or chat messages sent via a WebSocket. All of these are examples of data collected over some timespan. RxJS Observables are designed to make working with Streams much more convenient.

Observable

Observables are objects that can emit data over time. You can think of an Observable as a function that returns a data Stream to its Observers, synchronously or asynchronously. The number of values in a Stream can range from zero to infinity.

Observers & Subscriptions

For an Observable to do anything, it needs an Observer and a subscription. An Observer is a consumer of the values delivered by an Observable.

The Subscription serves as a link between the Observable and the Observer: the Observer connects to the Observable via the subscribe() method. 
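A minimal sketch of an Observer and a Subscription, assuming RxJS 6 or 7. The numbers$ Observable and the received array are illustrative. The Observer is a plain object with next, error, and complete callbacks, and subscribe() returns the Subscription that links it to the Observable.

```typescript
import { Observable, Observer, Subscription } from 'rxjs';

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const received: string[] = [];

// An Observer: one callback per notification type.
const observer: Observer<number> = {
  next: (value) => received.push(`next: ${value}`),
  error: (err) => received.push(`error: ${String(err)}`),
  complete: () => received.push('complete'),
};

// subscribe() connects the Observer to the Observable and returns a Subscription.
const subscription: Subscription = numbers$.subscribe(observer);

console.log(received);            // ['next: 1', 'next: 2', 'complete']
console.log(subscription.closed); // true, because the Observable completed
```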

Observable lifecycle

While working with Observers and subscriptions, an Observable goes through four stages of its lifecycle:

  1. Creation
  2. Subscription
  3. Execution
  4. Disposal

Creating Observable

By default, an Observable is created with the new Observable() constructor.


The Observable constructor requires only one argument: a subscriber function.

When we talk about Observable execution, we mean the execution of the code inside the subscriber function. To run an Observable, we need an Observer and a subscription.

We subscribe to the Observable with the subscribe() method. Once the Observable has a subscriber, its subscriber function runs and ‘Hello World!’ is printed to the console.
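A minimal sketch of creating and subscribing to an Observable, assuming RxJS 6 or 7. The hello$ name and the output array are illustrative. Note that nothing runs until subscribe() is called.

```typescript
import { Observable } from 'rxjs';

const output: string[] = [];

// The constructor takes a single argument: the subscriber function.
const hello$ = new Observable<string>((subscriber) => {
  subscriber.next('Hello World!');
});

// Nothing has run yet: Observables are lazy.
console.log(output.length); // 0

// Subscribing executes the subscriber function for this Observer.
hello$.subscribe((value) => {
  output.push(value);
  console.log(value); // 'Hello World!'
});
```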

Note that every subscription triggers a separate execution of the Observable. Multiple Observers subscribed to the same Observable are not connected in any way.
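The sketch below makes the separate executions visible by counting how many times the subscriber function runs. The executions counter is a hypothetical instrument added for illustration.

```typescript
import { Observable } from 'rxjs';

let executions = 0;

const source$ = new Observable<number>((subscriber) => {
  executions++; // runs once per subscribe() call
  subscriber.next(executions);
  subscriber.complete();
});

const seenByA: number[] = [];
const seenByB: number[] = [];
source$.subscribe((v) => seenByA.push(v));
source$.subscribe((v) => seenByB.push(v));

console.log(executions);       // 2
console.log(seenByA, seenByB); // [1] [2]
```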

An Observable produces n values (synchronously or asynchronously) that are delivered to a subscriber during execution. An Observable execution can deliver three types of notifications:

  • "Next" notification: sends a value such as a Number, a String, an Object, etc. The number of “next” notifications isn’t limited.
  • "Error" notification: sends a JavaScript Error or exception.
  • "Complete" notification: does not send a value; it signals that the Observable has completed successfully. You cannot pass values with this notification!

The "complete" and "error" notifications are final: an Observable cannot emit any data after delivering either of them.

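For example, suppose an Observable emits “I am number 1” and “I am number 2” with next notifications, then tries to send “I am number 3” along with the complete notification, and finally calls next with “I am number 4”.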

The console output would be: 

“I am number 1”

“I am number 2”

“I am number 3” won’t be printed, because the “complete” notification cannot carry data to the callback inside the subscribe() method. And since an Observable delivers nothing after a complete or error notification, “I am number 4” won’t be printed either.
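A sketch of the example described above, assuming RxJS 6 or 7. In TypeScript, complete() takes no arguments, so “I am number 3” cannot even be passed to it; the 'completed' marker is a hypothetical addition that makes the complete notification visible.

```typescript
import { Observable } from 'rxjs';

const printed: string[] = [];

const numbers$ = new Observable<string>((subscriber) => {
  subscriber.next('I am number 1');
  subscriber.next('I am number 2');
  // complete() carries no value, so 'I am number 3' has no way to travel with it.
  subscriber.complete();
  // Ignored: nothing is delivered after complete (or error).
  subscriber.next('I am number 4');
});

numbers$.subscribe({
  next: (value) => printed.push(value),
  complete: () => printed.push('completed'),
});

console.log(printed); // ['I am number 1', 'I am number 2', 'completed']
```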

Cold vs. Hot Observables

In a nutshell:

  • If the data is generated by Observable itself, we call it a ‘cold’ Observable; 
  • If the data is generated outside the Observable, we call it ‘hot’.

Now, with more details:

Cold Observables

By default, Observables are lazy: an Observable executes only for an Observer that subscribes to it. The Observable starts a new execution for every subscriber, and no data is shared between them.

If the Observable generates the values itself, two subscribers can easily receive different values. For example, suppose the subscriber function emits a random number created with Math.random().

In this case, the data is generated inside the Observable (Math.random()), which means we’re dealing with a cold Observable. The subscribers get different values because the random number generation runs separately for every subscriber.
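A minimal sketch of the cold case, assuming RxJS 6 or 7. The generated counter is a hypothetical instrument showing that the random number is produced once per subscriber.

```typescript
import { Observable } from 'rxjs';

let generated = 0;

// Cold: the data is produced inside the subscriber function,
// so every subscription runs its own generation.
const cold$ = new Observable<number>((subscriber) => {
  generated++;
  subscriber.next(Math.random());
  subscriber.complete();
});

const values: number[] = [];
cold$.subscribe((v) => values.push(v));
cold$.subscribe((v) => values.push(v)); // almost certainly a different number

console.log(generated); // 2
```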

Hot Observables

To turn a cold Observable into a hot one, we just need to move the data generation outside the Observable. Let’s slightly tweak the previous example by calling Math.random() once, outside the subscriber function, and emitting that value.

Now it’s a hot Observable, since the data generation (Math.random()) happens whether or not the Observable has any subscribers. The catch is that, by default, data produced while there are no subscribers is simply lost.
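A sketch of the tweaked example, assuming RxJS 6 or 7: the random number is generated once, outside the Observable, so both subscribers receive the same value.

```typescript
import { Observable } from 'rxjs';

// The data is produced outside the Observable, whether or not anyone subscribes.
const random = Math.random();

const hot$ = new Observable<number>((subscriber) => {
  subscriber.next(random);
  subscriber.complete();
});

const values: number[] = [];
hot$.subscribe((v) => values.push(v));
hot$.subscribe((v) => values.push(v));

console.log(values[0] === values[1]); // true: both subscribers share the same data
```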

So, what’s better?

It depends on the use case. In general, you would want to keep your Observables cold, unless:

  • You need the ability to have n-subscribers that will receive the same data
  • You’re dealing with a resource that should only be created once. For example, with a WebSocket connection, you probably don’t want to create a separate connection for every new subscriber. Instead, you’d want to share a single connection among all of them.
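One common way to share a single execution is the share() operator. The sketch below is an illustration under assumptions: connection$ is a hypothetical stand-in for a WebSocket (no real socket is opened), and the opened/closed counters only track how many "connections" the subscribers cause.

```typescript
import { Observable, Subscriber } from 'rxjs';
import { share } from 'rxjs/operators';

let opened = 0;
let closed = 0;
const openSockets: Subscriber<string>[] = [];

// Hypothetical stand-in for a WebSocket: each execution "opens a connection".
const connection$ = new Observable<string>((subscriber) => {
  opened++;
  openSockets.push(subscriber); // lets us simulate incoming messages below
  return () => {
    closed++; // teardown: "close the connection"
  };
});

// share() makes all subscribers use a single execution of connection$.
const shared$ = connection$.pipe(share());

const a: string[] = [];
const b: string[] = [];
const subA = shared$.subscribe((msg) => a.push(msg));
const subB = shared$.subscribe((msg) => b.push(msg));

// Simulate a message arriving on the single shared connection.
openSockets.forEach((socket) => socket.next('hello'));
console.log(opened, a, b); // 1 ['hello'] ['hello']

subA.unsubscribe();
subB.unsubscribe(); // the last subscriber left, so the connection is torn down
console.log(closed); // 1
```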

Operators

Operators are what make RxJS useful. They allow you to manipulate the data from its source, returning an Observable with the modified data. 

RxJS provides plenty of useful operators out of the box, one for almost every use case.

How to use Operators

Most operators are known as Pipeable Operators, since you can chain them with the pipe() method. A Pipeable Operator is a pure function that takes one Observable as input and returns another Observable as output. They are imported from ‘rxjs/operators’.

The pipe() method accepts Operators as arguments and applies them one by one.

Each subsequent Operator receives the new Observable produced by the previous Operator.
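A minimal pipe() chain, assuming RxJS 6 or 7 with operators imported from 'rxjs/operators'. The values are arbitrary; each operator receives the Observable returned by the one before it.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const results: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    filter((n) => n % 2 === 1), // receives of(...), returns an Observable of 1, 3, 5
    map((n) => n * 10),         // receives the filtered Observable, returns 10, 30, 50
  )
  .subscribe((n) => results.push(n));

console.log(results); // [10, 30, 50]
```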

Categories of Operators

RxJS Operators are split into categories, which makes finding the right one much easier:

Combination Operators

These allow combining data from multiple Observables. For instance, we can combine data updates from different sources to perform some calculations. The combineLatest() operator emits the latest value from each Observable whenever any one of them emits a value (once every Observable has emitted at least once).
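A sketch of combineLatest(), assuming RxJS 6.5+ or 7 (array argument form). The price$ and quantity$ Subjects are hypothetical sources used so the values can be pushed by hand.

```typescript
import { combineLatest, Subject } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical sources: a price feed and a quantity input.
const price$ = new Subject<number>();
const quantity$ = new Subject<number>();
const totals: number[] = [];

combineLatest([price$, quantity$])
  .pipe(map(([price, quantity]) => price * quantity))
  .subscribe((total) => totals.push(total));

price$.next(10);   // nothing yet: quantity$ has not emitted
quantity$.next(2); // 10 * 2 = 20
price$.next(12);   // 12 * 2 = 24
quantity$.next(3); // 12 * 3 = 36

console.log(totals); // [20, 24, 36]
```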

Creation Operators

These Operators can create an Observable out of almost anything. For example, you could build a progress bar that shows how far the reader has scrolled through an article. The fromEvent() Operator helps achieve that goal: it creates an Observable from event data, such as scroll events.
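In a browser, the scroll source would be fromEvent(window, 'scroll'). To keep this sketch runnable anywhere, it uses another creation operator, from(), over a hypothetical list of scroll offsets; scrollOffsets and scrollableHeight are assumptions, not values from the original example.

```typescript
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// Assumption: stand-in for the offsets a scroll listener would report.
const scrollOffsets = [0, 250, 500, 1000];
const scrollableHeight = 1000; // hypothetical: document height minus viewport height

const progress: number[] = [];

from(scrollOffsets)
  .pipe(map((offset) => Math.round((offset / scrollableHeight) * 100)))
  .subscribe((percent) => progress.push(percent));

console.log(progress); // [0, 25, 50, 100]
```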

Error Handling Operators

This group lets us handle errors efficiently and retry a request if needed. The most commonly used operator is catchError (remember that its callback must return an Observable!).
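A sketch combining retry() and catchError(), assuming RxJS 6 or 7. The request$ Observable is hypothetical and always fails, so we can see retry resubscribe and catchError fall back to a default value wrapped in an Observable.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical request that always fails.
const request$ = new Observable<string>((subscriber) => {
  attempts++;
  subscriber.error(new Error(`attempt ${attempts} failed`));
});

const results: string[] = [];

request$
  .pipe(
    retry(2), // resubscribe up to 2 more times after an error
    // catchError must return an Observable; here it falls back to a default value.
    catchError((err: Error) => of(`fallback after: ${err.message}`)),
  )
  .subscribe((value) => results.push(value));

console.log(attempts); // 3 (the first try plus 2 retries)
console.log(results);  // ['fallback after: attempt 3 failed']
```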

Filtering Operators

Operators from this group filter the data stream, and they also help when you need to limit which of the accumulated values get through. For example, the take operator emits only the specified number of values and then completes. You can subscribe to click events and process only the first click.
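A sketch of take(1) on a click stream. Here clicks$ is a hypothetical Subject standing in for fromEvent(button, 'click'), so the "clicks" can be pushed by hand.

```typescript
import { Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-in for fromEvent(button, 'click').
const clicks$ = new Subject<string>();
const handled: string[] = [];

const subscription = clicks$
  .pipe(take(1)) // let only the first click through, then complete
  .subscribe((click) => handled.push(click));

clicks$.next('click #1');
clicks$.next('click #2'); // ignored: take(1) has already completed

console.log(handled);             // ['click #1']
console.log(subscription.closed); // true
```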

Transformation Operators

These Operators transform the data. For example, the scan operator accumulates state in much the same way as a reducer in Redux.
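A sketch of scan() used as a Redux-style reducer, assuming RxJS 6 or 7. The Action and State types and the reducer are hypothetical; scan emits the accumulated state after every action.

```typescript
import { from } from 'rxjs';
import { scan } from 'rxjs/operators';

// Hypothetical Redux-style actions and state.
type Action = { type: 'increment' } | { type: 'add'; amount: number };
interface State { count: number }

const actions: Action[] = [
  { type: 'increment' },
  { type: 'add', amount: 5 },
  { type: 'increment' },
];

// A reducer: (previous state, action) => next state.
const reducer = (state: State, action: Action): State =>
  action.type === 'increment'
    ? { count: state.count + 1 }
    : { count: state.count + action.amount };

const states: number[] = [];

from(actions)
  .pipe(scan(reducer, { count: 0 })) // emits the accumulated state after each action
  .subscribe((state) => states.push(state.count));

console.log(states); // [1, 6, 7]
```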

Subjects

Subjects are another big concept in RxJS.

A Subject is a special type of Observable. It allows values to be multicast to many Observers, which can subscribe to the Subject even after its execution has started (by contrast, a plain Observable runs a separate execution for every subscriber). For example, suppose a first Observer subscribes, the Subject emits 3, a second Observer subscribes, and then the Subject emits 9.

The following result will be printed in the console:

   1st: 3

   1st: 9

   2nd: 9
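A sketch of the sequence described above, assuming RxJS 6 or 7. The lines array collects what would otherwise be printed.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const lines: string[] = [];

subject.subscribe((v) => lines.push(`1st: ${v}`));
subject.next(3); // only the first Observer is registered

subject.subscribe((v) => lines.push(`2nd: ${v}`));
subject.next(9); // multicast to both Observers

lines.forEach((line) => console.log(line)); // 1st: 3, 1st: 9, 2nd: 9
```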

A Subject is created using the new Subject() constructor.

As with an Observable, Observers subscribe to a Subject via the subscribe() method and receive three types of notifications from it: next, error, and complete.

Internally, a Subject’s subscribe() method does not start a new execution that delivers values. It simply registers the given Observer in a list of Observers.

Aside from the “standard” Subject, there are three varieties of it:

BehaviorSubject

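A BehaviorSubject stores the latest emitted value, so every newly subscribed Observer receives this “current value” immediately.

The initial value is set when the BehaviorSubject is created. For example, suppose a BehaviorSubject holds the value 5 when a first and then a second Observer subscribe, and it then emits 7.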

The following result will be printed in the console:

 1st: 5

 2nd: 5 (with a “default” Subject, the second subscriber wouldn’t receive this last value, only the next one)

 1st: 7

 2nd: 7
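A sketch of the BehaviorSubject example, assuming RxJS 6 or 7 and assuming 5 was passed as the initial value (the original snippet is not available).

```typescript
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<number>(5); // initial "current value"
const lines: string[] = [];

subject.subscribe((v) => lines.push(`1st: ${v}`)); // receives 5 immediately
subject.subscribe((v) => lines.push(`2nd: ${v}`)); // also receives 5 immediately
subject.next(7);                                    // both receive 7

console.log(lines); // ['1st: 5', '2nd: 5', '1st: 7', '2nd: 7']
```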

ReplaySubject

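Unlike a BehaviorSubject, a ReplaySubject can store a given number of the most recently emitted values. That number is set when the object is created.

Every new Observer receives that number of “replayed” values. For example, suppose a ReplaySubject stores 2 values, a first Observer subscribes, the ReplaySubject emits 5, 6, and 7, and then a second Observer subscribes.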

The following result will be printed in the console:

1st: 5

1st: 6

1st: 7

2nd: 6

2nd: 7
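A sketch of the ReplaySubject example, assuming RxJS 6 or 7 and a buffer size of 2, which matches the output above.

```typescript
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(2); // buffer the 2 most recent values
const lines: string[] = [];

subject.subscribe((v) => lines.push(`1st: ${v}`));
subject.next(5);
subject.next(6);
subject.next(7);

// The late subscriber immediately gets the buffered 6 and 7.
subject.subscribe((v) => lines.push(`2nd: ${v}`));

console.log(lines); // ['1st: 5', '1st: 6', '1st: 7', '2nd: 6', '2nd: 7']
```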

AsyncSubject

With an AsyncSubject, Observers receive only the last emitted value, and only once the AsyncSubject completes. For example, if the AsyncSubject’s last value is 9 and it completes after 3 seconds, the following result is printed in the console at that point:

Async: 9
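A sketch of the AsyncSubject behavior, assuming RxJS 6 or 7. To keep it synchronous, it completes immediately instead of after 3 seconds; the earlier value 3 is a hypothetical extra emission.

```typescript
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();
const lines: string[] = [];

subject.subscribe((v) => lines.push(`Async: ${v}`));

subject.next(3);
subject.next(9);
console.log(lines.length); // 0: nothing is delivered before completion

subject.complete(); // only now is the last value (9) delivered
console.log(lines); // ['Async: 9']
```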

Observable vs. Promise

Here are the key differences between Promise and Observable:

Callback function execution

The callback function passed to the Promise constructor is executed immediately, as soon as the Promise is created:

The console output:  

‘Callback call’

‘Before calling then…’

"Greeting from Promise: A-a-and resolved!"
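A sketch that reproduces this order. The log array is illustrative; at the end of the synchronous code only the first two entries exist, because the then() callback runs later.

```typescript
const log: string[] = [];

const promise = new Promise<string>((resolve) => {
  log.push('Callback call'); // runs synchronously, inside the constructor
  resolve('A-a-and resolved!');
});

log.push('Before calling then…');

promise.then((value) => {
  log.push(`Greeting from Promise: ${value}`);
  console.log(log); // now all three entries are present
});

// Only the synchronous entries exist at this point.
console.log(log); // ['Callback call', 'Before calling then…']
```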

With a cold Observable, the callback (subscriber) function is executed only after subscribe() is called:

The following result will be printed in the console:

“Before calling subscribe…”

“Callback call”

“Next!”

“Complete the Observable”
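A sketch that reproduces this order, assuming RxJS 6 or 7. The emitted 'value' string is a placeholder; the Observer logs 'Next!' whenever it receives something.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const observable$ = new Observable<string>((subscriber) => {
  log.push('Callback call'); // runs only when someone subscribes
  subscriber.next('value');
  subscriber.complete();
});

log.push('Before calling subscribe…');

observable$.subscribe({
  next: () => log.push('Next!'),
  complete: () => log.push('Complete the Observable'),
});

console.log(log);
// ['Before calling subscribe…', 'Callback call', 'Next!', 'Complete the Observable']
```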

Asynchrony

A Promise is always asynchronous, even if it resolves immediately:

The console output: 

"Before calling then…"

"After calling then..."

"Greeting from Promise: Promise Resolved!"

The message from the then() callback is displayed last, even though the Promise was resolved without delay.
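A sketch of an already-resolved Promise. The log array is illustrative; when the synchronous code finishes, the greeting has not been added yet.

```typescript
const log: string[] = [];

log.push('Before calling then…');

Promise.resolve('Promise Resolved!').then((value) => {
  log.push(`Greeting from Promise: ${value}`);
  console.log(log); // the greeting is the last entry
});

log.push('After calling then...');

// The then() callback has not run yet, even though the Promise is already resolved.
console.log(log); // ['Before calling then…', 'After calling then...']
```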

On the other hand, the Observable can be synchronous: 

Console output: 

"Before calling subscribe..."

"Callback call"

"Next!"

"Complete the Observable"

"After calling subscribe…"
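A sketch of a synchronous Observable, assuming RxJS 6 or 7. Everything, including the next and complete notifications, happens before subscribe() returns.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const sync$ = new Observable<string>((subscriber) => {
  log.push('Callback call');
  subscriber.next('value'); // delivered synchronously
  subscriber.complete();
});

log.push('Before calling subscribe...');
sync$.subscribe({
  next: () => log.push('Next!'),
  complete: () => log.push('Complete the Observable'),
});
log.push('After calling subscribe…');

console.log(log);
```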

And it can be asynchronous: 

Console output: 

"Before calling subscribe..."

"Callback call"

"After calling subscribe…"

"Next!"

"Complete the Observable"
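A sketch of an asynchronous Observable, assuming RxJS 6 or 7 and a zero-delay setTimeout as the asynchronous source (any async source would behave the same way). When subscribe() returns, only the first three entries have been logged.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const async$ = new Observable<string>((subscriber) => {
  log.push('Callback call');
  // Assumption: a zero-delay timer stands in for any asynchronous source.
  const id = setTimeout(() => {
    subscriber.next('value');
    subscriber.complete();
  }, 0);
  return () => clearTimeout(id); // teardown if unsubscribed early
});

log.push('Before calling subscribe...');
async$.subscribe({
  next: () => log.push('Next!'),
  complete: () => {
    log.push('Complete the Observable');
    console.log(log); // all five entries, in the order shown above
  },
});
log.push('After calling subscribe…');
```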

Observables are collections of multiple values

A Promise can resolve with only one value. It could be an array, but it’s still a single object. An Observable can emit n values during its execution.

Operators

Operators, the RxJS feature described above, let you manipulate and modify an Observable’s data stream. Promises have nothing comparable.

Memory Leak & Unsubscribe

Many of the Observables we subscribe to are potentially infinite (click events, for example): we can’t tell beforehand how many values will be emitted. This means that we have to manage our subscriptions ourselves.

Why does memory leak occur?

Consider an Angular component that subscribes to an Observable and never unsubscribes.

In this example, the component uses the RxJS timer() function, which creates an Observable that emits increasing numbers; in our case, one every second.

The problem is that after the component is destroyed, the subscription lives on and keeps printing output to the console. If the app initializes the same component again, we end up with another subscription, and so on. Recreating components without cleaning up their subscriptions is a typical example of a memory leak.
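The sketch below reproduces the leak without Angular, under assumptions: LeakyComponent is a hypothetical class loosely modeled on Angular’s ngOnInit()/ngOnDestroy() hooks, and ticks$ is a Subject standing in for timer(0, 1000) so the ticks can be pushed by hand.

```typescript
import { Subject } from 'rxjs';

// Hypothetical stand-in for timer(0, 1000): "ticks" are pushed by hand.
const ticks$ = new Subject<number>();
const printed: string[] = [];

// Hypothetical component, loosely modeled on Angular's lifecycle hooks.
class LeakyComponent {
  private readonly name: string;

  constructor(name: string) {
    this.name = name;
  }

  ngOnInit(): void {
    // The returned Subscription is thrown away, so it can never be unsubscribed.
    ticks$.subscribe((n) => printed.push(`${this.name}: ${n}`));
  }

  ngOnDestroy(): void {
    // Nothing is cleaned up here, which is the bug.
  }
}

const firstInstance = new LeakyComponent('first instance');
firstInstance.ngOnInit();
firstInstance.ngOnDestroy(); // "destroyed", yet its subscription is still alive

const secondInstance = new LeakyComponent('second instance');
secondInstance.ngOnInit();

ticks$.next(0);
console.log(printed); // ['first instance: 0', 'second instance: 0']
```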

How to manage subscriptions to avoid memory leaks

The unsubscribe() method:

An Observable’s subscribe() method returns a Subscription object. Its unsubscribe() method removes the subscription.
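A sketch of manual cleanup with unsubscribe(), assuming RxJS 6 or 7. ticks$ is a hypothetical Subject standing in for timer(0, 1000); in Angular, the unsubscribe() call would typically live in ngOnDestroy().

```typescript
import { Subject } from 'rxjs';

// Hypothetical stand-in for timer(0, 1000).
const ticks$ = new Subject<number>();
const printed: number[] = [];

const subscription = ticks$.subscribe((n) => printed.push(n));

ticks$.next(0);
ticks$.next(1);

// For example, in a component's destroy hook:
subscription.unsubscribe();

ticks$.next(2); // no longer delivered
console.log(printed, subscription.closed); // [0, 1] true
```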

The takeUntil() Operator:

The takeUntil() Operator lets us take a declarative approach to working with data streams.

The takeUntil(notifier: Observable<any>) Operator emits the values produced by the source Observable until the notifier Observable emits a value.

This is a declarative approach since we’re declaring our Observable-chain along with everything it needs for a complete lifecycle.
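A sketch of the common "destroy notifier" pattern, assuming RxJS 6 or 7. Both ticks$ (standing in for timer(0, 1000)) and destroy$ are hypothetical Subjects; emitting on destroy$ ends the chain.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const ticks$ = new Subject<number>(); // hypothetical stand-in for timer(0, 1000)
const destroy$ = new Subject<void>();  // the notifier
const printed: number[] = [];

ticks$
  .pipe(takeUntil(destroy$)) // declare up front when the stream should end
  .subscribe((n) => printed.push(n));

ticks$.next(0);
ticks$.next(1);

// For example, in a component's destroy hook: emit on the notifier, then complete it.
destroy$.next();
destroy$.complete();

ticks$.next(2); // ignored: takeUntil has completed the chain
console.log(printed); // [0, 1]
```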

The take() and first() Operators:

Sometimes we need a subscription to work only once - for instance, when we’re loading some profile data at the application initialization.

In this case, it’s better to use either the first() Operator (which emits only the first value) or take() (which emits only n values). Both unsubscribe automatically.

For example, if take(3) is applied to a timer that emits 0, 1, 2, and so on, the console output will display the values 0, 1, and 2. After that, the take() Operator unsubscribes from the Observable.
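A sketch of take(3) and first(), assuming RxJS 6 or 7. ticks$ is a hypothetical Subject standing in for a timer; both subscriptions close on their own once they have the values they need.

```typescript
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';

const ticks$ = new Subject<number>(); // hypothetical stand-in for timer(0, 1000)
const firstThree: number[] = [];
const onlyFirst: number[] = [];

const takeSub = ticks$.pipe(take(3)).subscribe((n) => firstThree.push(n));
const firstSub = ticks$.pipe(first()).subscribe((n) => onlyFirst.push(n));

[0, 1, 2, 3, 4].forEach((n) => ticks$.next(n));

console.log(firstThree, onlyFirst);           // [0, 1, 2] [0]
console.log(takeSub.closed, firstSub.closed); // true true: both unsubscribed themselves
```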

Notice: take(1) and first() Operators won’t unsubscribe from the Observable before the first value is emitted. Use them only when you’re certain that at least one value will be produced. Otherwise, the subscription will remain active, which may lead to unexpected behavior.

Afterword

This article is meant to give you an overview of what RxJS is about without going too deep into each concept. Hopefully, it’ll add more clarity on the subject, but I definitely recommend taking a look at the official RxJS documentation for a deep dive into the topic.


Filip knows the secret paths of the front-end side of the force and teaches young padawans how to master it.

Written by

Akveo

Akveo is an experienced team of full-stack software experts passionate about creating reliable software. Our expertise lets us understand the essence of a business need to deliver the best solution possible. Plus, our own products in development and design allow us to implement new solutions faster.
