JavaScript 📜
April 9, 2024

Создание своих операторов RxJs

Операторы являются одним из строительных блоков RxJS. Библиотека поставляется с множеством операторов, которые можно использовать для решения практически любой задачи, с которой мы можем столкнуться, но иногда возникает необходимость в создании собственных.

Прежде, чем идти дальше стоит разобраться с теорией. RxJS оператор - это всего лишь Observable. Чтобы убедиться в этом рассмотрим пример:

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

interval(1000).pipe(
  map(num => num + 1)
).subscribe(...)

Функция interval, которая возвращает Observable. Подписавшись на него мы создаем pipe(), внутри которого мутируем значение при помощи оператора map. Но что такое pipe?

class Observable {
  pipe(...operators): Observable<any> {
    return operators.reduce((source, next) => next(source), this);
  }
}

Метод pipe принимает массив операторов, выполняет их поочередно, каждый раз вызывая следующий оператор и передавая ему результат предыдущего в качестве источника. В нашем примере один оператор, который тоже возвращает Observable:

map(interval): Observable

Из этого кода мы можем понять, что создание оператора ничуть не сложнее написания простой функции, которая принимает и возвращает Observable:

function myOperator<T>(source: Observable<T>) {
  return source;
}

Таким образом получилось создать свой оператор. Несмотря на отсутствие полезной логики его уже можно использовать в пайпах:

interval(1000).pipe(
  myOperator
).subscribe(value => console.log(value));

Прежде чем идти дальше, остановимся на секунду и поговорим о распространенном заблуждении. Глядя на пример, можно сказать что создана подписка на Interval. Это неверно. RxJS всегда подписан на последний оператор в пайпе. Таким образом, в этом примере мы подписаны на Observable, который возвращает функция myOperator. Продемонстрируем это поведение, для этого вернем из нашего оператора другой Observable:

function myOperator<T>(source: Observable<T>) {
  return new Observable(subscriber => {
    subscriber.next(`🦄`);
    subscriber.complete();
  });
}

Если пример запустить еще раз, только один единорог 🦄 вернется в консоль . Это потому, что мы получаем исходный Observable (в нашем случае, который генерируется Interval), но не делаем с ним ничего (например, не подписываемся на него).

Это доказывает, что мы подписаны на Observable, который возвращается из myOperator, а не на тот, который возвращается из интервала.

Это приводит к следующей теме - цепочке Observable. К примеру, возьмем первый пример, интервал с map:

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

interval(1000).pipe(
  map(num => num + 1)
).subscribe(...)

Если мы фактически подписываемся на Observable, который возвращается оператором map(), как получить доступ к Observable, который возвращается от interval()?

Ответ прост: это не нужно. Каждый оператор получает свой исходный Observable, который предшествует ему, и это именно этот оператор (в большинстве случаев) подписывается на него.

Когда мы вызываем subscribe(), то выполняем Observable, созданный оператором map(), который сам подписывается на Observable, возвращаемый от interval(). Каждый раз, когда interval создает новое значение, это значение достигает функции подписки map.

Создание своего оператора filterNil

Распространенным случаем является ситуация, когда у нас есть исходный Observable, который может излучать значение null или undefined, и мы хотим игнорировать эти значения. Для этого создадим оператор, который выполнит эту задачу за нас, и который мы сможем применить к любому источнику:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      })
    });
  }
}

После получения source мы подписываемся на него. Когда source возвращает значение, то запускается проверка, является ли оно undefined или null. Если это так, мы игнорируем его; в противном случае мы передаем значение следующему подписчику. Так же обрабатываются коллбеки ошибки и завершения. Если запустить этот пример, то значение генерируемое при инициализации Observable будет пропущено:

interval(1000).pipe(
  map(value => value === 0 ? undefined : value),
  filterNil()
).subscribe(value => console.log(value));

Но есть проблема. Пример созданный выше породил утечку памяти. Каждый Observable возвращает функцию unsubscribe(), ответственную за выполнение всех необходимых операций по очистке. В нашем примере мы подписались на источник, но не отписались от него.

Это означает, что interval будет продолжать работать даже после того, как мы вызвали метод unsubscribe для результирующего Observable.

Как же это исправить? Решение простое - достаточно вызвать метод unsubscribe() для source:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      const subscription = source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      });

      return () => subscription.unsubscribe(); <======
    });
  }
}

Так же есть более короткий способ - можно вернуть результат вызова подписки к source:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      retrun source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        ...
      });
    });
  }
}

Теперь, когда вызывается unsubscribe() для всей цепочки, это фактически вызывает unsubscribe() для исходного Observable.

Создание своих операторов на основе существующих

Создадим оператор filterNil заново, используя, как вы, возможно, догадались, оператор filter:

function filterNil() {
  return function<T>(source: Observable<T>) {
    return source.pipe(filter(value => value !== undefined && value !== null));
  }
}

Это круто, но и этот код можно упростить. Поскольку операторы пайпа возвращают функции, которые принимают Observable, можно сделать так:

function filterNil() {
  return filter(value => value !== undefined && value !== null)
}

Теперь, когда понятен принцип работы пайпов, а так же создания своих операторов можно подумать о работе. Создадим несколько полезных операторов, которые могут пригодиться каждый день

debug

Cоздадим оператор, который использует console API для журналирования каждого уведомления с выделением цветом:

function debug(tag: string) {
  return tap({
    next(value) {
      console.log(`%c[${tag}: Next]`, "background: #009688; color: #fff; padding: 3px; font-size: 9px;", value)
    },
    error(error) {
      console.log(`%[${tag}: Error]`, "background: #E91E63; color: #fff; padding: 3px; font-size: 9px;", error)
    },
    complete() {
      console.log(`%c[${tag}]: Complete`, "background: #00BCD4; color: #fff; padding: 3px; font-size: 9px;")
    }
  })
}

optionalDebounce

При разработке компонентов иногда мы создаем входной параметр, который позволяет пользователю передавать необязательное значение для задержки (debounce) перед выполнением определенного действия. Если пользователь не передает это значение, мы можем пропустить применение оператора debounceTime:

function optionalDebounce<T>(time?: number) {
  return function<T>(source: Observable<T>): Observable<T> {
    return time === undefined ? source : source.pipe(debounceTime(time));
  };
}

filterKey

Этот оператор используется в тех случаях, когда нужно фильтровать определенные клавиши при прослушивании событий клавиатуры:

type KeyboardEventKeys = 'Escape' | 'Enter';

function filterKey(key: KeyboardEventKeys) {
  return filter((event: KeyboardEvent) => event.key === key);
}
fromEvent(document, 'keyup')
  .pipe(
    filterKey('Escape')
  ).subscribe();

polling

Оператор который упрощает процесс пуллинга данных с сервера

function polling<T>(stream: Observable<T>, period: number, initialDelay = 0) {
  return timer(initialDelay, period).pipe(concatMapTo(stream));
}
polling(this.http.get('https://..'), 10000).subscribe()

Оператор пуллинга принимает stream, period - интервал опроса, и необязательное начальное время задержки, по умолчанию установленное на 0.

Это вольный перевод материала