eric-basley opened this issue 5 years ago
I've made this implementation, but the behavior of `count()` is very weird!
```js
const throughLimit = (fn, stream, maxConcurrency) => {
  const running = flyd.stream(0);
  const cache = [];
  return flyd.combine((s, count, self, changed) => {
    if (count() < maxConcurrency) {
      count(count() + 1);
      const value = includes(s, changed) ? s() : cache.pop();
      if (value) fn(value, res => {
        count(count() - 1);
        self(res);
      });
    } else if (includes(s, changed)) {
      cache.unshift(s());
    }
  }, [stream, running]);
};
```
Can we update a dependent stream inside a `combine()` body?
Let's forget the first buggy implementation! Here's a version that mixes streams and recursive callbacks.
It's called as: `const resStream = throughLimit(makeRequest, urlsStream, 10)`
Does anyone have a better idea?
```js
const throughLimit = (fn, stream, maxConcurrency) => {
  let running = 0;
  const cache = [];
  const callFn = (value, cb) => {
    fn(value, res => {
      cb(res);
      if (cache.length) {
        const next = cache.pop();
        running++;
        callFn(next, cb);
      }
    });
  };
  return flyd.combine((s, self, changed) => {
    if (running < maxConcurrency) {
      running++;
      callFn(s(), res => {
        running--;
        self(res);
      });
    } else {
      cache.unshift(s());
    }
  }, [stream]);
};
```
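For what it's worth, the recursive-callback pattern in this version can be sketched without flyd, which makes the counter logic easier to test in isolation (`makeLimiter`, `push`, and `onResult` are hypothetical names for this sketch, not flyd API):

```js
// A standalone sketch of the same recursive-callback pattern, without flyd.
// `fn(value, done)` is any async operation with a node-style callback;
// at most `maxConcurrency` calls are in flight at once.
const makeLimiter = (fn, maxConcurrency, onResult) => {
  let running = 0;
  const cache = [];
  const callFn = value => {
    fn(value, res => {
      onResult(res);
      if (cache.length) {
        callFn(cache.pop()); // reuse the freed slot immediately
      } else {
        running--;           // no queued work: the slot is free again
      }
    });
  };
  // `push` feeds one value into the limiter (plays the role of the stream).
  return value => {
    if (running < maxConcurrency) {
      running++;
      callFn(value);
    } else {
      cache.unshift(value);  // FIFO queue via unshift + pop
    }
  };
};
```

The key invariant is that `running` counts occupied slots, not calls: a slot is handed straight to the next queued value instead of being released and re-acquired.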
You could use Ramda to help you there.
Maybe something like this could do the trick for you?
```js
const urlStream = flyd.stream() // Stream string

const joinedUrlStream = urlStream
  .pipe(flyd.scan(R.flip(R.append), [])) // Stream [string]

const requestStream = joinedUrlStream
  .map(R.splitEvery(1000)) // Stream [[string]]
  .map(async splitUrls => {
    let result = []
    for (const urls of splitUrls) {
      result = result.concat(await Promise.all(urls.map(makeRequest)))
    }
    return result
  }) // Stream Promise [Result]

const responseStream = requestStream
  .chain(flyd.fromPromise) // Stream [Result]
```
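One caveat with this batching approach: each chunk waits for its slowest request before the next chunk starts, so concurrency is bounded per batch rather than globally. The core idea can be sketched without flyd or Ramda (`splitEvery` and `inBatches` here are dependency-free stand-ins for `R.splitEvery` and the async `map` above):

```js
// Split a list into chunks of `size`, like R.splitEvery.
const splitEvery = (size, list) => {
  const out = [];
  for (let i = 0; i < list.length; i += size) {
    out.push(list.slice(i, i + size));
  }
  return out;
};

// Run `request` over `items`, `size` at a time: each chunk is awaited
// with Promise.all before the next chunk begins.
const inBatches = async (items, size, request) => {
  let results = [];
  for (const chunk of splitEvery(size, items)) {
    results = results.concat(await Promise.all(chunk.map(request)));
  }
  return results;
};
```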
I can't find a way to build a control-flow mechanism that pops values from a stream while staying under a concurrency limit.
```js
const urls = reduce((stream, url) => stream(url), flyd.stream(), R.times(makeUrl(), 1000));
const results = throughLimit(requestUrl, urls, { maxConcurrency });
// only maxConcurrency requests can be pending at a time;
// results from requestUrl() are pushed one by one into `results`
```
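As another sketch of that desired behavior, a small Promise-based worker pool gives a global (not per-batch) concurrency limit without streams. `throughLimitP` and `onResult` are hypothetical names, and this assumes `requestUrl` returns a Promise rather than taking a callback:

```js
// Up to maxConcurrency workers pull URLs from a shared queue;
// onResult is invoked as each response arrives.
const throughLimitP = async (requestUrl, urls, { maxConcurrency }, onResult) => {
  const queue = urls.slice();
  const worker = async () => {
    while (queue.length) {
      const url = queue.shift();
      onResult(await requestUrl(url));
    }
  };
  const workers = Math.min(maxConcurrency, queue.length);
  await Promise.all(Array.from({ length: workers }, worker));
};
```

Because the workers share one queue, a slow request only blocks its own worker; the other slots keep draining the remaining URLs.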
Thanks for your help.
Eric.