JavaScript 📜
March 18

Как устроены Observable в JavaScript

Observable в JavaScript - это пример реализации паттерна Observer (наблюдатель).

Наблюдатель - это шаблон проектирования программного обеспечения, в котором объект, называемый субъектом, поддерживает список своих зависимостей, называемых наблюдателями, и автоматически уведомляет их об изменениях состояния, обычно вызывая один из их методов. (Википедия)

Observable

Мне нравится представлять Observable как функцию, которая "пушит" значения. Она может "запушить" значения как синхронно, так и асинхронно. Наблюдатель всегда может быть зарегистрирован под интересующим поставщиком.

Observer

Наблюдатель - это объект с тремя функциями:

  • next() => сработает когда появится новое значение
  • error() => сработает когда у вас возникнет новая ошибка
  • complete() => сработает когда поток завершится

Когда Observable (т.е. функция) "пушит" новое значение, ошибку или завершается, он вызывает соответствующую функцию наблюдателя

Push vs. Pull

Если вы знакомы с паттерном Iterator, то вы знаете, что в этом случае вы должны управлять процессом. Когда вам нужно новое значение, вы просто вызываете метод next, чтобы его получить.

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'

Observable работает по схожему принципу, но вызывать next() нет необходимости. Observable - это босс. Когда у него появляется новое значение, он сам его передаст вам. Ваша задача просто "слушать".

Пример из жизни

Метафора из реального мира — электронные рассылки.

Как появляются новости в вашем электронном письме? Вы подписываетесь на рассылку новостей и когда появляется новый выпуск, менеджер просто отправляет его вам на электронную почту.

Достаточно разговоров, давайте начнем практиковаться и понять, как работает Observable, создав свой собственный простой мини-фреймворк Rx.

Нам нужно заставить этот код работать:

let fakeAsyncData$ = new Observable(observer => {
  setTimeout(() => {
    observer.next('New data is coming');
    observer.complete();
  }, 2000);
});

fakeAsyncData$.subscribe({
  next(val) { console.log(val) } ,
  error(e) { console.log(e) } ,
  complete() { console.log('complete') } 
});

Для начала создадим класс и сохраним ссылку на функцию которая будет "пушить" значения.

class Observable {

  constructor(functionThatThrowsValues) {
    this._functionThatThrowsValues = functionThatThrowsValues;
  }
  
}

Реализуем функцию подписки:

 subscribe(observer) {     
   return this._functionThatThrowsValues(observer);     
 }

Значения будут получены только если метод subscribe() будет вызван. Только что был реализован простейший Observable.

Реализация map()

fakeAsyncData$.map(val => `New value ${val}`).subscribe({
   next(val) { console.log(val) } ,
   error(e) { console.log(e) } ,
   complete() { console.log(‘complete’) } 
});
  map(projectionFunction) { 
     return new Observable(observer => {
       return this.subscribe({
          next(val) { observer.next( projectionFunction(val)) },
          error(e) { observer.error(e) } ,
          complete() { observer.complete() } 
        });
     });
  }

Когда мы вызываем метод map, происходит следующее: map возвращает новый Observable, который подписывается на источник, в нашем случае на fakeAsyncData$. Когда источник "пушит" новое значение, оно сначала попадает в метод map, а затем, после применения функции к значению, Observable map "пушит" это значение дальше.

Реализация fromEvent

var button = 
document.getElementById(‘button’);let clicks$ = 
Observable.fromEvent(button, 'click')
  .map(e => `${e.pageX}px`);let unsubscribe = 
clicks$.subscribe({
  next(val) { console.log(val) } ,
  error(e) { console.log(e) } ,
  complete() { console.log('complete') } 
});
  static fromEvent(element, event) {
    return new Observable(observer => {
      const handler = (e) => observer.next(e);
      element.addEventListener(event, handler); 
      
      return () => {
        element.removeEventListener(event, handler);
      };
      
    });
  }


Метод fromEvent просто возвращает новый Observable, который будет "бросать" нам объект события, когда событие произойдет. Мы не хотим утечек памяти, поэтому возвращаем функцию, которая позволит нам отписаться, когда это будет необходимо.

setTimeout(() => unsubscribe(), 1000);

Реализация fromArray()

let array$ = Observable.fromArray([1,2,3]);array$
.subscribe({
 next(val) { console.log(val) } ,
 error(e) { console.log(e) } ,
 complete() { console.log(‘complete’) } 
});
static fromArray(array) {
   return new Observable(observer => {
     array.forEach(val => observer.next(val));
     observer.complete();
  });
}

mergeMap()

Предположим, что нам нужно сделать что-то такое:

let promise = val => {
  return new Promise(resolve => {
    setTimeout(() => resolve(val), 3000);
 })
};
let data$ = Observable.fromArray([1,2,3]).
map(val =>  Observable.fromPromise(promise(val)));

После выполнения этого кода мы получим Observable, потому что функция map() возвращает Observable. Нам нужен способ объединить этот Observable в поток. Вот почему это называется mergeMap; мы выполняем одновременно операцию вывода и слияния.

mergeMap(anotherFunctionThatThrowsValues) {
    return new Observable(observer => {
      return this.subscribe({
        next(val) {    
          anotherFunctionThatThrowsValues(val).subscribe({
            next(val) { observer.next(val) },
            error(e) { observer.error(e) } ,
            complete() { observer.complete() } 
          });
        },
        error(e) { observer.error(e) } ,
        complete() { observer.complete() } 
      });
    });
}

Теперь мы объединили Observable в поток.

let data$ = Observable.fromArray([1,2,3]).
            mergeMap(val => Observable.fromPromise(promise(val)));

Результат

class Observable {

  constructor(functionThatThrowsValues) {
    this._functionThatThrowsValues = functionThatThrowsValues;
  }

  subscribe(next, error, complete) {   
    if (typeof next === "function") {   
      return this._functionThatThrowsValues({
        next,
        error: error || () => {},
        complete: complete || () => {}
      });
    }
    else {
      return this._functionThatThrowsValues(next);
    }
  
  }
  

  map(projectionFunction) { 
      return new Observable(observer => {
        return this.subscribe({
           next(val) { observer.next(projectionFunction(val)) },
           error(e) { observer.error(e) } ,
           complete() { observer.complete() } 
        });
      });
  }
  
  mergeMap(anotherFunctionThatThrowsValues) {
    return new Observable(observer => {
      return this.subscribe({
        next(val) {    
          anotherFunctionThatThrowsValues(val).subscribe({
            next(val) {observer.next(val)},
            error(e) { observer.error(e) } ,
            complete() { observer.complete() } 
          });
        },
        error(e) { observer.error(e) } ,
        complete() { observer.complete() } 
      });
    });
  }
  
  static fromArray(array) {
    return new Observable(observer => {
      array.forEach(val => observer.next(val));
      observer.complete();
    });
  }

  static fromEvent(element, event) {
    return new Observable(observer => {
      const handler = (e) => observer.next(e);
      element.addEventListener(event, handler); 
      return () => {
        element.removeEventListener(event, handler);
      };
    });
  }
  
  static fromPromise(promise) {
    return new Observable(observer => {
      promise.then(val => {
        observer.next(val); observer.complete();
      })
      .catch(e => {
        observer.error(val); observer.complete();
      });
    })
  }
}

Это вольный перевод материала