JavaScript 📜
April 10, 2024

Defer Observable в RxJs

Представьте, что нам нужно создать оператор, который принимает функцию и выполняет ее только при первом событии. Реализуем его, назовав его tapOnce:

function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    let run = false;
    return source.pipe(
      tap(() => {
        if (!run) {
          fn();
          run = true;
        }
      })
    );
  };
}

Код простой: используется оператор tap для запуска переданной функции один раз на основе значения переменной run. Теперь воспользуемся созданным оператором:

const source = interval(5000).pipe(
  tapOnce(() => console.log('🦊')
));

source.subscribe(console.log);

Все работает хорошо. Мы видим эмодзи один раз, во время первого события. Теперь добавим еще одного подписчика:

const source = interval(5000).pipe(
  tapOnce(() => console.log('🦊')
));

source.subscribe(console.log);
source.subscribe(console.log);

Если мы посмотрим в консоль, мы увидим только один эмодзи. Проблема заключается в том, что оба подписчика используют одну и ту же лексическую область видимости и, следовательно, ссылается на ту же переменную run. Нам нужен способ отложить создание Observable до момента, когда кто-то подпишется на него. Defer приходит на помощь:

import { defer } from 'rxjs';

function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    return defer(() => {
      let run = false;
      return source.pipe(
        tap(() => {
          if (!run) {
            fn();
            run = true;
          }
        })
      );
    });
  };
}

Observable defer принимает функцию, которая возвращает ObservableInput. Код внутри defer выполняется только при подписке, а не во время создания. Используя этот подход и благодаря замыканиям JS, каждый подписчик получает свою собственную лексическую область видимости.

Чтобы понять как работает этот оператор, создадим свою упрощенную реализацию:

function defer(observableFactory: () => ObservableInput<any>) {
  return new Observable(subscriber => {
    const source = observableFactory();
    return source.subscribe(subscriber);
  });
}

defer возвращает новый Observable, который при подписке вызывает функцию-фабрику и использует ее в качестве источника.

Рассмотрим еще несколько случаев использования, где мы можем воспользоваться Observable defer. Предположим, у нас есть выражение, которое мы хотим вычислить только при подписке. Например:

const randNum = of(Math.random());
 
randNum.subscribe(console.log);
randNum.subscribe(console.log);

В этом примере каждый подписчик получит одно и то же случайное значение. Мы можем решить эту проблему, используя defer, чтобы выражение оценивалось только тогда, когда кто-то подписался на Observable, и не заранее:

const randNum = defer(() => of(Math.random()));

randNum2.subscribe(console.log);
randNum2.subscribe(console.log);

// The same concept as using a function
const randNum = () => of(Math.random());
randNum2().subscribe(console.log);
randNum2().subscribe(console.log);

Еще один случай использования - когда мы хотим отложить выполнение промиса до момента подписки:

// This already executing regardless the numbers of handlers
const promise = new Promise(resolve => {
  resolve();
});

// Deferring the creation of the promise until someone subscribes
const promiseDefered = defer(() => {
  return new Promise(resolve => {
    resolve();
  });
});

promiseDefered.subscribe(console.log);

Промисы по своей природе выполняются всегда, даже если нет обработчиков результата. Мы можем использовать отложенное наблюдаемое, чтобы сделать выполнение обещания более похожим на rxjs потоки, то есть выполнение произойдет только при подписке.

Это вольный перевод материала