Создание своих операторов RxJs
Операторы являются одним из строительных блоков RxJS
. Библиотека поставляется с множеством операторов, которые можно использовать для решения практически любой задачи, с которой мы можем столкнуться, но иногда возникает необходимость в создании собственных.
Прежде, чем идти дальше стоит разобраться с теорией. RxJS оператор - это всего лишь Observable. Чтобы убедиться в этом рассмотрим пример:
import { interval } from 'rxjs'; import { map } from 'rxjs/operators'; interval(1000).pipe( map(num => num + 1) ).subscribe(...)
Функция interval
, которая возвращает Observable
. Подписавшись на него мы создаем pipe()
, внутри которого мутируем значение при помощи оператора map.
Но что такое pipe
?
class Observable { pipe(...operators): Observable<any> { return operators.reduce((source, next) => next(source), this); } }
Метод pipe
принимает массив операторов, выполняет их поочередно, каждый раз вызывая следующий оператор и передавая ему результат предыдущего в качестве источника. В нашем примере один оператор, который тоже возвращает Observable
:
map(interval): Observable
Из этого кода мы можем понять, что создание оператора ничуть не сложнее написания простой функции, которая принимает и возвращает Observable
:
function myOperator<T>(source: Observable<T>) { return source; }
Таким образом получилось создать свой оператор. Несмотря на отсутствие полезной логики его уже можно использовать в пайпах:
interval(1000).pipe( myOperator ).subscribe(value => console.log(value));
Прежде чем идти дальше, остановимся на секунду и поговорим о распространенном заблуждении. Глядя на пример, можно сказать что создана подписка на Interval
. Это неверно. RxJS
всегда подписан на последний оператор в пайпе. Таким образом, в этом примере мы подписаны на Observable
, который возвращает функция myOperator
. Продемонстрируем это поведение, для этого вернем из нашего оператора другой Observable
:
function myOperator<T>(source: Observable<T>) { return new Observable(subscriber => { subscriber.next(`🦄`); subscriber.complete(); }); }
Если пример запустить еще раз, только один единорог 🦄 вернется в консоль . Это потому, что мы получаем исходный Observable
(в нашем случае, который генерируется Interval
), но не делаем с ним ничего (например, не подписываемся на него).
Это доказывает, что мы подписаны на Observable
, который возвращается из myOperator
, а не на тот, который возвращается из интервала.
Это приводит к следующей теме - цепочке Observable
. К примеру, возьмем первый пример, интервал с map
:
import { interval } from 'rxjs'; import { map } from 'rxjs/operators'; interval(1000).pipe( map(num => num + 1) ).subscribe(...)
Если мы фактически подписываемся на Observable
, который возвращается оператором map()
, как получить доступ к Observable
, который возвращается от interval()
?
Ответ прост: это не нужно. Каждый оператор получает свой исходный Observable
, который предшествует ему, и это именно этот оператор (в большинстве случаев) подписывается на него.
Когда мы вызываем subscribe()
, то выполняем Observable
, созданный оператором map()
, который сам подписывается на Observable
, возвращаемый от interval()
. Каждый раз, когда interval
создает новое значение, это значение достигает функции подписки map
.
Создание своего оператора filterNil
Распространенным случаем является ситуация, когда у нас есть исходный Observable
, который может излучать значение null
или undefined
, и мы хотим игнорировать эти значения. Для этого создадим оператор, который выполнит эту задачу за нас, и который мы сможем применить к любому источнику:
function filterNil() { return function<T>(source: Observable<T>): Observable<T> { return new Observable(subscriber => { source.subscribe({ next(value) { if(value !== undefined && value !== null) { subscriber.next(value); } }, error(error) { subscriber.error(error); }, complete() { subscriber.complete(); } }) }); } }
После получения source
мы подписываемся на него. Когда source
возвращает значение, то запускается проверка, является ли оно undefined
или null
. Если это так, мы игнорируем его; в противном случае мы передаем значение следующему подписчику. Так же обрабатываются коллбеки ошибки и завершения. Если запустить этот пример, то значение генерируемое при инициализации Observable будет пропущено:
interval(1000).pipe( map(value => value === 0 ? undefined : value), filterNil() ).subscribe(value => console.log(value));
Но есть проблема. Пример созданный выше породил утечку памяти. Каждый Observable возвращает функцию unsubscribe()
, ответственную за выполнение всех необходимых операций по очистке. В нашем примере мы подписались на источник, но не отписались от него.
Это означает, что interval
будет продолжать работать даже после того, как мы вызвали метод unsubscribe
для результирующего Observable
.
Как же это исправить? Решение простое - достаточно вызвать метод unsubscribe()
для source
:
function filterNil() { return function<T>(source: Observable<T>): Observable<T> { return new Observable(subscriber => { const subscription = source.subscribe({ next(value) { if(value !== undefined && value !== null) { subscriber.next(value); } }, error(error) { subscriber.error(error); }, complete() { subscriber.complete(); } }); return () => subscription.unsubscribe(); <====== }); } }
Так же есть более короткий способ - можно вернуть результат вызова подписки к source:
function filterNil() { return function<T>(source: Observable<T>): Observable<T> { return new Observable(subscriber => { retrun source.subscribe({ next(value) { if(value !== undefined && value !== null) { subscriber.next(value); } }, ... }); }); } }
Теперь, когда вызывается unsubscribe()
для всей цепочки, это фактически вызывает unsubscribe()
для исходного Observable
.
Создание своих операторов на основе существующих
Создадим оператор filterNil
заново, используя, как вы, возможно, догадались, оператор filter
:
function filterNil() { return function<T>(source: Observable<T>) { return source.pipe(filter(value => value !== undefined && value !== null)); } }
Это круто, но и этот код можно упростить. Поскольку операторы пайпа возвращают функции, которые принимают Observable
, можно сделать так:
function filterNil() { return filter(value => value !== undefined && value !== null) }
Теперь, когда понятен принцип работы пайпов, а так же создания своих операторов можно подумать о работе. Создадим несколько полезных операторов, которые могут пригодиться каждый день
debug
Cоздадим оператор, который использует console API
для журналирования каждого уведомления с выделением цветом:
function debug(tag: string) { return tap({ next(value) { console.log(`%c[${tag}: Next]`, "background: #009688; color: #fff; padding: 3px; font-size: 9px;", value) }, error(error) { console.log(`%[${tag}: Error]`, "background: #E91E63; color: #fff; padding: 3px; font-size: 9px;", error) }, complete() { console.log(`%c[${tag}]: Complete`, "background: #00BCD4; color: #fff; padding: 3px; font-size: 9px;") } }) }
optionalDebounce
При разработке компонентов иногда мы создаем входной параметр, который позволяет пользователю передавать необязательное значение для задержки (debounce
) перед выполнением определенного действия. Если пользователь не передает это значение, мы можем пропустить применение оператора debounceTime
:
function optionalDebounce<T>(time?: number) { return function<T>(source: Observable<T>): Observable<T> { return time === undefined ? source : source.pipe(debounceTime(time)); }; }
filterKey
Этот оператор используется в тех случаях, когда нужно фильтровать определенные клавиши при прослушивании событий клавиатуры:
type KeyboardEventKeys = 'Escape' | 'Enter'; function filterKey(key: KeyboardEventKeys) { return filter((event: KeyboardEvent) => event.key === key); }
fromEvent(document, 'keyup') .pipe( filterKey('Escape') ).subscribe();
polling
Оператор который упрощает процесс пуллинга данных с сервера
function polling<T>(stream: Observable<T>, period: number, initialDelay = 0) { return timer(initialDelay, period).pipe(concatMapTo(stream)); }
polling(this.http.get('https://..'), 10000).subscribe()
Оператор пуллинга принимает stream, period - интервал опроса, и необязательное начальное время задержки, по умолчанию установленное на 0.
Это вольный перевод материала