What is RxJS?
RxJS (Reactive Extensions for JavaScript) is the JavaScript implementation of the ReactiveX API. It brings the concept of reactive programming to JavaScript applications, enabling composition of asynchronous and event-based programs using observable sequences.
RxJS is a core dependency of Angular and is widely used for handling HTTP requests, user events, WebSockets, and any stream of data over time.
Core Concepts
Observable
A lazy, push-based collection that emits values over time. It does nothing until subscribed to, can emit zero or more values, and may then either complete or error.
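To make the laziness concrete, here is a minimal sketch (the `log` array and the message strings are illustrative, not part of RxJS). It suggests that the producer function runs once per subscription rather than at construction time:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The function passed to the constructor is the producer. It runs once per
// subscription, not when the Observable is created.
const source$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

log.push('observable created');

source$.subscribe(value => log.push(`first got ${value}`));
source$.subscribe(value => log.push(`second got ${value}`));

console.log(log);
// [ 'observable created',
//   'producer started', 'first got 1', 'first got 2',
//   'producer started', 'second got 1', 'second got 2' ]
```

Because each subscriber triggers its own run of the producer, two subscribers to an HTTP-backed Observable would normally cause two requests unless the stream is shared.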
Observer
A consumer of values delivered by an Observable. An observer is an object with three callbacks: next(), error(), and complete().
Subscription
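The link between an Observable and an Observer, created by calling observable.subscribe(). Unsubscribe from long-lived Observables (intervals, DOM events, Subjects) to avoid memory leaks; Observables that complete or error release their resources on their own.
Subject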
A special Observable that is also an Observer, so it can multicast values to multiple subscribers. Types: Subject, BehaviorSubject, ReplaySubject, AsyncSubject.
Operators
Pure functions that transform, filter, combine, or create Observables. Examples: map, filter, mergeMap, switchMap, combineLatest.
Scheduler
Controls when a subscription starts and when notifications are delivered. Used for controlling concurrency and timing of Observable execution.
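As a rough illustration of what a Scheduler changes, the sketch below compares default synchronous delivery with delivery through `observeOn(asyncScheduler)`. The `order` array and its labels are made up for the example:

```typescript
import { asyncScheduler, of } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const order: string[] = [];

// Default: of() delivers its values synchronously, during subscribe().
of('sync value').subscribe(v => order.push(v));
order.push('after sync subscribe');

// With observeOn(asyncScheduler), each notification is re-scheduled as a
// timer task, so it arrives after the current synchronous code has finished.
of('async value')
  .pipe(observeOn(asyncScheduler))
  .subscribe({
    next: v => order.push(v),
    complete: () => console.log(order),
  });
order.push('after async subscribe');

// Logged on completion:
// [ 'sync value', 'after sync subscribe', 'after async subscribe', 'async value' ]
```

Most application code never names a scheduler directly; time-based operators such as debounceTime and delay use asyncScheduler by default.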
Code Examples
Creating an Observable
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable.subscribe({
  next: value => console.log('Value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Done!')
});
// Value: 1 | Value: 2 | Value: 3 | Done!
Common Creation Operators
import { of, from, interval, fromEvent } from 'rxjs';

of(1, 2, 3).subscribe(console.log);         // Emit fixed values, then complete
from([10, 20, 30]).subscribe(console.log);  // Emit from an array, promise, or iterable
interval(1000).subscribe(console.log);      // Emit 0, 1, 2, ... every second (never completes)

// `button` is assumed to be a DOM element, e.g. document.querySelector('button')
fromEvent(button, 'click').subscribe(e => console.log('Clicked!'));
Pipeable Operators
import { of } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0),  // Keep even numbers
  map(n => n * 10),          // Multiply by 10
  take(2)                    // Take only the first 2
).subscribe(console.log);
// 20 | 40
switchMap - HTTP Request Example
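Before the HTTP version, a framework-free sketch may help show the cancellation itself. Two Subjects stand in for in-flight responses; the names `responses`, `query$`, and `results` are hypothetical and exist only for this illustration:

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-ins for two in-flight HTTP responses.
const responses = {
  rx: new Subject<string>(),
  rxjs: new Subject<string>(),
};

const query$ = new Subject<'rx' | 'rxjs'>();
const results: string[] = [];

query$
  .pipe(switchMap(query => responses[query]))
  .subscribe(result => results.push(result));

query$.next('rx');    // subscribes to responses.rx
query$.next('rxjs');  // unsubscribes from responses.rx, subscribes to responses.rxjs

responses.rx.next('results for "rx"');      // dropped: nothing is listening any more
responses.rxjs.next('results for "rxjs"');  // delivered

console.log(results); // [ 'results for "rxjs"' ]
```

With a real HTTP client, unsubscribing from the inner Observable is what typically aborts the outdated request.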
import { fromEvent } from 'rxjs';
import { map, switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';

// Search input that cancels the previous HTTP request when a new query arrives.
// Assumes `searchInput` is an <input> element and `this.http` is Angular's HttpClient.
fromEvent(searchInput, 'input').pipe(
  map(event => event.target.value),  // Compare query strings, not Event objects
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.http.get(`/api/search?q=${encodeURIComponent(query)}`))
).subscribe(results => console.log(results));
BehaviorSubject
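What sets BehaviorSubject apart is how it treats late subscribers. The sketch below pushes the same values into a plain Subject, a BehaviorSubject, and a ReplaySubject before anyone subscribes. The buffer size of 2 and the `seen` object are arbitrary choices for the illustration:

```typescript
import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

const plain$ = new Subject<number>();
const behavior$ = new BehaviorSubject<number>(0); // requires an initial value
const replay$ = new ReplaySubject<number>(2);     // buffers the last 2 values

// Push the same values into all three before anyone subscribes.
for (const subject of [plain$, behavior$, replay$]) {
  subject.next(1);
  subject.next(2);
  subject.next(3);
}

const seen = {
  plain: [] as number[],
  behavior: [] as number[],
  replay: [] as number[],
};

// Late subscribers: what does each one receive immediately?
plain$.subscribe(v => seen.plain.push(v));
behavior$.subscribe(v => seen.behavior.push(v));
replay$.subscribe(v => seen.replay.push(v));

console.log(seen); // { plain: [], behavior: [ 3 ], replay: [ 2, 3 ] }
```

Always having a current value for new subscribers is why BehaviorSubject is a common choice for holding state.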
import { BehaviorSubject } from 'rxjs';

// Commonly used for state management in Angular services
const count$ = new BehaviorSubject(0);

count$.subscribe(val => console.log('Count:', val)); // Count: 0
count$.next(1); // Count: 1
count$.next(2); // Count: 2
console.log(count$.getValue()); // 2 (get current value synchronously)
mergeMap - Concurrent Inner Observables
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';

// Subscribes to all inner observables concurrently - order not guaranteed
of('A', 'B', 'C').pipe(
  mergeMap(letter =>
    of(`${letter} done`).pipe(delay(Math.random() * 1000))
  )
).subscribe(console.log);
// Output order may vary, e.g. B done | A done | C done
concatMap - Sequential Inner Observables
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

// Waits for each inner observable to complete before starting the next
of('First', 'Second', 'Third').pipe(
  concatMap(item =>
    of(`${item} saved`).pipe(delay(500))
  )
).subscribe(console.log);
// First saved | Second saved | Third saved (in order, 500ms apart)
combineLatest - Combine Multiple Streams
import { combineLatest, BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';

const firstName$ = new BehaviorSubject('John');
const lastName$ = new BehaviorSubject('Doe');

combineLatest([firstName$, lastName$]).pipe(
  map(([first, last]) => `${first} ${last}`)
).subscribe(name => console.log(name));
// John Doe

firstName$.next('Jane');
// Jane Doe
forkJoin - Wait for All to Complete
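A small runnable sketch of the "last value from each" behaviour, using `of()` in place of HTTP calls. The keys `user` and `visits` and their values are invented for the illustration:

```typescript
import { forkJoin, of } from 'rxjs';

const emissions: Array<{ user: string; visits: number }> = [];

// of() stands in for HTTP calls here; each source completes right after emitting.
forkJoin({
  user: of('Ada'),
  visits: of(1, 2, 3), // only the last value, 3, is kept
}).subscribe(result => emissions.push(result));

console.log(emissions); // [ { user: 'Ada', visits: 3 } ]
```

If any source never completes (an interval or a Subject, say), forkJoin never emits, so it is best suited to one-shot sources such as HTTP requests.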
import { forkJoin } from 'rxjs';

// Like Promise.all() - emits the last value from each source once all have completed.
// Assumes `this.http` is Angular's HttpClient.
forkJoin({
  user: this.http.get('/api/user/1'),
  posts: this.http.get('/api/posts'),
  profile: this.http.get('/api/profile/1'),
}).subscribe(({ user, posts, profile }) => {
  console.log('All loaded:', user, posts, profile);
});
catchError & retry - Error Handling
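To see how the two operators interact without a server, the sketch below uses a deliberately failing source. `flaky$`, `attempts`, and `received` are hypothetical names for the illustration:

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: every subscription counts as one attempt and fails.
const flaky$ = new Observable<string[]>(subscriber => {
  attempts++;
  subscriber.error(new Error(`attempt ${attempts} failed`));
});

const received: string[][] = [];

flaky$
  .pipe(
    retry(2),                            // 1 initial attempt + 2 retries = 3 subscriptions
    catchError(() => of([] as string[])) // after the final failure, fall back to []
  )
  .subscribe(data => received.push(data));

console.log(attempts, received); // 3 [ [] ]
```

Order matters here: placing catchError before retry would swallow the error first, leaving retry nothing to react to.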
import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

this.http.get('/api/data').pipe(
  retry(2),  // Retry up to 2 times on error
  catchError(err => {
    console.error('Error:', err);
    return of([]);  // Return fallback value
  })
).subscribe(data => console.log(data));
takeUntil - Unsubscribe on Destroy
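The mechanism can be sketched outside of a component as well. Here a Subject stands in for the timer so the example runs synchronously; `source$` and `seen` are names made up for the illustration:

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>(); // stands in for interval(1000)
const destroy$ = new Subject<void>();
const seen: number[] = [];

const subscription = source$
  .pipe(takeUntil(destroy$))
  .subscribe(n => seen.push(n));

source$.next(0);
source$.next(1);

destroy$.next();     // what ngOnDestroy() would do
destroy$.complete();

source$.next(2);     // ignored: the subscription is already closed

console.log(seen, subscription.closed); // [ 0, 1 ] true
```

One notifier can guard any number of subscriptions, which is the main convenience over calling unsubscribe() on each Subscription by hand.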
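import { OnDestroy, OnInit } from '@angular/core';
import { Subject, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Sketch of an Angular component class; the class name is illustrative and the decorator is omitted
export class ExampleComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)  // Auto-unsubscribe when the component is destroyed
    ).subscribe(n => console.log(n));
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Popular Operators Quick Reference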
Operator              Description
map                   Transform each value
filter                Keep values matching a condition
tap                   Perform side effects without changing the stream
take                  Take the first N values
takeUntil             Complete when the notifier emits
switchMap             Cancel the previous inner observable and switch to the new one
mergeMap              Merge all inner observables concurrently
concatMap             Queue inner observables sequentially
combineLatest         Combine the latest values from multiple observables
forkJoin              Wait for all observables to complete
debounceTime          Emit after a period of silence
distinctUntilChanged  Skip consecutive duplicate values
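Several of these operators can be chained in one pipeline. The following is a small sketch with arbitrary input values; the `tapped` and `output` arrays exist only to record what each stage sees:

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged, filter, map, take, tap } from 'rxjs/operators';

const tapped: number[] = [];
const output: number[] = [];

from([1, 1, 2, 3, 3, 4, 5, 6])
  .pipe(
    distinctUntilChanged(),    // 1, 2, 3, 4, 5, 6
    tap(n => tapped.push(n)),  // side effect only; values pass through unchanged
    filter(n => n % 2 === 0),  // 2, 4, 6
    map(n => n * 10),          // 20, 40, 60
    take(2)                    // 20, 40, then complete and unsubscribe upstream
  )
  .subscribe(n => output.push(n));

console.log(output); // [ 20, 40 ]
console.log(tapped); // [ 1, 2, 3, 4 ]
```

Note that `tapped` stops at 4: once take(2) completes, the source is unsubscribed, so 5 and 6 are never pulled through the pipeline.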