Open reboottime opened 1 year ago
Data Streams: In Reactive Programming, everything can be represented as a stream of data. This includes variables, user inputs, properties, caches, data structures, etc. A stream can emit multiple values over time, unlike a traditional function that returns a single value.
Asynchronous Data Flow: Data streams are inherently asynchronous. Reactive Programming handles data that flows over time and allows for asynchronous data handling, making it ideal for applications that deal with events, real-time updates, and asynchronous computations.
Reactive Systems: These systems are responsive, resilient, elastic, and message-driven. They react to changes in their input and give timely feedback to user interactions or other events. (I do not understand this deeply yet; I need use cases and practice to grasp its implications.)
Observer Pattern: This pattern is fundamental in Reactive Programming. Observers subscribe to data streams and react to emitted values. The pattern consists of three parts:
Functional Programming Techniques: Reactive Programming often leverages functional programming concepts like
These principles make it easier to build scalable applications with few side effects.
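To make the stream idea concrete, here is a minimal, dependency-free sketch. It is not RxJS itself, and `Stream`, `fromArray`, `mapStream`, and `filterStream` are hypothetical names chosen only for illustration. It shows one source pushing several values to a listener, with small pure functions transforming the values on the way.

```ts
// A stream is modelled here as a function that takes a listener and pushes values to it.
type Listener<T> = (value: T) => void;
type Stream<T> = (listener: Listener<T>) => void;

// Emits each array element in order (synchronously, to keep the sketch simple).
function fromArray<T>(values: T[]): Stream<T> {
  return (listener) => values.forEach((value) => listener(value));
}

// Builds a new stream whose values are the source values passed through `fn`.
function mapStream<A, B>(source: Stream<A>, fn: (a: A) => B): Stream<B> {
  return (listener) => source((value) => listener(fn(value)));
}

// Builds a new stream that only forwards values for which `keep` returns true.
function filterStream<T>(source: Stream<T>, keep: (value: T) => boolean): Stream<T> {
  return (listener) =>
    source((value) => {
      if (keep(value)) listener(value);
    });
}

const numbers = fromArray([1, 2, 3, 4, 5]);
const doubledEvens = mapStream(
  filterStream(numbers, (n) => n % 2 === 0),
  (n) => n * 2
);

const received: number[] = [];
doubledEvens((value) => received.push(value));
console.log(received); // [4, 8]
```

Unlike a function call that returns once, the listener here is invoked once per value, which is the property the "Data Streams" principle describes.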
The concepts and their associated code patterns discussed below were learned from Derek Banas's tutorial on the Observer Design Pattern, available at this link.
Core concepts
When to Use the Observer Pattern
```ts
interface Observer {
  notify: (data: any) => void;
}

interface Subject {
  attach: (observer: Observer) => void;
  detach: (observer: Observer) => void;
  notifyObservers: () => void;
}
```
- Subject code
```ts
class ConcreteSubject implements Subject {
  private observers: Observer[] = [];

  // Any state changes that the observers should know about
  private state: any;

  public attach(observer: Observer): void {
    const isExist = this.observers.includes(observer);
    if (isExist) {
      return console.log('Subject: Observer has been attached already.');
    }

    console.log('Subject: Attached an observer.');
    this.observers.push(observer);
  }

  public detach(observer: Observer): void {
    const observerIndex = this.observers.indexOf(observer);
    if (observerIndex === -1) {
      return console.log('Subject: Nonexistent observer.');
    }

    this.observers.splice(observerIndex, 1);
    console.log('Subject: Detached an observer.');
  }

  public notifyObservers(): void {
    console.log('Subject: Notifying observers...');
    for (const observer of this.observers) {
      observer.notify(this.state);
    }
  }

  // Usually, the subscription logic is only a fraction of what a Subject can do.
  // Subjects commonly hold some important business logic, that triggers a notification
  // method whenever something important is about to happen (or after it).
  public someBusinessLogic(): void {
    console.log('\nSubject: I\'m doing something important.');
    this.state = Math.floor(Math.random() * 10);

    console.log(`Subject: My state has just changed to: ${this.state}`);
    this.notifyObservers();
  }
}

class ConcreteObserver implements Observer {
  // Receive update from subject
  public notify(data: any): void {
    console.log(`Observer: Reacted to the event with data: ${data}`);
  }
}

const subject = new ConcreteSubject();

const observer1 = new ConcreteObserver();
subject.attach(observer1);

const observer2 = new ConcreteObserver();
subject.attach(observer2);

subject.someBusinessLogic();
subject.someBusinessLogic();

subject.detach(observer2);

subject.someBusinessLogic();
```
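The example above types the state as `any`. One possible variation, sketched here under the assumption that every observer of a given subject expects the same payload type, uses a generic parameter so the compiler checks what observers receive. `TypedSubject`, `TypedObserver`, and `subscribe` are hypothetical names and do not come from the tutorial.

```ts
type TypedObserver<T> = (data: T) => void;

class TypedSubject<T> {
  private observers: TypedObserver<T>[] = [];

  // Registers an observer and returns a function that removes it again.
  subscribe(observer: TypedObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Pushes the same payload to every currently registered observer.
  notifyObservers(data: T): void {
    this.observers.forEach((observer) => observer(data));
  }
}

const temperature = new TypedSubject<number>();
const log: string[] = [];

const unsubscribeA = temperature.subscribe((t) => log.push(`A:${t}`));
temperature.subscribe((t) => log.push(`B:${t}`));

temperature.notifyObservers(21);
unsubscribeA();
temperature.notifyObservers(22);

console.log(log); // ["A:21", "B:21", "B:22"]
```

Returning an unsubscribe function instead of exposing `detach` is a design choice of this sketch; it keeps the caller from needing a reference to the observer later.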
Take the burger shop (as in this tutorial) as an analogy:
Good practice: each operator in the pipe should have a single responsibility.
Keep in mind: when an error occurs inside a pipe while handling one event, the stream terminates, so subsequent events in that stream are never processed.
Sample code snippets:
```js
const { Observable } = require("rxjs");
const { pluck, map, filter } = require("rxjs/operators");

const users = {
  data: [
    { id: 1, status: "active", age: 14 },
    { id: 1, status: "inactive", age: 12 },
    { id: 1, status: "active", age: 42 },
    { id: 1, status: "inactive", age: 42 },
    { id: 1, status: "active", age: 13 },
    { id: 1, status: "inactive", age: 75 },
    { id: 1, status: "inactive", age: 43 },
    { id: 1, status: "inactive", age: 54 },
    { id: 1, status: "active", age: 7 },
    { id: 1, status: "active", age: 17 },
  ],
};

const observable = new Observable((subscriber) => {
  subscriber.next(users);
  // Signal the end of the stream so the observers' complete handlers run.
  subscriber.complete();
}).pipe(
  // Note: pluck is deprecated since RxJS 7; map((response) => response.data) is equivalent.
  pluck("data"),
  // Only continue when there are at least 10 users.
  filter((users) => users.length >= 10),
  // Keep the active users.
  map((users) => {
    return users.filter((user) => user.status === "active");
  }),
  // Compute the average age of the active users.
  map((users) => {
    return users.reduce((sum, user) => sum + user.age, 0) / users.length;
  }),
  // Throwing inside an operator sends an error notification and ends the stream.
  map((average) => {
    if (average < 18) throw new Error(`Average age is too small (${average})`);
    else return average;
  }),
  map((average) => `The average age is ${average}`)
);

const observer = {
  next: (x) => console.log("Observer got a next value: " + x),
  error: (err) => console.error("Observer got an error: " + err),
  complete: () => console.log("Observer got a complete notification"),
};

const observer2 = {
  next: (x) => console.log("Observer 2 got a next value: " + x),
  error: (err) => console.error("Observer 2 got an error: " + err),
  complete: () => console.log("Observer 2 got a complete notification"),
};

observable.subscribe(observer);
observable.subscribe(observer2);
```
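The note about errors terminating a stream can be imitated without RxJS. The sketch below is a hand-rolled approximation of that behaviour, not the library's implementation, and `SimpleObserver` and `runPipeline` are hypothetical names. Once a step throws, the error handler is called once and the remaining values are skipped.

```ts
interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Pushes each value through `step` and hands the result to the observer.
// The first error thrown while handling a value ends the stream.
function runPipeline<A, B>(
  values: A[],
  step: (a: A) => B,
  observer: SimpleObserver<B>
): void {
  for (const value of values) {
    try {
      observer.next(step(value));
    } catch (err) {
      observer.error(err);
      return; // terminated: later values are never delivered, complete is not called
    }
  }
  observer.complete();
}

const events: string[] = [];

runPipeline(
  [30, 12, 45],
  (age) => {
    if (age < 18) throw new Error(`Age is too small (${age})`);
    return age;
  },
  {
    next: (age) => events.push(`next ${age}`),
    error: (err) => events.push(`error ${(err as Error).message}`),
    complete: () => events.push("complete"),
  }
);

console.log(events); // ["next 30", "error Age is too small (12)"]
```

The value 45 never reaches the observer, even though it would have passed the check on its own.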
Overview
Introduction
This article attempts to capture the essentials of RxJS in a short time. The content covers two parts:
Because RxJS describes itself as a library for reactive programming using Observables, this article starts with the essentials of reactive programming.
Quick References