RxJS for Angular Developers, Part 2

RxJS for Angular Developers, Part 2

For the most part, Angular is a straightforward framework. But, RxJS is a common stumbling point for Angular beginners. In Part 1 of this series, we introduced RxJS, explained why it’s an essential tool for Angular developers, and gave a quick overview of Observables. In this Part 2, we will cover some of the most common RxJS operators and provide some sample use cases to demonstrate the versatility of RxJS. Finally, in Part 3, next year, we will cover some of the common pitfalls that developers encounter with RxJS and the best practices that will help you avoid them.

Meet the Operators

RxJS provides over a hundred operators with which to manipulate observables. This means that no matter what you want to do with your observables, chances are there’s an operator for that. These operators can be grouped into a few broad categories. I’ve listed them below, along with a few examples from each category.

Creation

Creation functions are used to create new observables. Some of the most commonly used are from and interval. Note: These are actually functions, not operators, as they do not go inside the pipe.

  • from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>

From converts almost anything to an observable, including promises, arrays, or any iterable. This makes it easy to integrate with libraries that use promises rather than observables, among other uses.

  • timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>

Timer creates an observable that emits incremental numbers periodically in time. This is useful for polling, or any other event that needs to be processed on a fixed interval.

Combination

Combination functions are used to join multiple observables into one. Some of the most commonly used are forkJoin and merge. Note: These are actually functions, not operators, as they do not go inside the pipe.

  • forkJoin(…sources: any[]): Observable<any>

ForkJoin takes in an arbitrary number of observables, waits for them all to complete, and then emits the last value from each. This is particularly useful for operations that need to wait for a number of asynchronous processes to complete before they can begin. For those familiar with promises, this is the RxJS equivalent to Promise.all().

  • merge<T, R>(…observables: Array<ObservableInput<any> | SchedulerLike | number>): Observable<R>

Just like forkjoin, merge takes in an arbitrary number of observables and combines them into one. Unlike forkJoin, it does not wait for the inner observables to complete, instead emitting every value from its inner observables as they occur. One common use case is when you want the same action to be set off by more than one different trigger, like a task that runs on a schedule but can also be started manually.

Transformation

Transformation operators allow you to change values as they pass through the stream. Some of the most commonly used are map, mergeMap, and switchMap.

  • map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>

Map passes each value in the stream through the provided projection function. This is very similar to how map works on arrays.

  • mergeMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, ObservedValueOf<O> | R>

MergeMap is a type of higher-order operator, which means it can flatten new observables into the stream. For every value that comes through the stream, mergeMap maps it to a new observable. This makes it very useful for chaining multiple asynchronous operations without falling into the anti-pattern of nested subscribes.

  • switchMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, ObservedValueOf<O> | R>

Just like mergeMap, switchMap is a type of higher-order operator. For every value that comes through the stream, switchMap maps it to an observable, then flattens all of these inner Observables into the stream. The difference is that switchMap cancels all previous inner observables every time a new one is created. That makes it useful for applications like polling or typeahead (see examples below), where old values don’t matter once newer values are available.

Filtering

Filtering operators help us filter the stream of values from an observable to just process what we need. Some of the most useful operators in this category are debounceTime, distinctUntilChanged, filter, and takeUntil.

  • debounceTime<T>(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>

DebounceTime is useful for filtering bursty user interactions for applications such as typeahead. It allows us to filter out all but the most recent value from each burst values in the stream, with the length of a burst being determined by the dueTime parameter.

  • distinctUntilChanged<T, K>(compare?: (x: K, y: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction<T>

DistinctUntilChanged allows us to filter out repeated values from the stream. This is useful for avoiding unnecessary duplicate processing.

  • filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction<T>

Filter allows you to filter the stream of values with a custom predicate. Resulting observable will only emit values that match the predicate. This is very similar to how filter works on arrays.

  • takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>

TakeUntil only lets values pass until a second Observable, the notifier, emits a value. Then, it completes. This is useful as an alternative to keeping track of your subscriptions and unsubscribing from them manually when a component is destroyed (This is discussed further later in this article).

Utility

There are so many operators in RxJS that some don’t neatly fall into categories. But these operators, such as tap, finalize, and timeout, can still be very useful.

  • tap<T>(nextOrObserver?: PartialObserver<T> | ((x: T) => void), error?: (e: any) => void, complete?: () => void): MonoTypeOperatorFunction<T>

Tap intercepts each value in the stream and runs a function on it, but does not modify the value. It is commonly used for debugging by inserting a call to console.log into the stream.

  • finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>

Finalize will call the provided function when the source observable completes, even if it completes due to an error.

  • timeout<T>(due: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>

Timeout allows you to cause observables to emit an error if the source observable does not emit within a specified amount of time.

Error handling

One of the benefits of RxJS is that it allows you to gracefully handle asynchronous errors and optionally retry, without the need to rely on try/catch blocks. Some of the most useful operators for this purpose include catchError, retry, and defaultIfEmpty.

  • catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>

As the name suggests, catchError allows you to catch errors in the stream, and handle them by returning a new observable. This is useful for catching and handling known types of errors before they get to the subscriber.

  • retry<T>(count: number = 1): MonoTypeOperatorFunction<T>

Retry allows you to retry the observable up to a certain number of times in the event of an error.

  • defaultIfEmpty<T, R>(defaultValue: R = null): OperatorFunction<T, T | R>

DefaultIfEmpty allows you to provide a default value if the source observable completes without emitting any values. This is useful if we need subsequent steps in the chain to complete even if the source observable doesn’t emit.

Multicasting

  • share<T>(): MonoTypeOperatorFunction<T>

Share allows us to multicast an observable. This is useful for observables that will have multiple subscribers, but shouldn’t duplicate execution for every subscriber. 

Use Cases

The best way to understand what all these operators do is to see them in use. Toward that end, here are some examples of use cases that RxJS is particularly suited for.

HTTP Polling

This example uses the timer function to create an observable that fires every 5 seconds, the switchMap operator to transform each event into an HTTP request, and the map operator to extract the data from the response. In order to avoid having to subscribe and unsubscribe manually, we use Angular’s async pipe to display the results in the template.

HTTP Retry and Error Handling

Spotty internet connections often cause HTTP requests to fail intermittently. We can greatly improve the user experience in this situation by automatically retrying failed requests and providing visible error messages rather than failing silently. With Angular and RxJS, we can create an HttpInterceptor that will add retry and error handling logic to every HTTP request we make.

In this example, you can trigger an HTTP request by hitting the “Make request” button. Much like the previous example, this will use the switchMap operator to make the request, the map operator to extract the relevant data, and Angular’s async pipe to display the results. However, we also have the option to trigger an error by using the wrong URL in our request. Because we have an HttpInterceptor defined in ‘http-error.interceptor.ts’ and provided in ‘app.module.ts’, this request will be retried two more times. If the request fails all three times, the user will see an alert pop up to inform them of the issue. This is done with the retry and catchError operators, respectively.

Typeahead

With the tools that RxJS provides for us, implementing complex behavior like typeahead with debouncing becomes trivial. This example uses RxJS operators such as debounceTime (to prevent multiple requests within a short time period), distinctUntilChanged (to filter out duplicate requests), and filter (to prevent querying the API without a search term, in this case). We also make use of some of the same operators as previous examples, like switchMap, catchError, and map.

For the most part, Angular is a straightforward framework. But, RxJS is a common stumbling point for Angular beginners. In Part 1 of this series, we introduced RxJS, explained why it’s an essential tool for Angular developers, and gave a quick overview of Observables. In this Part 2, we will cover some of the most common RxJS operators and provide some sample use cases to demonstrate the versatility of RxJS. Finally, in Part 3, next year, we will cover some of the common pitfalls that developers encounter with RxJS and the best practices that will help you avoid them.

Meet the Operators

RxJS provides over a hundred operators with which to manipulate observables. This means that no matter what you want to do with your observables, chances are there’s an operator for that. These operators can be grouped into a few broad categories. I’ve listed them below, along with a few examples from each category.

Creation

Creation functions are used to create new observables. Some of the most commonly used are from and interval. Note: These are actually functions, not operators, as they do not go inside the pipe.

  • from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>

From converts almost anything to an observable, including promises, arrays, or any iterable. This makes it easy to integrate with libraries that use promises rather than observables, among other uses.

  • timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>

Timer creates an observable that emits incremental numbers periodically in time. This is useful for polling, or any other event that needs to be processed on a fixed interval.

Combination

Combination functions are used to join multiple observables into one. Some of the most commonly used are forkJoin and merge. Note: These are actually functions, not operators, as they do not go inside the pipe.

  • forkJoin(…sources: any[]): Observable<any>

ForkJoin takes in an arbitrary number of observables, waits for them all to complete, and then emits the last value from each. This is particularly useful for operations that need to wait for a number of asynchronous processes to complete before they can begin. For those familiar with promises, this is the RxJS equivalent to Promise.all().

  • merge<T, R>(…observables: Array<ObservableInput<any> | SchedulerLike | number>): Observable<R>

Just like forkjoin, merge takes in an arbitrary number of observables and combines them into one. Unlike forkJoin, it does not wait for the inner observables to complete, instead emitting every value from its inner observables as they occur. One common use case is when you want the same action to be set off by more than one different trigger, like a task that runs on a schedule but can also be started manually.

Transformation

Transformation operators allow you to change values as they pass through the stream. Some of the most commonly used are map, mergeMap, and switchMap.

  • map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>

Map passes each value in the stream through the provided projection function. This is very similar to how map works on arrays.

  • mergeMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, ObservedValueOf<O> | R>

MergeMap is a type of higher-order operator, which means it can flatten new observables into the stream. For every value that comes through the stream, mergeMap maps it to a new observable. This makes it very useful for chaining multiple asynchronous operations without falling into the anti-pattern of nested subscribes.

  • switchMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, ObservedValueOf<O> | R>

Just like mergeMap, switchMap is a type of higher-order operator. For every value that comes through the stream, switchMap maps it to an observable, then flattens all of these inner Observables into the stream. The difference is that switchMap cancels all previous inner observables every time a new one is created. That makes it useful for applications like polling or typeahead (see examples below), where old values don’t matter once newer values are available.

Filtering

Filtering operators help us filter the stream of values from an observable to just process what we need. Some of the most useful operators in this category are debounceTime, distinctUntilChanged, filter, and takeUntil.

  • debounceTime<T>(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>

DebounceTime is useful for filtering bursty user interactions for applications such as typeahead. It allows us to filter out all but the most recent value from each burst values in the stream, with the length of a burst being determined by the dueTime parameter.

  • distinctUntilChanged<T, K>(compare?: (x: K, y: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction<T>

DistinctUntilChanged allows us to filter out repeated values from the stream. This is useful for avoiding unnecessary duplicate processing.

  • filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction<T>

Filter allows you to filter the stream of values with a custom predicate. Resulting observable will only emit values that match the predicate. This is very similar to how filter works on arrays.

  • takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>

TakeUntil only lets values pass until a second Observable, the notifier, emits a value. Then, it completes. This is useful as an alternative to keeping track of your subscriptions and unsubscribing from them manually when a component is destroyed (This is discussed further later in this article).

Utility

There are so many operators in RxJS that some don’t neatly fall into categories. But these operators, such as tap, finalize, and timeout, can still be very useful.

  • tap<T>(nextOrObserver?: PartialObserver<T> | ((x: T) => void), error?: (e: any) => void, complete?: () => void): MonoTypeOperatorFunction<T>

Tap intercepts each value in the stream and runs a function on it, but does not modify the value. It is commonly used for debugging by inserting a call to console.log into the stream.

  • finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>

Finalize will call the provided function when the source observable completes, even if it completes due to an error.

  • timeout<T>(due: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>

Timeout allows you to cause observables to emit an error if the source observable does not emit within a specified amount of time.

Error handling

One of the benefits of RxJS is that it allows you to gracefully handle asynchronous errors and optionally retry, without the need to rely on try/catch blocks. Some of the most useful operators for this purpose include catchError, retry, and defaultIfEmpty.

  • catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>

As the name suggests, catchError allows you to catch errors in the stream, and handle them by returning a new observable. This is useful for catching and handling known types of errors before they get to the subscriber.

  • retry<T>(count: number = 1): MonoTypeOperatorFunction<T>

Retry allows you to retry the observable up to a certain number of times in the event of an error.

  • defaultIfEmpty<T, R>(defaultValue: R = null): OperatorFunction<T, T | R>

DefaultIfEmpty allows you to provide a default value if the source observable completes without emitting any values. This is useful if we need subsequent steps in the chain to complete even if the source observable doesn’t emit.

Multicasting

  • share<T>(): MonoTypeOperatorFunction<T>

Share allows us to multicast an observable. This is useful for observables that will have multiple subscribers, but shouldn’t duplicate execution for every subscriber. 

Use Cases

The best way to understand what all these operators do is to see them in use. Toward that end, here are some examples of use cases that RxJS is particularly suited for.

HTTP Polling

This example uses the timer function to create an observable that fires every 5 seconds, the switchMap operator to transform each event into an HTTP request, and the map operator to extract the data from the response. In order to avoid having to subscribe and unsubscribe manually, we use Angular’s async pipe to display the results in the template.

HTTP Retry and Error Handling

Spotty internet connections often cause HTTP requests to fail intermittently. We can greatly improve the user experience in this situation by automatically retrying failed requests and providing visible error messages rather than failing silently. With Angular and RxJS, we can create an HttpInterceptor that will add retry and error handling logic to every HTTP request we make.

In this example, you can trigger an HTTP request by hitting the “Make request” button. Much like the previous example, this will use the switchMap operator to make the request, the map operator to extract the relevant data, and Angular’s async pipe to display the results. However, we also have the option to trigger an error by using the wrong URL in our request. Because we have an HttpInterceptor defined in ‘http-error.interceptor.ts’ and provided in ‘app.module.ts’, this request will be retried two more times. If the request fails all three times, the user will see an alert pop up to inform them of the issue. This is done with the retry and catchError operators, respectively.

Typeahead

With the tools that RxJS provides for us, implementing complex behavior like typeahead with debouncing becomes trivial. This example uses RxJS operators such as debounceTime (to prevent multiple requests within a short time period), distinctUntilChanged (to filter out duplicate requests), and filter (to prevent querying the API without a search term, in this case). We also make use of some of the same operators as previous examples, like switchMap, catchError, and map.

Further reading

That’s it for now, but there’s always more to learn! Tune in again next year as we continue this series in Part 3.  For now, you can continue with checking out some of the related topics listed below:

References