Skip to content

Rxjs

80 行代码实现简易 RxJS

rxjs在react中的最佳实践

RxJS(Reactive Extensions for JavaScript)是一个强大的库,用于处理异步数据流和事件驱动编程。 在React应用中使用RxJS,可以有效地管理和组合复杂的异步操作,如数据获取、用户输入、定时任务等。 以下是一些在React中使用RxJS的最佳实践:

1. 明确区分副作用与纯逻辑

  • 纯逻辑:在React组件内部,使用RxJS操作符来处理纯逻辑,如过滤、映射、合并等,这些操作应当与React的状态管理、渲染逻辑分离。
  • **副作用 **:使用useEffect Hook(或类组件中的生命周期方法)来订阅RxJS Observable,处理副作用,如发起网络请求、更新DOM等。确保在适当的时候订阅和取消订阅,避免内存泄漏。

2. 使用自定义 Hooks

  • 封装RxJS逻辑到自定义 Hooks 中,使代码更易于复用和测试。例如,可以创建一个useObservable Hook,用于订阅Observable并返回其最新值:
tsx
import { useEffect, useState } from 'react';
import { Observable } from 'rxjs';

function useObservable<T>(observable$: Observable<T>, initialState: T): T {
  const [state, setState] = useState(initialState);

  useEffect(() => {
    const subscription = observable$.subscribe(setState);
    return () => subscription.unsubscribe();
  }, [observable$]);

  return state;
}

3. 结合React Hooks

  • 使用useStateuseEffect与RxJS操作符协同工作。例如,使用useState存储Observable的最新值,然后在useEffect 中订阅Observable并更新状态:
jsx
import { useEffect, useState } from 'react';
import { interval, of } from 'rxjs';
import { map } from 'rxjs/operators';

function Counter() {
  const [count, setCount] = useState(0);

  useEffect(() => {
    const counter$ = interval(1000).pipe(
      map(i => i + 1)
    );

    const subscription = counter$.subscribe(setCount);
    return () => subscription.unsubscribe();
  }, []);

  return <h1>{count}</h1>;
}

4. 利用withLatestFromcombineLatest等操作符处理关联数据

  • 当多个数据源需要同时更新UI时,使用withLatestFromcombineLatest等操作符来合并多个Observable,确保在渲染时数据是最新的:
jsx
import { useEffect, useState } from 'react';
import { combineLatest, of } from 'rxjs';
import { map } from 'rxjs/operators';

function UserDetails() {
  const [userId, setUserId] = useState(1);

  useEffect(() => {
    const userDetails$ = combineLatest([
      of(userId),
      userService.getUserById(userId)
    ]).pipe(
      map(([_, user]) => user)
    );

    const subscription = userDetails$.subscribe(setUserDetails);
    return () => subscription.unsubscribe();
  }, [userId]);

  return <UserCard user={userDetails} />;
}

5. 使用BehaviorSubjectReplaySubject实现状态管理

  • 如果需要替代或补充传统的状态管理库(如Redux、MobX),可以使用BehaviorSubjectReplaySubject 作为应用状态的中心存储,并提供自定义Hooks供组件订阅所需状态:
jsx
import { BehaviorSubject } from 'rxjs';

const appState$ = new BehaviorSubject({ count: 0 });

function useAppState() {
  return appState$.getValue();
}

function incrementCounter() {
  const currentState = appState$.getValue();
  appState$.next({ ...currentState, count: currentState.count + 1 });
}

6. 使用rxjs-hooks

  • 如果觉得手动管理Observable订阅繁琐,可以使用rxjs-hooks库,它提供了与React Hooks更紧密集成的API:
jsx
import { useObservableState } from 'rxjs-hooks';

function Counter() {
  const count = useObservableState(counter$, 0);

  return <h1>{count}</h1>;
}

7. 避免过度使用

  • 虽然RxJS功能强大,但并不意味着所有场景都需要使用。对于简单的状态管理、一次性异步操作,使用原生的React Hooks(如useStateuseEffect)或简化的库(如swraxios-hooks)可能更为合适。

综上所述,要在React中优雅地使用RxJS,关键在于合理划分职责、充分利用React Hooks、恰当选择RxJS操作符,以及适时使用辅助库来简化集成。 通过这些最佳实践,可以充分发挥RxJS在处理复杂异步逻辑上的优势,同时保持React应用的清晰架构和良好性能。

Rxjs 与 ReactiveX

针对复杂的、多状态、异步、注重时序控制的场景,天然有一种技术,或者说是一种编程思想是为此而生的,那就是 FRP(Functional Reactive Programming),

而在 FRP 领域,ReactivX,简称 Rx,是由微软推出的通过可观察的流来进行异步编程的 API 则是 FRP 最经典的实现范本之一。

ReactiveX 的官方:https://reactivex.io/

什么是 RxJS?

RxJS(Reactive Extension for JavaScript),通过 RxJS 整合了异步编程、时序控制以及各种遵循函数式编程思想的 Operators 的特性,优雅的实现了这种复杂的状态机。

其中的 Rx 是指 Reactive Extension,有各种语言的实现,如 RxJava,RxPy,RxGo 等等...

RxJS指响应式编程(Reactive Programming)这种编程范式在 JavaScript 的一种实现。

ReactiveX 结合了观察者模式、迭代器模式与函数式编程的精华思想,其中迭代器模式和数组就有着不一般的关系

虽然 RxJS 最核心的概念:Stream,相比数组多了一个时间维度的概念,可以理解为带上时间属性的 “数组”

Stream 其实也是一个数组,但是 Stream 还拥有一个 “时间” 维度的概念,即随着时间的增长,Stream 上会不断的增加元素

最能体现 RxJS 中对 Stream 这个具有时间维度属性的方法就是 merge 因为 Stream 具有时间的属性,所以上述两个 Stream 合并之后会变成最下面的 Stream,红色和蓝色的 1 会插入在中间,如果在数组里对两个数组进行 merge 只能是 concat:

ts
const arr1 = [20, 40, 60, 80, 100];
const arr2 = [1, 1];

const arr3 = arr1.concat(arr2); // [20, 40, 60, 80, 100, 1, 1]
const arr4 = arr2.concat(arr1); // [1, 1, 20, 40, 60, 80, 100]

这就是时间属性在 Stream 上最明显的体现。

什么是 Reactive Programming?

响应式编程(Reactive Programming)是融合了观察者模式、迭代器模式与函数式编程三者最优秀的观点,从而诞生的一种新的编程范式,是一种以异步数据流(Async Data Stream)为中心的编程范式。

观察者模式与迭代器模式

观察者模式定了一个对象之间的一对多的依赖关系,当目标对象 Subject 更新时,所有依赖此 Subject 的 Observer 都会收到更新。

举个例子🌰

tsx
import { fromEvent } from 'rxjs';

// 创建一个监听 document click 事件的 Observable
const Observable = fromEvent(document, 'click');

// 通过 Observable.subscribe 时,接收一个 Observer 回调,当有点击事件(click)发生时
// 则调用传入的回调函数,即 Observer 会收到更新
const subscription = Observable.subscribe((e) => {
  console.log('dom clicked');
});

const subscription2 = Observable.subscribe((e) => {
  console.log('dom clicked');
});

const subscription3 = Observable.subscribe((e) => {
  console.log('dom clicked');
});

上述代码,当点击 DOM 时,三个 observer (回调函数)都会收到通知,然后打印 dom clicked 语句。

迭代器模式是指提供一种方法顺序访问一个聚合对象中各个元素,而不需要暴露该对象的具体表示,

常见的为部署 Symbol.iterator 属性,调用对应 Symbol.iterator 的方法返回一个迭代器对象,然后就可以以统一的方式进行遍历:

tsx
const arr = ['a', 'b', 'c'];
const iterator = arr[Symbol.iterator]('Symbol.iterator');

iterator.next(); // { value: 'a', done: false }
iterator.next(); // { value: 'b', done: false }
iterator.next(); // { value: 'c', done: false }
iterator.next(); // { value: undefined, done: true }

对应的 RxJS 里面就是 Observable 可观察对象,也就是我们后续将引出的 Stream 流的概念,

每个 Stream/Observable 其实可以看作是一个数组,然后支持数组相关的各种操作、变换等,变成另外一个 Stream/Observable,拿 RxJS 举例:

tsx
import { fromEvent, map } from 'rxjs';

// 创建一个监听 document click 事件的 Observable
const subscription = fromEvent(document, 'click')
  .pipe(map(e => e.target)
    .subscribe((value) => {
      console.log('click: ', value);
    })
  );

fromEvent(document, "click") 会声明一个 Observable 对象,同时也创建了一个 Stream

fromEvent(document, "click") 创建的 Observable 对应着上面的带有箭头的线,这条线就是一个 Stream 流,

上面的一个个 ev 就是每次点击之后产生的事件,随着时间推移,不断的产生事件,在这个线上不断的流动下去

-- 之所以为 Stream,而这个 Stream 其实也可以看作是一个 “数组”,上面的一个个事件即为 “数组” 的元素,

我们可以对这个 “数组”进行遍历,以统一的方式如 map/filter 等进行遍历,所以也叫融合了迭代器模式,

而在 RxJS 中,通过这种 “迭代器” 模式,我们可以方便的对一个 Stream 进行变换

map 将一个 Stream 变换为另外一个 Stream

而最后通过 subscribe 生成了 observer 观察者,当有事件发生时,observer 的回调函数会调用,打印 Log,即融合了观察者模式。

那么函数式是如何应用在 RxJS 里面的呢?细心地同学可能发现了,RxJS 其实提供了大量的 Operators,如 map、filter、scan 等,

以 函数式/声明式 的方式来操作 Stream,且操作之后生成一个新 Stream,不会突变原 Stream,此为融合了函数式编程思想。

主动式与响应式 Proactive VS Reactive

Proactive(主动式):即主动轮询,不停的去问需求方以期完成任务,常见的有设置一个定时器,不断的去给服务器发请求询问是否有新的内容产生。

Reactive(响应式):即有事件发生时,通知我完成任务,常见的有 DOM 事件的监听与触发、WebSocket 等

命令式与函数式 Imperactive VS Functional

命令式:你命令计算机去做事情(how),得到你想要的(what)

声明式:你告诉计算机你需要什么(what),让机器想出如何去做(how)

为什么叫做响应式呢? 因为是对事件源做监听和一系列处理的,这种编程模式就叫做响应式。

为什么叫函数式呢? 因为每一步 operator 都是纯函数,返回一个新的 Observable,这符合函数式的不可变,修改后返回一个新的的理念。

为什么叫流呢? 因为一个个事件是动态产生和传递的,这种数据的动态产生和传递就可以叫做流。

概念理解

  • Observer 通过观察者模式订阅 Observable 产生的数据
  • Observable 是单播的;基于代理对象 Subject 可以支持多播
  • Scheduler 基于事件循环机制和任务调度,可以优化 Observable 产生数据的时机

单播、广播和多播

  • 单播(unicast):一对一
  • 广播(broadcast):全局通知,不区分受众
  • 多播(multicast):可以通知给指定的一群受众

Subject

在 Rxjs 中,Subject 是一个代理对象,它既有 Observable 的接口,又有 Observer 的接口。

利用 Subject 可以实现多播,当下游需要 Hot Observable 的时候,可以订阅一个 Subject 对象。

  • Rxjs 提供了三个基础的多播操作符:

multicast publish shareRxjs

  • 提供了三个高级的多播操作符:

publishLast publishRelpay publishBehavior

  • 这背后其实对应了 Subject 的三个子类提供支持:

AsyncSubject ReplaySubject BehaviorSubject

  • 利用这三个子类,我们可以在产生新的订阅时,指定数据重播的方式。

scan 类似我们平时的 reduce,即对一组数据进行聚合

如何 Debug RxJS 应用?

tap 可以拿到上一步的值,但是又不影响后续的操作

tap是一个 RxJS 操作符(operators),类似 subscribe 的效果,但是只会拿传过的来值进行一次不影响后续 Stream 的 “纯操作”

常用来在 Stream 的中间态拿到当前的数据事件来修改外部的状态或做一些通知

画出 Marble 图

当我们遇到复杂的 RxJS 代码时,如果通过 Tap 无法轻易的看出程序是如何执行的,因为 Tap 只能拿到某个中间的执行结果,但是无法可视化中间的执行过程,那么我们就可以通过之前介绍的,

从 Stream 的起始态、中间态、错误态、完成态触发,通过 Marble 图体现 Stream 的变换,然后通过 Tap 验证变换的结果。

rxjs-spy

rxjs-devtools

ts
import DevToolsPlugin from 'rxjs-spy-devtools-plugin';
import { create } from 'rxjs-spy';

export const spy = create();
const devtoolsPlugin = new DevToolsPlugin(spy, {
  verbose: false,
});

spy.plug(devtoolsPlugin);

Contributors

作者:Long Mo
字数统计:2.9k 字
阅读时长:10 分钟
Long Mo
文章作者:Long Mo
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Longmo Docs