Closed rudzikdawid closed 3 years ago
You need to ensure that the JSON received is valid - I hit this when receiving an unquoted string for example.
You can test this by passing the data received to JSON.parse - this is what the underlying code will use
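As a quick check of the point above, here is a minimal sketch of validating a payload with JSON.parse before handing it on. The helper name `isValidJson` and the sample payloads are made up for illustration; they are not part of RxJS.

```typescript
// Hypothetical helper: reports whether a raw WebSocket payload is valid JSON.
function isValidJson(raw: string): boolean {
  try {
    JSON.parse(raw);
    return true;
  } catch {
    return false;
  }
}

// An unquoted string is not valid JSON; the quoted form is.
console.log(isValidJson('some text'));         // false
console.log(isValidJson('"some text"'));       // true
console.log(isValidJson('{"requestId":"1"}')); // true
```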
By default, webSocketSubject tries to deserialize each message by applying JSON.parse(). If your message is not a valid JSON string, that is going to fail (the error you are getting now). Now, when you tell webSocket to use the deserializer () => {}, you are telling it to just give you back an undefined value (since the function returns nothing).
Here you have two options,
Example:
Here you can apply whatever transformation you want. I am not applying any; I am just avoiding the default behaviour (JSON.parse()), since JSON.parse() attempts to transform a string into a JSON object and surely you are not expecting that type of data (a string coming from your server).
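import { webSocket } from 'rxjs/webSocket';

// Returning the message as-is bypasses the default JSON.parse() step.
const WebSocketSubject = webSocket({
    url: 'ws://localhost:3200',
    deserializer: msg => msg
});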
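If some frames are JSON and others are plain text, one possible middle ground is a deserializer that tries JSON.parse and falls back to the raw payload. This is only a sketch: `MessageLike` and `tolerantDeserializer` are hypothetical names, and the function is written as a plain function so it can be tried without opening a socket.

```typescript
// Minimal stand-in for the part of MessageEvent that a deserializer reads.
interface MessageLike {
  data: unknown;
}

// Hypothetical deserializer: parse JSON when possible, otherwise hand back
// the raw payload instead of throwing.
function tolerantDeserializer(event: MessageLike): unknown {
  if (typeof event.data !== "string") {
    return event.data; // binary frames (Blob, ArrayBuffer) pass through untouched
  }
  try {
    return JSON.parse(event.data);
  } catch {
    return event.data; // plain text such as an unquoted string
  }
}

console.log(tolerantDeserializer({ data: '{"ok":true}' })); // parsed object
console.log(tolerantDeserializer({ data: "some text" }));   // raw string
```

Assuming the config accepts any function of the message event, this could be passed as the deserializer in place of msg => msg.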
I can't use deserializer to resolve data from the server, which is a Blob, because it doesn't support async
What do you mean by "because it doesn't support async"?
Since you are sending chunks of data (from server to client), you can perfectly well make use of the deserializer property. Let's say you have:
Server.js
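const fs = require("fs");
// `ws` is assumed to be an already-open WebSocket connection to the client.
const src = fs.createReadStream(`${__dirname}/file.pdf`);
src.on("data", chunk => {
    ws.send(chunk);
});
src.on("end", () => {
    ws.send('end');
});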
Now, in your client you can:
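const chunks = [];
const WebSocketSubject = webSocket({
    url: "ws://localhost:3200",
    deserializer: msg => {
        if (msg.data === "end") {
            // All chunks have arrived: assemble them into one Blob and do whatever you want with it.
            // (A FileReader cannot start a new read while one is in progress, so the chunks are collected instead.)
            const pdf = new Blob(chunks, { type: "application/pdf" });
            chunks.length = 0;
            console.log(pdf);
        } else {
            chunks.push(msg.data);
        }
        // Placeholder value; every frame still has to produce something for subscribers.
        return { type: "error" };
    }
});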
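The chunk protocol described here (binary frames followed by an "end" text frame) can also be exercised without a browser or a socket. The sketch below is an assumption-laden model, not code from the thread: `createChunkAssembler` is a hypothetical helper and Uint8Array stands in for the Blob chunks.

```typescript
// Hypothetical helper: collects binary chunks until the "end" sentinel arrives.
function createChunkAssembler(): (data: Uint8Array | string) => Uint8Array | null {
  let chunks: Uint8Array[] = [];
  return (data) => {
    if (typeof data !== "string") {
      chunks.push(data);
      return null; // still waiting for more chunks
    }
    if (data !== "end") {
      return null; // unrelated text frames are ignored in this sketch
    }
    // Concatenate every chunk into one contiguous buffer.
    const total = chunks.reduce((sum, c) => sum + c.length, 0);
    const assembled = new Uint8Array(total);
    let offset = 0;
    for (const chunk of chunks) {
      assembled.set(chunk, offset);
      offset += chunk.length;
    }
    chunks = []; // ready for the next transfer
    return assembled;
  };
}

const assemble = createChunkAssembler();
assemble(new Uint8Array([1, 2]));
assemble(new Uint8Array([3]));
const assembledFile = assemble("end");
console.log(assembledFile); // a Uint8Array holding 1, 2, 3
```

Keeping the accumulation state in a closure means each socket can own its own assembler instead of sharing a module-level array.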
@luillyfe when using multiplex, I use
(message: any) => {
    return message.requestId === params.requestId
}
to filter data, so I have to use deserializer to resolve the data (which is a Blob) to text and then parse it to JSON. Now I can't get the filter to work because it receives a Promise.
All the code is here:
import { webSocket } from 'rxjs/webSocket'

const api = (options) => {
    const subject = webSocket(options);
    return {
        subscribe: (params: any) => {
            const observableA = subject.multiplex(
                () => params,
                () => ({ close: true }),
                (message: any) => {
                    return message.requestId === params.requestId
                }
            );
            return observableA
        }
    }
}

const ws = api({
    url: 'ws://127.0.0.1:50055/ws/v2',
    protocol: 'json',
    binaryType: 'blob',
    deserializer: ({ data }) => {
        return new Promise((res, rej) => {
            const reader = new FileReader()
            reader.readAsText(data)
            reader.onload = () => {
                res(JSON.parse(reader.result))
            }
        })
    }
})

ws.subscribe({requestId: '1', data:{}})
    .subscribe((res) => {
        console.log('res', res);
    })
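To see why the filter never matches in the code above, it may help to look at what the filter receives when the deserializer returns a Promise. The following sketch isolates just that comparison; `Reply` and `matches` are illustrative names and the request id "1" mirrors the subscribe call.

```typescript
interface Reply {
  requestId: string;
}

// What the multiplex filter effectively does with each deserialized value.
const matches = (message: any): boolean => message.requestId === "1";

// A deserializer that returns a Promise hands the filter the Promise itself...
const pending: Promise<Reply> = Promise.resolve({ requestId: "1" });
console.log(matches(pending)); // false: a Promise has no requestId property

// ...whereas a synchronously produced object passes.
const ready: Reply = { requestId: "1" };
console.log(matches(ready)); // true
```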
@iamcco thanks for sharing your code, that makes your problem clear. I think your issue https://github.com/ReactiveX/rxjs/issues/4270 is pretty interesting and I support your request. In the meantime you can use this as a workaround; let me know if it fits your requirements. I tried to keep using your code 😱.
import { Subject, concat } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

const blobSubject = new Subject(), reader = new FileReader();
const blobMessages$ = blobSubject.asObservable();
let chunks = [];

// Once the assembled Blob has been read, decode it and publish the parsed message.
reader.onloadend = event => {
    const response = new TextDecoder("utf-8").decode(event.target.result);
    const message = { type: "Blob", file: JSON.parse(response) };
    blobSubject.next(message);
};

const api = options => {
    const subject = webSocket(options);
    return {
        subscribe: (params: any) => {
            let observableA = subject.multiplex(
                () => params,
                () => ({ close: true }),
                (message: any) => {
                    return message.requestId === params.requestId
                }
            );
            observableA = concat(blobMessages$, observableA);
            return observableA
        }
    }
}

const ws = api({
    url: "ws://127.0.0.1:50055/ws/v2",
    protocol: 'json',
    binaryType: 'blob',
    deserializer: ({ data }) => {
        if (data === "end" && chunks.length > 0) {
            // End of transfer: read all collected chunks as one Blob.
            reader.readAsArrayBuffer(new Blob(chunks));
            chunks = [];
            return {type: "error"};
        } else {
            if (data && data.type === "") {
                // Binary chunk (a Blob with an empty MIME type): keep it for later.
                chunks.push(data);
            } else {
                return JSON.parse(data);
            }
        }
        return {type: "error"};
    }
});
Notice I am using the onloadend event instead of onload, since this event (onloadend) is triggered each time the reading operation is completed (either in success or failure). Then you can:
ws.subscribe({requestId: '1', data: {}})
    .subscribe(data => {
        console.log(data);
    });
I just created a branch here that you can play with: https://github.com/luillyfe/rxjs-websockets/tree/features/blob-type-support
@luillyfe thanks for your help. I checked your branch, but I can't get it to work.
It seems that this line doesn't work:
https://github.com/luillyfe/rxjs-websockets/blob/features/blob-type-support/src/app.js#L89
binary will not be subscribed to before blobMessages$ completes, so it will not send a message to the server.
@iamcco In your case it does not work because you need to replace process.env.API_KEY with your own API_KEY. (Here I am using .env variables to keep my API_KEY secret.)
bin/www
const dataSourceUrl = (topic = "Google", page = 1) => "https://newsapi.org/v2/everything?" +
    "q=" + topic + "&" +
    "from=2018-10-13&" +
    "sortBy=popularity&" +
    "page=" + page + "&" +
    "apiKey=" + process.env.API_KEY;
I am using this site as a source: https://newsapi.org/. Just request an API_KEY and replace it; then it should work.
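As a side note, the query string could also be assembled with URLSearchParams, which takes care of encoding the topic. This is a sketch rather than the code in the branch: `buildNewsUrl` is a hypothetical name, and the API key is passed in as an argument instead of being read from process.env so the snippet has no environment dependency.

```typescript
// Hypothetical alternative to dataSourceUrl; parameter names follow the original query string.
function buildNewsUrl(apiKey: string, topic = "Google", page = 1): string {
  const params = new URLSearchParams({
    q: topic,
    from: "2018-10-13",
    sortBy: "popularity",
    page: String(page),
    apiKey,
  });
  return `https://newsapi.org/v2/everything?${params.toString()}`;
}

// "YOUR_API_KEY" is a placeholder; topics with spaces are encoded automatically.
console.log(buildNewsUrl("YOUR_API_KEY", "rxjs websocket"));
```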
@luillyfe I have requested an API_KEY and tested it, but binary doesn't get called:
https://github.com/luillyfe/rxjs-websockets/blob/features/blob-type-support/src/app.js#L70
The news received from the server comes through the chat subscription, is resolved by deserializer, and is emitted by blobMessages$.
@iamcco what error are you getting now? I mean, what output do you have in the console? As far as I can see it is working as expected; it should fill the card with news every 25 seconds.
In any case I am about to update the branch, because there is no point at all in using concat and multiplexing. However, you can "mimic" this behaviour:
const news$ = blobSubject.asObservable().pipe(
    filter(message => message.type === "Blob"),
    map(message => {
        if (message.articles && message.articles.length > 0) {
            return message.articles;
        }
        return { message: "No data!" };
    })
);
Let me know how this works for you. 🤪
@luillyfe it works in this way, thanks.
Actually there is no point at all in creating a new Subject. Just create an Observer in advance; it will let you use the next method to send messages to the Observable.
const obs = {
    next(articles) {
        if (articles && articles.length >= 3) {
            const carusellActive = [...Array.from(document.querySelector(".carousel-item.active").children)];
            carusellActive.forEach((card, index) => {
                const {urlToImage, author, content, url} = articles[index];
                setCardInfo(card, {urlToImage, author, content, url});
            });
        } else {
            console.log("It looks like there is no more content related to this topic. We switched the topic.");
            setTopic();
        }
    },
    error(error) { console.log(error); },
    complete() { console.log("This is over!"); }
};
I have already updated the branch: https://github.com/luillyfe/rxjs-websockets/tree/features/blob-type-support. Any improvements are welcome.
Even better, just use fromEvent to listen for changes on reader.
const news$ = fromEvent(reader, "loadend").pipe(
    map(event => {
        const response = new TextDecoder("utf-8").decode(event.target.result);
        return JSON.parse(response).articles;
    })
);
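The decoding step inside that map can be tried on its own, without a FileReader. The sketch below assumes the payload is UTF-8 encoded JSON with an articles array; the `NewsPayload` shape, `decodeArticles`, and the sample data are illustrative guesses, not the real newsapi.org schema.

```typescript
// Hypothetical payload shape, inferred from the `.articles` access above.
interface NewsPayload {
  articles: { author: string; url: string }[];
}

// Decode the bytes a FileReader would expose as `event.target.result`.
function decodeArticles(buffer: ArrayBuffer | Uint8Array): NewsPayload["articles"] {
  const text = new TextDecoder("utf-8").decode(buffer);
  return (JSON.parse(text) as NewsPayload).articles;
}

// Stand-in for a binary frame: the UTF-8 bytes of a JSON document.
const sampleBytes = new TextEncoder().encode(
  JSON.stringify({ articles: [{ author: "someone", url: "https://example.com" }] })
);
const decodedArticles = decodeArticles(sampleBytes);
console.log(decodedArticles.length); // 1
```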
This seems like it was just a debugging session. Closing.
Bug Report
Current Behavior
Using a string urlConfigOrSource generates SyntaxError: Unexpected token s in JSON at position 0.
With WebSocketSubjectConfig and a defined deserializer function, the error does not appear.
Reproduction
throws error
to fix this we need to use:
after that, this.pubsubSubject.subscribe() doesn't throw an error
Environment