Tutorials Logic, IN +91 8092939553 info@tutorialslogic.com
Navigation
Home About Us Contact Us Blogs FAQs
Tutorials
All Tutorials
Services
Academic Projects Resume Writing Interview Questions Website Development
Compiler Tutorials
RxJS

RxJS — Reactive Extensions for JavaScript

A library for composing asynchronous and event-based programs using observable sequences.

Reactive Programming Observables Angular

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is the JavaScript implementation of the ReactiveX API. It brings the concept of reactive programming to JavaScript applications, enabling composition of asynchronous and event-based programs using observable sequences.

RxJS is a core dependency of Angular and is widely used for handling HTTP requests, user events, WebSockets, and any stream of data over time.

Core Concepts

Observable
A lazy push-based collection that emits values over time. It does nothing until subscribed to. Can emit zero or more values and optionally complete or error.
Observer
A consumer of values delivered by an Observable. An observer is an object with three callbacks: next(), error(), and complete().
Subscription
The link between an Observable and an Observer. Created by calling observable.subscribe(). Must be unsubscribed to avoid memory leaks.
Subject
A special Observable that is also an Observer — it can multicast values to multiple subscribers. Types: Subject, BehaviorSubject, ReplaySubject, AsyncSubject.
Operators
Pure functions that transform, filter, combine, or create Observables. Examples: map, filter, mergeMap, switchMap, combineLatest.
Scheduler
Controls when a subscription starts and when notifications are delivered. Used for controlling concurrency and timing of Observable execution.

Code Examples

Creating an Observable

TypeScript
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable.subscribe({
  next: value => console.log('Value:', value),
  error: err  => console.error('Error:', err),
  complete: () => console.log('Done!')
});
// Value: 1 | Value: 2 | Value: 3 | Done!

Common Creation Operators

TypeScript
import { of, from, interval, fromEvent } from 'rxjs';

of(1, 2, 3).subscribe(console.log);           // Emit fixed values
from([10, 20, 30]).subscribe(console.log);    // Emit from array/promise
interval(1000).subscribe(console.log);        // Emit every 1 second
fromEvent(button, 'click').subscribe(e => console.log('Clicked!'));

Pipeable Operators

TypeScript
import { of } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0),   // Keep even numbers
  map(n => n * 10),            // Multiply by 10
  take(2)                      // Take only first 2
).subscribe(console.log);
// 20 | 40

switchMap — HTTP Request Example

TypeScript
import { fromEvent } from 'rxjs';
import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';

// Search input that cancels previous HTTP request on new keystroke
fromEvent(searchInput, 'input').pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(event => this.http.get(`/api/search?q=${event.target.value}`))
).subscribe(results => console.log(results));

BehaviorSubject

TypeScript
import { BehaviorSubject } from 'rxjs';

// Commonly used for state management in Angular services
const count$ = new BehaviorSubject(0);

count$.subscribe(val => console.log('Count:', val)); // Count: 0

count$.next(1); // Count: 1
count$.next(2); // Count: 2

console.log(count$.getValue()); // 2 (get current value synchronously)

mergeMap — Concurrent Inner Observables

TypeScript
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';

// Subscribes to all inner observables concurrently - order not guaranteed
of('A', 'B', 'C').pipe(
  mergeMap(letter =>
    of(`${letter} done`).pipe(delay(Math.random() * 1000))
  )
).subscribe(console.log);
// Output order may vary: B done | A done | C done

concatMap — Sequential Inner Observables

TypeScript
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

// Waits for each inner observable to complete before starting next
of('First', 'Second', 'Third').pipe(
  concatMap(item =>
    of(`${item} saved`).pipe(delay(500))
  )
).subscribe(console.log);
// First saved | Second saved | Third saved (in order, 500ms apart)

combineLatest — Combine Multiple Streams

TypeScript
import { combineLatest, BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';

const firstName$ = new BehaviorSubject('John');
const lastName$  = new BehaviorSubject('Doe');

combineLatest([firstName$, lastName$]).pipe(
  map(([first, last]) => `${first} ${last}`)
).subscribe(name => console.log(name));
// John Doe

firstName$.next('Jane');
// Jane Doe

forkJoin — Wait for All to Complete

TypeScript
import { forkJoin } from 'rxjs';

// Like Promise.all() - emits last value from each when all complete
forkJoin({
  user:    this.http.get('/api/user/1'),
  posts:   this.http.get('/api/posts'),
  profile: this.http.get('/api/profile/1'),
}).subscribe(({ user, posts, profile }) => {
  console.log('All loaded:', user, posts, profile);
});

catchError & retry — Error Handling

TypeScript
import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

this.http.get('/api/data').pipe(
  retry(2),                        // Retry up to 2 times on error
  catchError(err => {
    console.error('Error:', err);
    return of([]);                 // Return fallback value
  })
).subscribe(data => console.log(data));

takeUntil — Unsubscribe on Destroy

TypeScript
import { Subject, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

private destroy$ = new Subject<void>();

ngOnInit() {
  interval(1000).pipe(
    takeUntil(this.destroy$)   // Auto-unsubscribe when component destroys
  ).subscribe(n => console.log(n));
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Popular Operators Quick Reference

map
Transform each value
filter
Keep values matching condition
tap
Side effects without changing stream
take
Take first N values
takeUntil
Complete when notifier emits
switchMap
Cancel previous, switch to new inner observable
mergeMap
Merge all inner observables concurrently
concatMap
Queue inner observables sequentially
combineLatest
Combine latest values from multiple observables
forkJoin
Wait for all observables to complete
debounceTime
Emit after silence period
distinctUntilChanged
Skip duplicate consecutive values


Ready to Level Up Your Skills?

Explore 500+ free tutorials across 20+ languages and frameworks.