RxJSInAction / rxjs-in-action

Code sample repository
132 stars 59 forks source link

http://download.finance.yahoo.com is shut down, examples in chapter 5 don't work #13

Open invegat opened 6 years ago

invegat commented 6 years ago

https://forums.yahoo.net/t5/Yahoo-Finance-help/http-download-finance-yahoo-com-d-quotes-csv-s-GOOG-amp-f/td-p/387096

https://www.washingtonpost.com/news/the-switch/wp/2017/06/13/its-official-verizon-finally-buys-yahoo/?utm_term=.2c6ad3e209a0 download.finance.yahoo.com is also used in Chapter 8

luijar commented 6 years ago

Thanks for letting us know. What a drag. And they seem to have pissed off a lot of people as well; I can't even imagine how many integrations/apps broke as a result. That service had a very simple CSV interface.
@paulpdaniels Now we'll need to find something equivalent so that the examples don't deviate much from what's in the book or perhaps just mock our own.

invegat commented 6 years ago

To get the symbol here csv doesn't work, it only shows prices. This works https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=15min&outputsize=full&apikey=demo

paulpdaniels commented 6 years ago

Oh boy, yeah @luijar I guess I must have jinxed this back when we were writing this chapter. Mocking might be the best option. @invegat I'll take a look at the API you provided, if the shape is still relatively close we could use it directly. If not I can write something to do some shape mangling in the server side before delivering it to the client. Then to prevent future issues, maybe I could also add a fallback to return random data perhaps?

calebcwells commented 6 years ago

Ran into the same issue. Found that this Yahoo API call will work, https://query1.finance.yahoo.com/v7/finance/quote?lang=en-US&region=US&corsDomain=finance.yahoo.com&symbols=FB&fields=regularMarketPrice but it returns JSON not csv.

peerreynders commented 6 years ago

Workaround for Listing 5.4 & 5.5 The request quote stream (p.142)

/**
 *  RxJS in Action
 *  Listing 5.4
 *  Note: make sure you have turned on CORS sharing in your browser so that you can make
 *  cross-site requests
 *  @author Paul Daniels
 *  @author Luis Atencio
 */

/**
 * Issues a GET request for `url` and resolves with the JSON-decoded body.
 *
 * @param {string} url - Address to request.
 * @returns {Promise<*>} Resolves with `req.response` when the status is 200;
 *   rejects with an Error for any other status or a network-level failure.
 */
const ajax = url => new Promise((resolve, reject) => {
  const req = new XMLHttpRequest();
  req.open('GET', url);
  req.responseType = 'json';
  req.onload = () => {
    if (req.status === 200) {
      resolve(req.response);
      return;
    }
    // statusText is empty for HTTP/2 responses; fall back to the status code
    // so the rejection never carries a blank message.
    reject(new Error(req.statusText || `HTTP ${req.status}`));
  };
  req.onerror = () => {
    reject(new Error('IO Error'));
  };
  req.send();
});

/* In proxySettings.js change (don't forget to restart the Gulp script):
const yahooProxyOptions = {
  target: 'https://query1.finance.yahoo.com',
  changeOrigin: true,
  pathRewrite: {
    '^/external/yahoo': ''
  }
};
*/

// Proxying around CORS -> 'https://query1.finance.yahoo.com'
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// One-shot request: fetches `fields` for every entry of `symbols` and emits
// the `quoteResponse.result` part of the reply.
// e.g. fields = ['currency','ask','regularMarketOpen','regularMarketPrice']
const requestQuote$ = (symbols, fields) => {
  const url = makeQuotesUrl(symbols, fields);
  const response$ = Rx.Observable.fromPromise(ajax(url));
  return response$.pluck('quoteResponse', 'result');
};

// Polls the quote service for a single symbol every two seconds and reduces
// each reply to a [symbol, currency, price] 3-tuple.
const twoSecond$ = Rx.Observable.interval(2000);
const extract = ([quote]) => {
  const {symbol, currency, ask} = quote;
  return [symbol, currency, ask]; // 3-tuple
};
const fetchDataInterval$ = symbol => {
  const fetchLatest = () => requestQuote$([symbol], ['currency', 'ask']);
  return twoSecond$
    .switchMap(fetchLatest)
    .map(extract);
};

// Prints a quote 3-tuple as "SYMBOL, CURRENCY, PRICE" with two decimals.
const logResult = quote => {
  const [symbol, currency, price] = quote;
  const amount = price.toFixed(2);
  console.log(`${symbol}, ${currency}, ${amount}`);
};

// Start polling the FB quote. The error callback reports a failed request
// instead of leaving the stream's error unhandled.
fetchDataInterval$('FB')
  .subscribe(
    logResult,
    error => console.error(error.message)
  );

Workaround for Listing 5.6 Updating multiple stock symbols (p. 143)

/**
 *  RxJS in Action
 *  Listing 5.6
 *  Note: make sure you have turned on CORS sharing in your browser so that you can make
 *  cross-site requests
 *  @author Paul Daniels
 *  @author Luis Atencio
 */
console.log('Note: Please turn on CORS in your browser');
// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" (2 decimals).
const Money = function (currency, val) {
  const value = () => val;
  const readCurrency = () => currency;
  const toString = () => `${currency} ${val.toFixed(2)}`;

  return {value, currency: readCurrency, toString};
};
// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Render logic
/**
 * Writes the formatted price into the second cell of a stock row.
 *
 * @param {Element} rowElem - Row whose second child node is the price cell.
 * @param {string} currency - Currency code shown before the amount.
 * @param {number} price - Latest price.
 */
const updateRow = (rowElem, currency, price) => {
  const [, priceElem] = rowElem.childNodes;
  // The values originate from a remote service: assign them as plain text so
  // they can never be interpreted as markup.
  priceElem.textContent = makeMoney(currency, price).toString();
};

// Table element that receives one row per stock symbol.
const table = document.querySelector('#stocks-table');

/**
 * Appends a new row (symbol cell + price cell) to the stocks table.
 *
 * @param {string} id - Value for the row's `id` attribute.
 * @param {string} symbol - Ticker symbol shown in the first cell.
 * @param {string} currency - Currency code for the price cell.
 * @param {number} price - Initial price.
 */
const addRow = (id, symbol, currency, price) => {
  const symbolElem = document.createElement('td');
  const priceElem = document.createElement('td');
  const row = document.createElement('tr');

  row.setAttribute('id', id);
  row.appendChild(symbolElem);
  row.appendChild(priceElem);

  updateRow(row, currency, price);
  // Remote data is inserted as text, not parsed as HTML.
  symbolElem.textContent = symbol;

  table.appendChild(row);
};

/**
 * Shows a quote in the table: creates the symbol's row the first time it is
 * seen and refreshes the price afterwards.
 *
 * @param {Array} quote - [symbol, currency, price] 3-tuple.
 */
const render = ([symbol, currency, price]) => {
  const id = `row-${symbol.toLowerCase()}`;
  // getElementById matches the id literally. A '#id' selector is misparsed
  // (or throws) for symbols containing '.', '^' or '=' such as 'BRK.B',
  // which would add a duplicate row on every tick.
  const row = document.getElementById(id);
  if (row) {
    updateRow(row, currency, price);
  } else {
    addRow(id, symbol, currency, price);
  }
};

// -- Fetch logic
/**
 * Issues a GET request for `url` and resolves with the JSON-decoded body.
 *
 * @param {string} url - Address to request.
 * @returns {Promise<*>} Resolves with `req.response` when the status is 200;
 *   rejects with an Error for any other status or a network-level failure.
 */
const ajax = url => new Promise((resolve, reject) => {
  const req = new XMLHttpRequest();
  req.open('GET', url);
  req.responseType = 'json';
  req.onload = () => {
    if (req.status === 200) {
      resolve(req.response);
      return;
    }
    // statusText is empty for HTTP/2 responses; fall back to the status code
    // so the rejection never carries a blank message.
    reject(new Error(req.statusText || `HTTP ${req.status}`));
  };
  req.onerror = () => {
    reject(new Error('IO Error'));
  };
  req.send();
});

/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// One-shot request: fetches `fields` for every entry of `symbols` and emits
// the `quoteResponse.result` part of the reply.
// e.g. fields = ['currency','ask','regularMarketOpen','regularMarketPrice']
const requestQuote$ = (symbols, fields) => {
  const url = makeQuotesUrl(symbols, fields);
  const response$ = Rx.Observable.fromPromise(ajax(url));
  return response$.pluck('quoteResponse', 'result');
};

// Polls the quote service for a single symbol every two seconds, reduces each
// reply to a [symbol, currency, price] 3-tuple and drops ticks whose price
// equals the previously emitted one.
const twoSecond$ = Rx.Observable.interval(2000);
const extract = ([quote]) => {
  const {symbol, currency, ask} = quote;
  return [symbol, currency, ask]; // 3-tuple
};
const priceNotChanged = (previousQuote, nextQuote) =>
  previousQuote[2] === nextQuote[2];
const fetchDataInterval$ = symbol => {
  const fetchLatest = () => requestQuote$([symbol], ['currency', 'ask']);
  return twoSecond$
    .switchMap(fetchLatest)
    .map(extract)
    .distinctUntilChanged(priceNotChanged);
};

// Every symbol gets its own periodic fetch stream; mergeMap flattens the
// events of all of them into the single ticks$ stream.
const symbols$ = Rx.Observable.from(['AAPL', 'CTXS', 'FB']);
const ticks$ = symbols$.mergeMap(symbol => fetchDataInterval$(symbol));

// Draw each tick; report stream failures on the console.
ticks$.subscribe(render, error => {
  console.error(error.message);
});

Alternate workaround for Listing 5.6 Updating multiple stock symbols (p. 143) - fetching multiple stock prices per request

// Listing 5.6 Version: fetch multiple quotes at once

console.log('Note: Please turn on CORS in your browser');
// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" (2 decimals).
const Money = function (currency, val) {
  const value = () => val;
  const readCurrency = () => currency;
  const toString = () => `${currency} ${val.toFixed(2)}`;

  return {value, currency: readCurrency, toString};
};
// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Render logic
/**
 * Writes the formatted price into the second cell of a stock row.
 *
 * @param {Element} rowElem - Row whose second child node is the price cell.
 * @param {string} currency - Currency code shown before the amount.
 * @param {number} price - Latest price.
 */
const updateRow = (rowElem, currency, price) => {
  const [, priceElem] = rowElem.childNodes;
  // The values originate from a remote service: assign them as plain text so
  // they can never be interpreted as markup.
  priceElem.textContent = makeMoney(currency, price).toString();
};

// Table element that receives one row per stock symbol.
const table = document.querySelector('#stocks-table');

/**
 * Appends a new row (symbol cell + price cell) to the stocks table.
 *
 * @param {string} id - Value for the row's `id` attribute.
 * @param {string} symbol - Ticker symbol shown in the first cell.
 * @param {string} currency - Currency code for the price cell.
 * @param {number} price - Initial price.
 */
const addRow = (id, symbol, currency, price) => {
  const symbolElem = document.createElement('td');
  const priceElem = document.createElement('td');
  const row = document.createElement('tr');

  row.setAttribute('id', id);
  row.appendChild(symbolElem);
  row.appendChild(priceElem);

  updateRow(row, currency, price);
  // Remote data is inserted as text, not parsed as HTML.
  symbolElem.textContent = symbol;

  table.appendChild(row);
};

/**
 * Shows a quote in the table: creates the symbol's row the first time it is
 * seen and refreshes the price afterwards.
 *
 * @param {Array} quote - [symbol, currency, price] 3-tuple.
 */
const render = ([symbol, currency, price]) => {
  const id = `row-${symbol.toLowerCase()}`;
  // getElementById matches the id literally. A '#id' selector is misparsed
  // (or throws) for symbols containing '.', '^' or '=' such as 'BRK.B',
  // which would add a duplicate row on every tick.
  const row = document.getElementById(id);
  if (row) {
    updateRow(row, currency, price);
  } else {
    addRow(id, symbol, currency, price);
  }
};

// --- Fetch logic
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// One-shot JSON request for `fields` of every entry in `symbols`; emits the
// `response.quoteResponse.result` part of the AJAX reply.
// e.g. fields = ['currency','ask','regularMarketOpen','regularMarketPrice']
const requestQuote$ = (symbols, fields) =>
  Rx.Observable
    .ajax({
      url: makeQuotesUrl(symbols, fields),
      method: 'GET',
      responseType: 'json'
    })
    .pluck('response', 'quoteResponse', 'result');

// Periodic fetch stream: one request every two seconds covers all symbols.
// The reply is flattened into individual [symbol, currency, price] quotes,
// "groupBy" then opens a separate stream per symbol so that each
// distinctUntilChanged only compares quotes of that symbol, and "mergeAll"
// folds the per-symbol streams back into a single stream.
const twoSecond$ = Rx.Observable.interval(2000);
const extract = quote => [quote.symbol, quote.currency, quote.ask]; // 3-tuple
const extractAll = results => results.map(quote => extract(quote));
const priceNotChanged = (previousQuote, nextQuote) =>
  previousQuote[2] === nextQuote[2];
const makeSymbolQuoteChanged = symbolQuote$ =>
  symbolQuote$.distinctUntilChanged(priceNotChanged);
const fetchDataInterval$ = symbols => {
  const fetchLatest = () => requestQuote$(symbols, ['currency', 'ask']);
  const quote$ = twoSecond$
    .switchMap(fetchLatest)
    .mergeMap(extractAll);

  return quote$
    .groupBy(quote => quote[0])
    .map(makeSymbolQuoteChanged)
    .mergeAll();
};

// A single polling stream serves every symbol in the list; ticks are drawn by
// render and failures are reported on the console.
const symbols = ['AAPL', 'CTXS', 'FB'];
fetchDataInterval$(symbols).subscribe(render, error => {
  console.error(error.message);
});

Workaround for Listing 6.8 Using forkJoin to fetch multiple stock symbols simultaneously (p. 169)

/**
 *  RxJS in Action
 *  Listing 6.8
 *  @author Paul Daniels
 *  @author Luis Atencio
 */
// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" verbatim.
const Money = function (currency, val) {
  const value = () => val;
  const readCurrency = () => currency;
  const toString = () => `${currency} ${val}`;

  return {value, currency: readCurrency, toString};
};

// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Fetch logic
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// One-shot JSON request for the currency and ask price of a single symbol;
// emits the `response.quoteResponse.result` part of the AJAX reply.
const requestQuote$ = symbol => {
  const request = {
    url: makeQuotesUrl([symbol], ['currency', 'ask']),
    method: 'GET',
    responseType: 'json'
  };
  return Rx.Observable.ajax(request).pluck('response', 'quoteResponse', 'result');
};

// Helpers for the forkJoin pipeline: one fetch observable is built per symbol
// and the combined array of [symbol, currency, price] quotes is summed into a
// [currency, total] pair before being logged.
const symbols = ['AAPL', 'CTXS', 'FB'];
const extractQuote = ([quote]) => {
  const {symbol, currency, ask} = quote;
  return [symbol, currency, ask]; // 3-tuple
};
const fetchQuote$ = symbol => requestQuote$(symbol).map(extractQuote);
// Carries the currency of the most recent quote and accumulates the prices.
const sumQuote = (runningTotal, quote) => {
  const [, sum] = runningTotal;
  const [, currency, price] = quote;
  return [currency, sum + price];
};
const sumAllQuotes = allQuotes => allQuotes.reduce(sumQuote, ['', 0.0]);
const localeMoney = (currency, val) => {
  const localized = val.toLocaleString();
  return makeMoney(currency, localized);
};
const logTotal = ([currency, total]) => {
  const money = localeMoney(currency, total);
  console.log(`Total Value: ${money}`);
};

// Fire all quote requests at once; forkJoin emits a single array holding the
// last value of every request once all of them have completed. The error
// callback reports a failed request instead of leaving it unhandled.
Rx.Observable.forkJoin(
  symbols.map(fetchQuote$)
)
  .map(sumAllQuotes)
  .subscribe(
    logTotal,
    error => console.error(error.message)
  );

Workaround for Listing 8.4 Stock ticker as event emitter (p. 231)

/**
 *  RxJS in Action
 *  Listing 8.4
 *  @author Paul Daniels
 *  @author Luis Atencio
 */

// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" (2 decimals).
const Money = function (currency, val) {
  const readCurrency = () => currency;
  const value = () => val;
  const toString = () => `${currency} ${val.toFixed(2)}`;

  return {currency: readCurrency, value, toString};
};

// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Render logic
/**
 * Creates a renderer bound to two page elements.
 *
 * @param {string} symbolSelect - Selector of the element showing the symbol.
 * @param {string} priceSelect - Selector of the element showing the price.
 * @returns {function(Array): void} Function that takes a
 *   [symbol, currency, price] 3-tuple and writes it into both elements.
 */
const makeRender = (symbolSelect, priceSelect) => {
  const symbolElem = document.querySelector(symbolSelect);
  const priceElem = document.querySelector(priceSelect);

  return ([symbol, currency, price]) => {
    // Quote data comes from a remote service: write it as plain text so it
    // can never be interpreted as markup.
    priceElem.textContent = makeMoney(currency, price).toString();
    symbolElem.textContent = symbol;
  };
};

// --- Fetch logic

// --- module StockTicker
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// Emits the quote results for "symbols", re-fetched on every "interval" tick,
// until "end$" emits. The request description is built once up front and
// reused for each fetch.
const makeQuotesSource = (interval, end$, symbols) => {
  const url = makeQuotesUrl(symbols, ['currency', 'ask']);
  const request = {url, method: 'GET', responseType: 'json'};
  const fetchResults = () =>
    Rx.Observable
      .ajax(request)
      .pluck('response', 'quoteResponse', 'result');

  return Rx.Observable
    .interval(interval)
    .takeUntil(end$)
    .switchMap(fetchResults);
};

// StockTicker private support
// Per-instance state is stored under this symbol key.
const _private = Symbol('private');
// Clears the references to the running source and its stop trigger.
const _reset = (t) => {
  Object.assign(t[_private], {source$: null, stopSource: null});
};
// Builds an observable that, once subscribed, stores a "stopSource" trigger in
// the ticker's private state. Calling the trigger emits a value, completes the
// observable and clears the private state.
const _makeStop = t =>
  Rx.Observable.create(observer => {
    const stopSource = () => {
      observer.next(0);
      observer.complete();
      _reset(t);
    };
    t[_private].stopSource = stopSource;
  });
// Invokes the stored stop trigger, if one is currently registered.
const _stop = t => {
  const trigger = t[_private].stopSource;
  if (!trigger) {
    return;
  }
  trigger();
};
// Creates the private state of a ticker: the symbols to fetch, empty slots for
// the running source and its stop trigger, and the three subscriber callbacks.
const _init = (t, symbols) => {
  // Re-publishes each quote 3-tuple as a 'tick' event on the ticker.
  const tick = quote => {
    const [symbol, currency, price] = quote;
    t.emit('tick', symbol, currency, price);
  };
  const onError = error => {
    console.log(error);
  };
  const onComplete = () => {
    _reset(t);
    console.log("Complete!");
  };
  const state = {source$: null, stopSource: null};

  return {...state, symbols, tick, onError, onComplete};
};

// A StockTicker is constructed for exactly one symbol, so only the first
// entry of the result array is read and turned into a
// [symbol, currency, price] 3-tuple.
const _extractQuote = ([quote]) => {
  const {symbol, currency, ask} = quote;
  return [symbol, currency, ask];
};

// Node.js style EventEmitter
// https://nodejs.org/api/events.html#events_class_eventemitter
//
// Emits a 'tick' event (symbol, currency, price) for every fetched quote of
// the symbol it was constructed with.
class StockTicker extends EventEmitter {

  constructor(symbol) {
    super();
    this[_private] = _init(this, [symbol]);
  }

  // (Re)starts polling: fetches every 2000 ms and ends after 10100 ms or as
  // soon as stop() is called, whichever comes first.
  start() {
    // A ticker that is still running is stopped before the new one starts.
    _stop(this);

    const pollEveryMs = 2000;
    const giveUpAfterMs = 10100;
    const state = this[_private];
    const timeout$ = Rx.Observable.timer(giveUpAfterMs);
    const end$ = timeout$.merge(_makeStop(this));
    const quote$ =
      makeQuotesSource(pollEveryMs, end$, state.symbols)
        .map(_extractQuote);

    state.source$ = quote$.subscribe(state.tick, state.onError, state.onComplete);
  }

  // Ends the current polling run, if any.
  stop() {
    _stop(this);
  }
}
// --- end module StockTicker

// --- Adapt to an EventEmitter
// Start a ticker for FB and expose its 'tick' events as an observable of
// [symbol, currency, price] 3-tuples.
const ticker = new StockTicker('FB');
ticker.start();

// Packs the arguments of a 'tick' event into a single array.
const selector = (symbol, currency, price) => {
  return [symbol, currency, price];
};
// Replaces whatever error occurred with a generic ticker error.
const makeTickerError = _err => {
  const failure = new Error('Stock ticker exception');
  return Rx.Observable.throw(failure);
};
const tick$ = Rx.Observable.fromEvent(ticker, 'tick', selector).catch(makeTickerError);

// --- Subscriptions
// Two independent subscribers draw the same ticks into different page
// elements; an error on either one stops the ticker.
const stopTicker = _error => {
  ticker.stop();
};
const sub1 = tick$.subscribe(makeRender('#company', '#price'), stopTicker);
const sub2 = tick$.subscribe(makeRender('#company2', '#price2'), stopTicker);

/*
setTimeout(() => {
  ticker.stop();
}, 5000);
*/

Workaround for Listing 8.5 Complete stock ticker widget with change tracking (p. 235)

/**
 *  RxJS in Action
 *  Listing 8.5
 *  @author Paul Daniels
 *  @author Luis Atencio
 */

// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" (2 decimals).
const Money = function (currency, val) {
  const readCurrency = () => currency;
  const value = () => val;
  const toString = () => `${currency} ${val.toFixed(2)}`;

  return {currency: readCurrency, value, toString};
};

// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Rendering logic
// CSS hooks for a rising and a falling price.
const UP = {className: 'green-text', icon: 'up-green'};
const DOWN = {className: 'red-text', icon: 'down-red'};

// Renders the absolute change (two decimals, in parentheses) wrapped in the
// styling for its direction; a change of zero is styled as UP.
const priceChangeHtml = change => {
  const direction = change < 0 ? DOWN : UP;
  const magnitude = Math.abs(change).toFixed(2);
  const inner = `<span class="${direction.icon}">(${magnitude})</span>`;
  return `<span class="${direction.className}">${inner}</span>`;
};

// Replaces the content of the row's third child node with the markup for the
// given price change.
const updatePriceChange = (rowElem, change) => {
  const changeElem = rowElem.childNodes[2];
  changeElem.innerHTML = priceChangeHtml(change);
};

/**
 * Writes the formatted price into the second cell of a stock row.
 *
 * @param {Element} rowElem - Row whose second child node is the price cell.
 * @param {string} currency - Currency code shown before the amount.
 * @param {number} price - Latest price.
 */
const updatePrice = (rowElem, currency, price) => {
  const [, elem] = rowElem.childNodes;
  // The values originate from a remote service: assign them as plain text so
  // they can never be interpreted as markup.
  elem.textContent = makeMoney(currency, price).toString();
};

// Table element that receives one row per stock symbol.
const table = document.querySelector('#stocks-table');

/**
 * Appends a new row (symbol, price and change cells) to the stocks table.
 * The change cell starts out at 0.0.
 *
 * @param {string} id - Value for the row's `id` attribute.
 * @param {string} symbol - Ticker symbol shown in the first cell.
 * @param {string} currency - Currency code for the price cell.
 * @param {number} price - Initial price.
 */
const addRow = (id, symbol, currency, price) => {
  const symbolElem = document.createElement('td');
  const priceElem = document.createElement('td');
  const changeElem = document.createElement('td');

  const row = document.createElement('tr');
  row.setAttribute('id', id);
  row.appendChild(symbolElem);
  row.appendChild(priceElem);
  row.appendChild(changeElem);

  updatePrice(row, currency, price);
  updatePriceChange(row, 0.0);
  // Remote data is inserted as text, not parsed as HTML.
  symbolElem.textContent = symbol;

  table.appendChild(row);
};

// Row id for a symbol: "row-" followed by the lower-cased symbol.
const makeSymbolId = symbol => `row-${symbol.toLowerCase()}`;
// Looks the row up by its literal id. A '#id' selector would be misparsed (or
// throw) for symbols containing '.', '^' or '=' such as 'BRK.B'.
const queryRowById = id => document.getElementById(id);

// Draws a [symbol, currency, price] quote: an existing row only gets its price
// refreshed, an unknown symbol gets a new row.
const renderPrice = quote => {
  const [symbol, currency, price] = quote;
  const id = makeSymbolId(symbol);
  const row = queryRowById(id);

  if (row) {
    updatePrice(row, currency, price);
    return;
  }
  addRow(id, symbol, currency, price);
};

// Draws a [symbol, change] pair into the symbol's row; pairs for symbols that
// have no row yet are ignored.
const renderPriceChange = priceChange => {
  const [symbol, change] = priceChange;
  const row = queryRowById(makeSymbolId(symbol));
  if (!row) {
    return;
  }
  updatePriceChange(row, change);
};

// --- Fetch logic
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// Replaces whatever error occurred with a stream that fails with a
// user-facing message.
const makeStockError = _err => {
  const failure = new Error('Stock data not available. Try again later!');
  return Rx.Observable.throw(failure);
};

// One-shot JSON request for `fields` of every entry in `symbols`. A failing
// request is retried up to three times before the error is replaced by the
// one from makeStockError; on success the `response.quoteResponse.result`
// part of the reply is emitted.
// e.g. fields = ['currency','ask','regularMarketOpen','regularMarketPrice']
const requestQuote$ = (symbols, fields) =>
  Rx.Observable
    .ajax({
      url: makeQuotesUrl(symbols, fields),
      method: 'GET',
      responseType: 'json'
    })
    .retry(3)
    .catch(makeStockError)
    .pluck('response', 'quoteResponse', 'result');

// Polls the quote service for a single symbol every two seconds, reduces each
// reply to a [symbol, currency, price] 3-tuple and drops ticks whose price,
// rounded to two decimals, equals the previously emitted one.
const twoSecond$ = Rx.Observable.interval(2000);
const extractPrice = ([quote]) => {
  const {symbol, currency, ask} = quote;
  return [symbol, currency, ask]; // 3-tuple
};
const priceNotChanged = (previousQuote, nextQuote) =>
  previousQuote[2].toFixed(2) === nextQuote[2].toFixed(2);
const fetchDataInterval$ = symbol => {
  const fetchLatest = () => requestQuote$([symbol], ['currency', 'ask']);
  return twoSecond$
    .switchMap(fetchLatest)
    .map(extractPrice)
    .distinctUntilChanged(priceNotChanged);
};

// Every symbol gets its own periodic fetch stream; mergeMap flattens them into
// one stream, which is shared because more than one subscriber consumes it.
const symbol$ = Rx.Observable.from(['AAPL', 'CTXS', 'FB']);
const ticks$ = symbol$
  .mergeMap(symbol => fetchDataInterval$(symbol))
  .share();

// First subscriber: draw the price of each tick.
ticks$.subscribe(renderPrice, error => {
  console.log(error.message);
});

// A quote price change triggers a separate fetch for the opening price.
// This combiner takes the quote 3-tuple and that second reply and yields the
// change since the market opened as a [symbol, change] 2-tuple.
const combineAsPriceChange = (quote, openResults) => {
  const [symbol, , price] = quote;
  const [{regularMarketOpen}] = openResults;
  return [symbol, (price - regularMarketOpen)]; // 2-tuple
};

// Pairs one quote with a fresh fetch of that symbol's opening price and
// combines the two via combineAsPriceChange.
const makePriceChange = (quote) => {
  const symbol = quote[0];
  const quote$ = Rx.Observable.of(quote);
  const open$ = requestQuote$([symbol], ['regularMarketOpen']);
  return quote$.combineLatest(open$, combineAsPriceChange);
};

// Second subscriber: every tick triggers the opening-price lookup and the
// resulting change is drawn into the symbol's row.
ticks$
  .mergeMap(quote => makePriceChange(quote))
  .subscribe(renderPriceChange, error => {
    console.log(`Fetch error occurred: ${error}`);
  });

Listing 8.5 is contrived as a secondary request is issued to obtain the regularMarketOpen price separately, the result of which is then used to update the "Change" portion of the page. With the current service:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-groupBy http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeAll

// Listing 8.5 Version: No need for "share()" on ticks$

// Money value: the amount and its currency live in the closure and are read
// through accessor functions; toString renders "CURRENCY amount" (2 decimals).
const Money = function (currency, val) {
  const readCurrency = () => currency;
  const value = () => val;
  const toString = () => `${currency} ${val.toFixed(2)}`;

  return {currency: readCurrency, value, toString};
};

// Factory wrapper around the Money constructor.
const makeMoney = (currency, val) => new Money(currency, val);

// --- Render logic
// CSS hooks for a rising and a falling price.
const UP = {className: 'green-text', icon: 'up-green'};
const DOWN = {className: 'red-text', icon: 'down-red'};

// Renders the absolute change (two decimals, in parentheses) wrapped in the
// styling for its direction; a change of zero is styled as UP.
const priceChangeHtml = change => {
  const direction = change < 0 ? DOWN : UP;
  const magnitude = Math.abs(change).toFixed(2);
  const inner = `<span class="${direction.icon}">(${magnitude})</span>`;
  return `<span class="${direction.className}">${inner}</span>`;
};

/**
 * Refreshes the price and change cells of a stock row.
 *
 * @param {Element} rowElem - Row whose second and third child nodes are the
 *   price and change cells.
 * @param {{currency: string, price: number, change: number}} quote - Values
 *   to display.
 */
const updateRow = (rowElem, {currency, price, change}) => {
  const [, priceElem, changeElem] = rowElem.childNodes;
  // The price text originates from a remote service: assign it as plain text
  // so it can never be interpreted as markup.
  priceElem.textContent = makeMoney(currency, price).toString();
  // The change cell receives markup generated by priceChangeHtml, so it has
  // to be assigned as HTML.
  changeElem.innerHTML = priceChangeHtml(change);
};

// Table element that receives one row per stock symbol.
const table = document.querySelector('#stocks-table');

/**
 * Appends a new row (symbol, price and change cells) to the stocks table.
 *
 * @param {string} id - Value for the row's `id` attribute.
 * @param {{symbol: string, currency: string, price: number, change: number}}
 *   quote - Quote to display in the new row.
 */
const addRow = (id, quote) => {
  const symbolElem = document.createElement('td');
  const priceElem = document.createElement('td');
  const changeElem = document.createElement('td');
  const row = document.createElement('tr');

  row.setAttribute('id', id);
  row.appendChild(symbolElem);
  row.appendChild(priceElem);
  row.appendChild(changeElem);

  updateRow(row, quote);
  // Remote data is inserted as text, not parsed as HTML.
  symbolElem.textContent = quote.symbol;

  table.appendChild(row);
};

// Row id for a symbol: "row-" followed by the lower-cased symbol.
const makeSymbolId = symbol => `row-${symbol.toLowerCase()}`;
// Looks the row up by its literal id. A '#id' selector would be misparsed (or
// throw) for symbols containing '.', '^' or '=' such as 'BRK.B'.
const queryRowById = id => document.getElementById(id);

// Draws a quote object: an existing row is refreshed, an unknown symbol gets
// a new row.
const render = (quote) => {
  const id = makeSymbolId(quote.symbol);
  const row = queryRowById(id);
  if (row) {
    updateRow(row, quote);
    return;
  }
  addRow(id, quote);
};

// --- Fetching logic
/**
 * Builds the proxied quote URL for the given symbols and fields.
 *
 * @param {string[]} symbols - Ticker symbols to look up.
 * @param {string[]} fields - Quote fields to request.
 * @returns {string} Relative URL under the `/external/yahoo` proxy path.
 */
const makeQuotesUrl = (symbols, fields) => {
  // Escape each item on its own so reserved characters ('&', '#', '^', ...)
  // cannot corrupt the query string; the separating commas stay literal.
  const toList = items => items.map(item => encodeURIComponent(item)).join(',');
  const query = [
    'lang=en-US',
    'region=US',
    'corsDomain=finance.yahoo.com',
    `symbols=${toList(symbols)}`,
    `fields=${toList(fields)}`
  ].join('&');

  return `/external/yahoo/v7/finance/quote?${query}`;
};

// Replaces whatever error occurred with a stream that fails with a
// user-facing message.
const makeQuoteError = _err => {
  const failure = new Error('Stock data not available. Try again later!');
  return Rx.Observable.throw(failure);
};

// One-shot JSON request for `fields` of every entry in `symbols`. A failing
// request is retried up to three times before the error is replaced by the
// one from makeQuoteError; on success the `response.quoteResponse.result`
// part of the reply is emitted.
// e.g. fields = ['currency','ask','regularMarketOpen','regularMarketPrice']
const requestQuote$ = (symbols, fields) =>
  Rx.Observable
    .ajax({
      url: makeQuotesUrl(symbols, fields),
      method: 'GET',
      responseType: 'json'
    })
    .retry(3)
    .catch(makeQuoteError)
    .pluck('response', 'quoteResponse', 'result');

// Periodic fetch stream: one request every two seconds covers all symbols and
// all needed fields. The reply is flattened into individual quote objects,
// "groupBy" then opens a separate stream per symbol so that each
// distinctUntilChanged only compares quotes of that symbol, and "mergeAll"
// folds the per-symbol streams back into a single stream.
const twoSecond$ = Rx.Observable.interval(2000);
// Keeps symbol/currency, renames ask to price and derives the change since
// the market opened.
const extract = quote => {
  const {symbol, currency, ask, regularMarketOpen} = quote;
  return {symbol, currency, price: ask, change: ask - regularMarketOpen};
};
const extractAll = results => results.map(quote => extract(quote));
// Prices count as unchanged when they agree after rounding to two decimals.
const priceNotChanged = (previousQuote, nextQuote) =>
  previousQuote.price.toFixed(2) === nextQuote.price.toFixed(2);
const makeSymbolQuoteChanged = symbolQuote$ =>
  symbolQuote$.distinctUntilChanged(priceNotChanged);
const fetchDataInterval$ = symbols => {
  const fetchLatest = () =>
    requestQuote$(symbols, ['currency', 'ask', 'regularMarketOpen']);
  const quote$ = twoSecond$
    .switchMap(fetchLatest)
    .mergeMap(extractAll);

  return quote$
    .groupBy(quote => quote.symbol)
    .map(makeSymbolQuoteChanged)
    .mergeAll();
};

// Launch a single stream to periodically fetch quotes for all the specified symbols
// ('CTXS', not 'CTX': the same watch list as the other stock listings).
const symbols = ['AAPL', 'CTXS', 'FB'];
// Draw every quote; report stream failures on the console.
const ticks$ = fetchDataInterval$(symbols);
ticks$.subscribe(render, error => {
  console.log(error.message);
});

Works on Chrome Version 65.0.3325.181 (Official Build) (64-bit)