Как устроены Observable в JavaScript
Observable в JavaScript - это пример реализации паттерна Observer (наблюдатель).
Наблюдатель - это шаблон проектирования программного обеспечения, в котором объект, называемый субъектом, поддерживает список своих зависимостей, называемых наблюдателями, и автоматически уведомляет их об изменениях состояния, обычно вызывая один из их методов. (Википедия)
Observable
Мне нравится представлять Observable как функцию, которая "пушит" значения. Она может "запушить" значения как синхронно, так и асинхронно. Наблюдатель всегда может быть зарегистрирован под интересующим поставщиком.
Observer
Наблюдатель - это объект с тремя функциями:
next()
=> сработает когда появится новое значениеerror()
=> сработает когда у вас возникнет новая ошибкаcomplete()
=> сработает когда поток завершится
Когда Observable
(т.е. функция) "пушит" новое значение, ошибку или завершается, он вызывает соответствующую функцию наблюдателя
Push vs. Pull
Если вы знакомы с паттерном Iterator
, то вы знаете, что в этом случае вы должны управлять процессом. Когда вам нужно новое значение, вы просто вызываете метод next
, чтобы его получить.
var it = makeIterator(['yo', 'ya']); console.log(it.next().value); // 'yo'
Observable
работает по схожему принципу, но вызывать next()
нет необходимости. Observable
- это босс. Когда у него появляется новое значение, он сам его передаст вам. Ваша задача просто "слушать".
Пример из жизни
Метафора из реального мира — электронные рассылки.
Как появляются новости в вашем электронном письме? Вы подписываетесь на рассылку новостей и когда появляется новый выпуск, менеджер просто отправляет его вам на электронную почту.
Достаточно разговоров, давайте начнем практиковаться и понять, как работает Observable
, создав свой собственный простой мини-фреймворк Rx
.
Нам нужно заставить этот код работать:
let fakeAsyncData$ = new Observable(observer => { setTimeout(() => { observer.next('New data is coming'); observer.complete(); }, 2000); }); fakeAsyncData$.subscribe({ next(val) { console.log(val) } , error(e) { console.log(e) } , complete() { console.log('complete') } });
Для начала создадим класс и сохраним ссылку на функцию которая будет "пушить" значения.
class Observable { constructor(functionThatThrowsValues) { this._functionThatThrowsValues = functionThatThrowsValues; } }
subscribe(observer) { return this._functionThatThrowsValues(observer); }
Значения будут получены только если метод subscribe()
будет вызван. Только что был реализован простейший Observable.
fakeAsyncData$.map(val => `New value ${val}`).subscribe({ next(val) { console.log(val) } , error(e) { console.log(e) } , complete() { console.log(‘complete’) } });
map(projectionFunction) { return new Observable(observer => { return this.subscribe({ next(val) { observer.next( projectionFunction(val)) }, error(e) { observer.error(e) } , complete() { observer.complete() } }); }); }
Когда мы вызываем метод map
, происходит следующее: map
возвращает новый Observable
, который подписывается на источник, в нашем случае на fakeAsyncData$
. Когда источник "пушит" новое значение, оно сначала попадает в метод map
, а затем, после применения функции к значению, Observable map
"пушит" это значение дальше.
var button = document.getElementById(‘button’);let clicks$ = Observable.fromEvent(button, 'click') .map(e => `${e.pageX}px`);let unsubscribe = clicks$.subscribe({ next(val) { console.log(val) } , error(e) { console.log(e) } , complete() { console.log('complete') } });
static fromEvent(element, event) { return new Observable(observer => { const handler = (e) => observer.next(e); element.addEventListener(event, handler); return () => { element.removeEventListener(event, handler); }; }); }
Метод fromEvent
просто возвращает новый Observable
, который будет "бросать" нам объект события, когда событие произойдет. Мы не хотим утечек памяти, поэтому возвращаем функцию, которая позволит нам отписаться, когда это будет необходимо.
setTimeout(() => unsubscribe(), 1000);
let array$ = Observable.fromArray([1,2,3]);array$ .subscribe({ next(val) { console.log(val) } , error(e) { console.log(e) } , complete() { console.log(‘complete’) } });
static fromArray(array) { return new Observable(observer => { array.forEach(val => observer.next(val)); observer.complete(); }); }
Предположим, что нам нужно сделать что-то такое:
let promise = val => { return new Promise(resolve => { setTimeout(() => resolve(val), 3000); }) }; let data$ = Observable.fromArray([1,2,3]). map(val => Observable.fromPromise(promise(val)));
После выполнения этого кода мы получим Observable
, потому что функция map()
возвращает Observable
. Нам нужен способ объединить этот Observable
в поток. Вот почему это называется mergeMap
; мы выполняем одновременно операцию вывода и слияния.
mergeMap(anotherFunctionThatThrowsValues) { return new Observable(observer => { return this.subscribe({ next(val) { anotherFunctionThatThrowsValues(val).subscribe({ next(val) { observer.next(val) }, error(e) { observer.error(e) } , complete() { observer.complete() } }); }, error(e) { observer.error(e) } , complete() { observer.complete() } }); }); }
Теперь мы объединили Observable
в поток.
let data$ = Observable.fromArray([1,2,3]). mergeMap(val => Observable.fromPromise(promise(val)));
Результат
class Observable { constructor(functionThatThrowsValues) { this._functionThatThrowsValues = functionThatThrowsValues; } subscribe(next, error, complete) { if (typeof next === "function") { return this._functionThatThrowsValues({ next, error: error || () => {}, complete: complete || () => {} }); } else { return this._functionThatThrowsValues(next); } } map(projectionFunction) { return new Observable(observer => { return this.subscribe({ next(val) { observer.next(projectionFunction(val)) }, error(e) { observer.error(e) } , complete() { observer.complete() } }); }); } mergeMap(anotherFunctionThatThrowsValues) { return new Observable(observer => { return this.subscribe({ next(val) { anotherFunctionThatThrowsValues(val).subscribe({ next(val) {observer.next(val)}, error(e) { observer.error(e) } , complete() { observer.complete() } }); }, error(e) { observer.error(e) } , complete() { observer.complete() } }); }); } static fromArray(array) { return new Observable(observer => { array.forEach(val => observer.next(val)); observer.complete(); }); } static fromEvent(element, event) { return new Observable(observer => { const handler = (e) => observer.next(e); element.addEventListener(event, handler); return () => { element.removeEventListener(event, handler); }; }); } static fromPromise(promise) { return new Observable(observer => { promise.then(val => { observer.next(val); observer.complete(); }) .catch(e => { observer.error(val); observer.complete(); }); }) } }
Это вольный перевод материала