moleculerjs / moleculer-web

:earth_africa: Official API Gateway service for Moleculer framework
http://moleculer.services/docs/moleculer-web.html
MIT License
291 stars 118 forks source link

Piping into the response from an action to socket.io #250

Closed rishighan closed 3 years ago

rishighan commented 3 years ago

I have a socket.io setup, where I am emitting an event from a Moleculer service and the express client app responds to it. My action creates a stream from the JSON and I want to pipe it into the response object.

Currently I am just returning the stream. My question is: Do I have access to the response object from my action that I can pipe the stream into? If not, what are my options?

My action in import.service.ts

getComicCovers: {
    rest: "POST /getComicCovers",
    params: {
        extractionOptions: "object",
        walkedFolders: "array",
    },
    async handler(
        ctx: Context < {
            extractionOptions: IExtractionOptions;
            walkedFolders: IFolderData[];
        } >
    ) {
        switch (
            ctx.params.extractionOptions.extractionMode
        ) {
            case "bulk":
                map(
                    ctx.params.walkedFolders,
                    async (folder, idx) => {
                        let foo = await extractArchive(
                            ctx.params
                            .extractionOptions,
                            folder
                        );
                       // THIS CREATES A STREAM (https://github.com/Faleij/json-stream-stringify)
                        let jsonStream =
                            new JsonStreamStringify({
                                foo,
                            });
                        return jsonStream; // HOW CAN I DO res.pipe(jsonStream) ????
                    }
                );

            case "single":
                return await extractArchive(
                    ctx.params.extractionOptions,
                    ctx.params.walkedFolders[0]
                );
            default:
                console.log(
                    "Unknown extraction mode selected."
                );
                return {
                    message:
                        "Unknown extraction mode selected.",
                        errorCode: "90",
                        data: `${ctx.params.extractionOptions}`,
                };
        }
    },
},

My api.service.ts

started(): any {
        // Create a Socket.IO instance, passing it our server
        this.io = IO.listen(this.server);

        // Add a connect listener
        this.io.on("connection", (client) => {
            this.logger.info("Client connected via websocket!");

            client.on("call", ({
                action,
                params,
                opts
            }, done) => {
                this.logger.info(
                    "Received request from client! Action:",
                    action,
                    ", Params:",
                    params
                );
                const stream = ss.createStream();
                this.broker
                    .call("import." + action, params, opts)
                    .then((resp) => {
                        ss(client).emit(
                            "comicBookCoverMetadata",
                            stream,
                            resp // <----------- THIS NEEDS TO BE A STREAM!!!!!
                        );
                    })
                    .catch((err) => this.logger.error(err));
            });

            client.on("disconnect", () => {
                this.logger.info("Client disconnected");
            });
        });
icebob commented 3 years ago

You can't access res in services because it's non-serializable only in API gateway. To return a stream, just return it in the service. but I don't know how it should transfer with socket.io.

rishighan commented 3 years ago

So I made a change, I created a custom function in an alias in my API gateway. I have access to the res object here. The question is how to I call it with broker.call ?

aliases: {
    async "POST getComicCovers"(req, res) {
        const {
            extractionOptions,
            walkedFolders
        } =
        req.body;

        switch (extractionOptions.extractionMode) {
            case "bulk":
                map(
                    walkedFolders,
                    async (folder, idx) => {
                        let foo = await extractArchive(
                            extractionOptions,
                            folder
                        );
                        // console.log("levar", foo);
                        let jsonStream =
                            new JsonStreamStringify({
                                foo,
                            });
                        jsonStream.pipe(res);
                    }
                );

            case "single":
                return await extractArchive(
                    extractionOptions,
                    walkedFolders[0]
                );
            default:
                console.log(
                    "Unknown extraction mode selected."
                );
                return {
                    message:
                        "Unknown extraction mode selected.",
                        errorCode: "90",
                        data: `${extractionOptions}`,
                };
        }
    },
},