Open frank-dspeed opened 3 years ago
const readUntilClose = stream => {
let done = false;
stream.on('close', () => done = true);
return () => {
const value = stream.read();
return Promise.resolve({ value, done });
}
}
while (true) {
const { value, done } = await readUntilClose();
if (done) { break; }
}
async function getReader() {
port = await navigator.serial.requestPort({});
await port.open({ baudrate: 9600 });
connectButton.innerText = '🔌 Disconnect';
document.querySelector('figure').classList.remove('fadeOut');
document.querySelector('figure').classList.add('bounceIn');
const appendStream = new WritableStream({
write(chunk) {
lineBuffer += chunk;
let lines = lineBuffer.split('\n');
if (lines.length > 1) {
lineBuffer = lines.pop();
latestValue = parseInt(lines.pop().trim());
}
}
});
port.readable
.pipeThrough(new TextDecoderStream())
.pipeTo(appendStream);
}
some new ideas and iterations
const getReader = () => {
const deferedPromise = { resolve: () => { /** NoOp */ } };
const write = data => deferedPromise.resolve(data);
const read = () => {
const currentTime = new Date();
return Promise.race([
new Promise( resolve => defredPromise.resolve = resolve ),
new Promise( resolve =>
setTimeout(() => resolve(
Promise.reject(
`Timeout: after ${currentTime - new Date()} ms`)
), 500)
)
]);
}
return { read, write }
}
const onData = new WritableStream({ write });
const emitterPromises = new WeakMap();
const EventEmitterToPromise = (emitter,event) => {
const promises = emitterPromises.get(emitter) || {};
if (!promises[`${event}Promise`]) {
emitterPromises.set(emitter, {
[`${event}Promise`]: new Promise( resolve => emitter.on(event,resolve) )
});
}
return promises[`${event}Promise`]
}
EventEmitterToPromise(stream,'close')
EventEmitterToPromise(stream,'drain')
EventEmitterToPromise(stream,'open')
const EventEmitter = port => {
//need to add nodeJS like backpresure
/**
* This can be archived via a custom writeable
*/
/** return pattern! */
port.read().then(function processPayload({ done, value }) {
if (done) { return; }
return reader.read().then(processPayload);
});
const listener = {}
const on = (event, cb) => {
//if event=== 'data' writeStream
//if event=== 'readable' read()
(!listener[event]) && listener[event] = [];
listener[event].push(cb);
}
const emit = (event, data) => {
const cbs = listener[event]
cbs && cbs.forEach(cb => cb(data));
}
const open = () => port.open().then(() => emit('open'));
const writer = port.writable.getWriter()
writer.closed().then(() => emit('close'))
writer.closed().then(() => emit('end'))
const close = () => port.close().then(() => emit('close'));
let drain = false;
/**
* Nodejs Streams write(data,encoding) => true,false based on ready
*/
const write = (data,encoding )=>{
drain = false;
writer.ready()
.then(()=>writer.write(data)
.then(() => drain=true)
.then(() => emit('drain')))
.catch(() => drain = false);
// Trivial Pump should be done with writer.desiredSize
return drain;
}
// draining
//stream.once('drain',()=>'dd')
const once = (event,fn) => {
if (event === 'drain') {
writer.ready().then(fn)
}
if (event === 'close'){
writer.closed().then(fn)
}
}
const nodejs = {
pipeNodeJS: function(dest, options) {
function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk) && stream.pause) {
stream.pause();
}
}
}
stream.on('data', ondata);
function ondrain() {
if (stream.readable && stream.resume) {
stream.resume();
}
}
dest.on('drain', ondrain);
// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {
stream.on('end', onend);
stream.on('close', onclose);
}
var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
dest.end();
}
function onclose() {
if (didOnEnd) return;
didOnEnd = true;
if (typeof dest.destroy === 'function') dest.destroy();
}
// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (!this.hasListeners('error')) {
throw er; // Unhandled stream error in pipe.
}
}
stream.on('error', onerror);
dest.on('error', onerror);
// remove all the event listeners that were added.
function cleanup() {
stream.off('data', ondata);
dest.off('drain', ondrain);
stream.off('end', onend);
stream.off('close', onclose);
stream.off('error', onerror);
dest.off('error', onerror);
stream.off('end', cleanup);
stream.off('close', cleanup);
dest.off('end', cleanup);
dest.off('close', cleanup);
}
stream.on('end', cleanup);
stream.on('close', cleanup);
dest.on('end', cleanup);
dest.on('close', cleanup);
dest.emit('pipe', source);
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest;
},
pipeFrom(webStream,nodeStream) {
},
pipeTo(nodeStream,webStream) {
}
}
return {listener, on, emit, ...port, open, close }
}
// Read data that is available but keep the stream in "paused mode"
port.on('readable', function () {
console.log('Data:', port.read())
})
// Switches the port into "flowing mode"
port.on('data', function (data) {
console.log('Data:', data)
})
// Pipe the data into another stream (like a parser or standard out)
const lineStream = port.pipe(new Readline())
port.write('Hi Mom!')
port.write(Buffer.from('Hi Mom!'))
SerialPort.open()
CCTalk Loop
All your devices need to send a simplePoll on regular bases this can be used to read the Answers
so you need a sequence of
Promise method
write should return a promise that resolves once data is there