FrankKai / FrankKai.github.io

FE blog
https://frankkai.github.io/
362 stars 39 forks source link

Rxjs那些事儿 #256

Open FrankKai opened 2 years ago

FrankKai commented 2 years ago
FrankKai commented 2 years ago

of

将参数转化为一个可观察序列。

每个参数都会作为下一个通知。 image

示例

发射数据10,20,30

import { of } from 'rxjs';
of(10,20,30).subscribe(
 next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
)

发射数组[]10,20,30]

import { of } from 'rxjs';

of([1, 2, 3])
.subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);

// Outputs
// next: [1, 2, 3]
// the end
FrankKai commented 2 years ago

from

将数组,类数组,Promise,可迭代对象等等,转化为Observable。 一句话总结:将万物转化为Observable image

例子

同步流

import { from } from 'rxjs';

const array = [10, 20, 30];
const result = from(array);

console.log('start');

// 同步订阅
result.subscribe((x) => console.log(x));

console.log('end');

// Logs:
// 10
// 20
// 30

异步流

import { from, asyncScheduler } from 'rxjs';

console.log('start');

const array = [10, 20, 30];
const result = from(array, asyncScheduler);

result.subscribe(x => console.log(x));

console.log('end');

// Logs:
// start
// end
// 10
// 20
// 30
FrankKai commented 2 years ago

map

对observable发出的值,都经过map函数的处理,并且作为结果返回。 image

对每个值都乘以10

import { of, map } from 'rxjs';

of(1, 2, 3)
  .pipe(map((num) => num * 10))
  .subscribe(
    (next) => console.log('next:', next),
    (err) => console.log('error:', err),
    () => console.log('the end')
  );
// next:10
// next:20
// next:30
// end
FrankKai commented 2 years ago

timer

timer observable很适合在代码中创建延时,或者是与其他特定的值做race。 delay默认情况下是毫秒。

每隔2s,执行一次回调函数(0~2秒内不触发)

import { timer } from 'rxjs';

timer(0, 2000).subscribe((n) => console.log('timer', n));
FrankKai commented 2 years ago

interval

可用于准时返回递增数字。 image

例子

每秒+1,加到3秒

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const numbers = interval(1000);

const takeFourNumbers = numbers.pipe(take(4));

takeFourNumbers.subscribe(x => console.log('Next: ', x));

// Logs:
// Next: 0
// Next: 1
// Next: 2
// Next: 3

每隔2s,执行一次回调函数(0~2秒内触发)

import { interval } from 'rxjs';

interval(2000).subscribe((n) => console.log('timer', n));
FrankKai commented 2 years ago

Observable

通过Observable创建一个流,并且通过subscriber.next控制输出。

import { Observable } from 'rxjs';

const stream$ = new Observable((subscriber) => {
  setTimeout(() => {
    subscriber.next([1, 2, 3]);
  }, 500);
  setTimeout(() => {
    subscriber.next({ a: 1000 });
  }, 1000);
  setTimeout(() => {
    subscriber.next('end');
  }, 3000);
  setTimeout(() => {
    subscriber.complete();
  }, 4000);
});

// 启动流
stream$.subscribe({
  complete: () => console.log('done'),
  next: (v) => console.log(v),
  error: () => console.log('error'),
});
FrankKai commented 2 years ago

Subject

What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observer

是一种特殊类型的Observable,可以将消息同时广播给多个Observer,也就是多播。 而普通Observable则对应一个Observer,是单播。 可以理解为最简发布订阅。

import { Subject } from 'rxjs';

// 创建subject
const subject = new Subject();

// 订阅一个observer
subject.subscribe((v) => console.log('stream 1', v));
// 再订阅一个observer
subject.subscribe((v) => console.log('stream 2', v));
// 延时1s再订阅一个observer
setTimeout(() => {
  subject.subscribe((v) => console.log('stream 3', v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
  subject.next(3);
}, 3000);
// output
// stream 1 1 //立即输出
// stream 2 1 //立即输出
// stream 1 2 //立即输出
// stream 2 2 //立即输出
// stream 1 3 //3s后输出
// stream 2 3 //3s后输出
// stream 3 3 //3s后输出
FrankKai commented 2 years ago

BehaviorSubject(与Subject区别、在medusa中的应用)

Subject的变体,需要初始值,并在订阅时发出其当前值。

注意:与Subject不同的是,需要初始值,而且subject.next会在所有订阅函数中执行(无论前后位置)。

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(123);

subject.subscribe(console.log);
subject.subscribe(console.log);

subject.next(456);

subject.subscribe(console.log);

subject.next(789);

// 123
// 123
// 456
// 456
// 456
// 789
// 789
// 789

Subject只会被位于subject.next之前订阅函数处理。

const subject = new Subject();

subject.subscribe(console.log);
subject.subscribe(console.log);

subject.next(101112); // 注意这里,只打印了2次

subject.subscribe(console.log);

subject.next(131415);
// 101112
// 101112
// 131415
// 131415
// 131415

tuya的微前端框架medusa,有用到BehaviorSubject这个类,用于主子应用做通信。

源码地址:https://github.com/tuya/medusa/blob/main/packages/medusa/src/packages/store.ts

发送数据

import { dispatch } from 'mmed/client';
dispatch("Hello World", false, 'foo')

接收数据

import { subscribe } from 'mmed/client';

useEffect(() => {
  const stream$ = subscribe((data) => {
    console.log(data) // Hello World
  }, 'foo');
  return () => {
    stream$.unsubscribe();
  };
}, [])
FrankKai commented 2 years ago

Rxjs流最简流程(创建流、处理流、启动流、停止流)

分为4步:

  1. 创建流
  2. 处理流(或者说消费流)
  3. 启动流
  4. 停止流
import { Observable } from "rxjs";

// 创建流 - Observable
const stream$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next([1, 2, 3]);
  }, 500);
});

// 处理流(消费流产生的数据)- Observer 
const observer = {
  complete: () => console.log("done"),
  next: v => console.log(v),
  error: () => console.log("error")
};

// 启动流 - Subscription
const subscription = stream$.subscribe(observer);

setTimeout(() => {
  // 停止流
  subscription.unsubscribe();
}, 1000);
FrankKai commented 2 years ago

Rxjs在React框架中使用

这是一个例子,模拟了如何使用Rxjs,改造 前端请求接口后返回的数据,与prop数组做拼接 这种场景。

传统方式

import * as React from 'react';

const GreetSomeone = ({ greet = 'Hello' }) => {
  const [greeting, setGreeting] = React.useState('');
  const [name, setName] = React.useState('');

  React.useEffect(() => {
    setTimeout(() => {
      setName('World');
    }, 3000);
  }, []);

  React.useEffect(() => {
    setGreeting(`${greet}, ${name}!`);
  }, [greet, name]);

  return <p>{greeting}</p>;
};

export default GreetSomeone;

rxjs方式

import * as React from 'react';
import { combineLatest, from, of, Observable, BehaviorSubject } from 'rxjs';
import { catchError, map, startWith } from 'rxjs/operators';

const GreetSomeone = ({ greet = 'Hello' }) => {
  const [greeting, setGreeting] = React.useState('');
  // 创建greet流
  const greet$ = React.useRef(new BehaviorSubject(greet));

  React.useEffect(() => {
    greet$.current.next(greet);
  }, [greet]);

  React.useEffect(() => {
    console.log('创建流');
    // 创建name流
    const name$ = from(
      // 模拟远程搜索数据
      new Observable((subscriber) => {
        setTimeout(() => {
          subscriber.next('World');
        }, 3000);
      })
    ).pipe(
      startWith('_____'),
      catchError(() => of('Mololongo'))
    );
    // 创建greet和name合并流
    const greeting$ = combineLatest(greet$.current, name$).pipe(
      map(([greet, name]) => `${greet}, ${name}!`)
    );

    // 启动合并流
    const subscription = greeting$.subscribe((value) => {
      setGreeting(value);
    });

    return () => {
      subscription.unsubscribe();
    };
  }, []);

  return <p>{greeting}</p>;
};

export default GreetSomeone;

demo地址:https://stackblitz.com/edit/react-ts-jmk8wd?file=Hello.tsx