Closed icebob closed 6 years ago
That will be realy interresting ! Since actually I don't find no way to exchange stream between two services .
Would this also allow to send stream as parameter? That would be very cool!
Unfortunately not. It is just for responses. I'm thinking on it how I can add it to request side too. It is more complicated because on request side the node is selected dynamically (in addition in case of disableBalancer
there is no node selection). But in response side, the target node is known.
It will support request streams too. So you can send stream as ctx.params
and you can return a stream.
Example
Here is a sample code which is working. It creates an aes
service which has encrypt
and decrypt
action.
With the client
broker I open a file, call aes.encrypt
, the response (which is a stream too) is sent to aes.decrypt
. And this response (which is a stream again) is saved to a file and checked the result with SHA.
So it sends streams on both sides, multiple times.
"use strict";
const ServiceBroker = require("../src/service-broker");
const fs = require("fs");
const path = require("path");
const crypto = require("crypto");
const password = "moleculer";
// Create broker #1
const broker1 = new ServiceBroker({
nodeID: "client-" + process.pid,
transporter: "NATS",
logger: console,
logLevel: "info"
});
// Create broker #2
const broker2 = new ServiceBroker({
nodeID: "encrypter-" + process.pid,
transporter: "NATS",
logger: console,
logLevel: "info"
});
broker2.createService({
name: "aes",
actions: {
encrypt(ctx) {
const encrypt = crypto.createCipher("aes-256-ctr", password);
return ctx.params.pipe(encrypt);
},
decrypt(ctx) {
const decrypt = crypto.createDecipher("aes-256-ctr", password);
return ctx.params.pipe(decrypt );
}
}
});
broker1.Promise.all([broker1.start(), broker2.start()])
.delay(2000)
.then(() => {
broker1.repl();
const fileName = "d://src.zip";
const fileName2 = "d://received-src.zip";
return getSHA(fileName).then(hash1 => {
broker1.logger.info("Original SHA:", hash1);
const stream = fs.createReadStream(fileName);
broker1.call("aes.encrypt", stream)
.then(stream => broker1.call("aes.decrypt", stream))
.then(stream => {
const s = fs.createWriteStream(fileName2);
stream.pipe(s);
s.on("close", () => getSHA(fileName2).then(hash => broker1.logger.info("Received SHA:", hash)));
});
});
});
function getSHA(fileName) {
return new Promise((resolve, reject) => {
let hash = crypto.createHash("sha1");
let stream = fs.createReadStream(fileName);
stream.on("error", err => reject(err));
stream.on("data", chunk => hash.update(chunk));
stream.on("end", () => resolve(hash.digest("hex")));
});
}
I love this! When is 0.13 due?
Pff, I don't know yet. ~this month second half.
I've written streaming tests on both (req/res) side. But I realized, it is not too efficient in transfer because if a chunk size is 65k it sends 230k (stringify the buffer). So in the future maybe I need to change the protocol again... :(
FWIW:
I’m making making some tests using https://github.com/dcodeIO/protobuf.js as protobuf serializer to actually do some jobs using streams on my projects, but primary because is very fast, I’m get good results for now. I’m also making a websocket transport layer to make real time communication direct to the client side. When is done a will notify.
Merged. It will be released in v0.13.
Would be good that Moleculer supports streams in action requests & responses. It means you can return a
Stream
object in actions and Broker transfers the stream chunks while the stream is not closed. You can use it to send video or audio streams back to the caller or to API gateway. Or you can pass aStream
asparams
inbroker.call
. Broker transfers the stream chunks while the stream is not closed. With both of them, you can create encrypt, compress, ...etc services.Required changes on request side:
stream: boolean
field to REQUEST packet._sendRequest
in transit to handle streams_requestHandler
in transit to handle streamsRequired changes on response side:
stream: boolean
field to RESPONSE packet._responseHandler
in transit to handle streamssendResponse
in transit to handle streams