viniciusCamargo opened this issue 6 years ago
I assume it's important to pipe the responses in order?
First of all, you'll want to implement getUser this way:
const getUser = (userId) => _(request(`https://www.myapi.com/users?id=${userId}`))
request returns a Node stream, not a Highland stream, so you need to wrap it in a Highland stream first. Then something like this should work:
app.get('/', (req, res) => {
_(users).flatMap(getUser).pipe(res);
})
or if you want to do the requests in parallel,
app.get('/', (req, res) => {
// Replace 2 with your desired parallelism factor
_(users).map(getUser).parallel(2).pipe(res);
})
I'm honestly not sure why you're getting the "already consumed" error. It exists because streams are single-use, but I don't see you reusing any streams in the code you posted.
Thank you, @vqvu!
Here's what I tried (it is the actual code):
const express = require('express')
const request = require('request')
const _ = require('highland')
const app = express()
const api = (id) => request(`https://www.myapi.com/users?id=${id}`)
const parseJSON = (buffer) => JSON.parse(buffer.toString())
const toBuffer = (obj) => {
const string = JSON.stringify(obj)
const buffer = Buffer.from(string) // new Buffer() is deprecated
return buffer
}
const fetchEntries = (data) => {
const { submitted } = data
return submitted.filter((e, i) => i < 3)
}
const filterTitles = (entries) => {
return _(entries).flatMap(api)
}
const filter = _.pipeline(
_.map(parseJSON),
_.map(fetchEntries),
_.map(filterTitles),
_.map(toBuffer)
)
app.get('/', async (req, res) => {
_(request('initialRequest')).pipe(filter).pipe(res)
})
app.listen(3000, () => console.log('http://localhost:3000'))
And the response:
{
"domain": null,
"_events": {},
"_eventsCount": 2,
"__HighlandStream__": true,
"id": "766078",
"paused": true,
"_incoming": [],
"_outgoing": [],
"_consumers": [],
"_observers": [],
"_destructors": [],
"_send_events": false,
"_nil_pushed": false,
"_delegate": null,
"_is_observer": false,
"_in_consume_cb": false,
"_repeat_resume": false,
"_consume_waiting_for_next": false,
"source": null,
"writable": true
}
That's not what I expected: I wanted the responses from the API.
There are a few things wrong here, so I'll start at the top.
As I mentioned, api should wrap the result of request in a Highland stream:
const api = (id) => _(request(`https://www.myapi.com/users?id=${id}`))
filterTitles doesn't seem to filter anything. Did you swap the implementations of filterTitles and fetchEntries?
Your filter pipeline doesn't make sense.
The input is a stream of Buffers, since that is what request provides. The response may arrive in multiple chunks (Buffers), and the way you have it now, each chunk is processed separately. Unless your initial response is small enough to fit in a single chunk, JSON.parse will throw an error, because you're passing it only partial JSON. Since you need the full response, you're better off using a library like request-promise for the initial request. The api function can still use the vanilla request library.
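Here is a small, dependency-free illustration of the chunking problem. The two Buffers are made-up data standing in for one JSON body split across chunks; the split point is arbitrary.

```javascript
// Hypothetical example: one JSON document split across two chunks,
// the way an HTTP response body can arrive.
const chunks = [Buffer.from('{"submitted": [1, 2'), Buffer.from(', 3]}')];

// Parsing chunk by chunk: neither piece is valid JSON on its own.
const parseEachChunk = (bufs) =>
  bufs.map((buf) => {
    try {
      return JSON.parse(buf.toString());
    } catch (err) {
      return err; // SyntaxError for a partial document
    }
  });

// Concatenating every chunk first, then parsing once, recovers the object.
const parseWhole = (bufs) => JSON.parse(Buffer.concat(bufs).toString());

console.log(parseEachChunk(chunks).map((r) => r instanceof SyntaxError)); // [ true, true ]
console.log(parseWhole(chunks).submitted); // [ 1, 2, 3 ]
```

If you'd rather stay inside Highland than switch to request-promise, `.collect().map(Buffer.concat)` gathers all chunks into a single Buffer before a later `.map(parseJSON)`.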
filterTitles returns a Highland stream, so when you use it with map, you get a stream of Highland streams. When you then map over that result, toBuffer is called with a Highland stream as its input. That's why you get that response: it's what you get if you JSON.stringify a Highland stream.
You probably want to use flatMap instead. It takes the result of filterTitles (a Highland stream) and "flattens" it, so you end up with a stream of Buffers.
You don't need to call toBuffer at the end of filter. You should already be getting Buffers from request, so just omit the toBuffer call.
To put it all together, your filter should probably look like this:
const filter = _.pipeline(
  // Input is a stream of JSON strings.
  _.map(parseJSON),        // After this, it's a stream of parsed JSON objects.
  _.map(filterTitles),     // After this, it's a stream of arrays of user ids.
  _.flatMap(fetchEntries)  // After this, it's a stream of Buffers.
  // No need to call toBuffer.
)
pipeline and pipe are meant for interop with Node streams. Don't use them to connect Highland streams to each other. Instead, you can implement filter as a plain function like this and use through:
const filter = (stream) => {
  return stream.map(parseJSON)
    .map(filterTitles)
    .flatMap(fetchEntries)
}
const requestPromise = require('request-promise')
app.get('/', (req, res) => {
  _(requestPromise('initialRequest')).through(filter).pipe(res)
})
I had to pass that data to a new Buffer; otherwise, I'd get TypeError: First argument must be a string or Buffer. I refactored the code and renamed some functions to make it clearer, but it still doesn't work.
const express = require('express')
const rp = require('request-promise')
const _ = require('highland')
const app = express()
const parseJSON = (buffer) => JSON.parse(buffer.toString())
const filterSubmissions = (data) => {
const { submitted } = data
return submitted.filter((e, i) => i < 3) // returns the first 3 entries
}
const filter = (stream) => stream.map(parseJSON).map(filterSubmissions)
const getUser = (id) => `https://hacker-news.firebaseio.com/v0/user/${id}.json?print=pretty`
app.get('/', (req, res) => {
_(rp(getUser('user'))).through(filter).pipe(res)
})
app.listen(3000, () => console.log('http://localhost:3000'))
The reason I said you didn't need toBuffer was that you were making the secondary call to myapi.com, which already returned Buffers. Now that you're no longer doing that, you need to add toBuffer (or an equivalent) back in.
Change filter to:
const filter = (stream) => stream.map(parseJSON).map(filterSubmissions).map(JSON.stringify);
When fetching the test user jl, the server outputs:
[14237416,11871616,11483492]
Hello, nice folks!
I have a function that makes a request and returns a stream.
I have an array of user IDs and I want to call getUser on each one of them:
*Most of the time I don't know how many users I will have in my array.
Now I want to create an array of streams:
What I am trying to do is iterate over requests and pipe each response to http.ServerResponse:
When I do this, I get an error:
Stream already being consumed, you must either fork() or observe()
What I tried is something like _(requests.fork()).pipe(res), but it obviously didn't work. It might be something silly that I'm missing, but if anyone could point out what I'm doing wrong or has any hint, I'd really appreciate it.
Thank you!