sclausen / ngx-mqtt

This library isn't just a wrapper around MQTT.js for angular. It uses observables and takes care of subscription handling and message routing.
https://sclausen.github.io/ngx-mqtt/
MIT License
184 stars 82 forks source link

Issue while getting Value not ack'd #222

Open th3112 opened 1 year ago

th3112 commented 1 year ago

Hi, I have a short Async procedure to get a value from ioBroker:

public async GetVal(topicname: string): Promise<string> {
    const source$: Observable<string> = this._mqttClt.observe(topicname, {qos: 1}).pipe(
      map((msg) => msg.payload.toString()),
      take(1)
    );
    let val = await lastValueFrom(source$);
    this.logMsg('GetVal >' + topicname + '< Value: >' + val + '<');
    return val;
  }

Unfortunately this only works if the Values has been set with Acknowledge. Otherwise this procedure will return noting.

Is there a way to get this also working for values that has been set without ACK? Or am I totally wrong and this not the right way?

Best regards Thomas

sclausen commented 1 year ago

Could you please add a minimal working example as asked for in the issue template?

th3112 commented 1 year ago

Hi,

I hope this can clearify the problem. This example defines a service (see iob-conn-svc.service.ts) that is used to connect to the iObroker instance.

The main app.component.ts provides a simple HTML-Page that shows the connection status and provide the function to get a state-Value from iObroker.

The default Object is hue-extended/0/lights/001-wohn_vorn/action/on.

If this state has been set within iObroker with flag "ACK" then this example works as expected when the Button "Hole Wert" is clicked. But if the state has been set without "ACK" a click on the Button returns nothing and the function GetVal seems to wait forever.

Unfortunately not all states are written with "ACK"-flag and therefore I cannot read that values with my function GetVal.

I hope this explanation can help to describe my problem. Maybe I have to write the function GetVal in an other way?

Thank you Thomas

app.components.ts:

`import { Component } from '@angular/core'; import { IobConnSvc } from './iob-conn-svc.service'; import { IiobObject } from './iob-object';

@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.scss'] }) export class AppComponent { title = 'my1stApp'; cltOnline: boolean; public obj: IiobObject = { topicname: 'hue-extended/0/lights/001-wohn_vorn/action/on', val: 'n/a' };

constructor(public iobConn: IobConnSvc) { this.Connect(); this.iobConn.state$.subscribe((stat: boolean) => { this.cltOnline = stat; }); }

Connect(): void { this.iobConn.connect(); }

Disconnect(): void { this.iobConn.disconnect(true); }

GetVal(obj: IiobObject): void { this.iobConn.GetVal(obj.topicname).then((val) => { obj.val = val; }); }

SetVal(obj: IiobObject): void { this.iobConn.sendVal(obj); } } `

app.components.html: `

{{title}}

<ng-template *ngIf="cltOnline ;then showBlock; else notShowBlock"> <ng-template #showBlock>

Verbindungs-Status:

<ng-template #notShowBlock>

Verbindungs-Status:

Token Wert

`

iob-conn-svc.service.ts:

`// MQTT-Client // publiziert Status und bietet Connect/Disconnect // subscribed zu den angeforderten Objekten und fuehrt eine Liste // Objekte, die subscribed sind werden publiziert // Anmerkung: Shelly's haben einen Bug. deswegen muss zwingend zum Setzen // von werten '/set' an den Topic-Namen angehaengt werden und im MQTT-Server // der Haken bei 'unterschiedliche Topic-Namen für setzen und lesen nutzen' gesetzt sein

import { Injectable, OnDestroy } from '@angular/core'; import { MqttConnectionState, MqttService, IMqttServiceOptions } from 'ngx-mqtt'; import { Subject, BehaviorSubject, Observable, Observer, Subscription, pipe, lastValueFrom } from 'rxjs'; import { map, subscribeOn, take, takeLast } from 'rxjs/operators'; import { IiobObject } from './iob-object';

@Injectable({ providedIn: 'root' }) export class IobConnSvc implements OnDestroy { private readonly onDestroy = new Subject(); private _isConnected: boolean; private stateSubject = new BehaviorSubject(false); private getterDataSubject = new BehaviorSubject(0); private _connStat: Subscription; private _subscribers: any[] = [];

public state$ = this.stateSubject.asObservable(); public getterData$ = this.getterDataSubject.asObservable();

constructor(private _mqttClt: MqttService) { this._mqttClt = _mqttClt;

}

init() { this._mqttClt.connect(); this._connStat = this._mqttClt.state.subscribe((errmsg: MqttConnectionState) => { if(MqttConnectionState[errmsg] === 'CONNECTED') { this._isConnected = true; } else { this._isConnected = false; } this.logMsg('MQTTClient Status ' + MqttConnectionState[errmsg]); this.stateSubject.next(this._isConnected); }); };

get isConnected() { return this._isConnected; }

public connect() { if (!this._isConnected) { this.logMsg('Verbindung herstellen...'); this.init(); } else { this.logMsg('Client ist bereits verbunden!'); } }

public disconnect(unsub: boolean) { if(this._isConnected) { this.logMsg('Verbindung trennen...'); if (unsub === true) { this._subscribers.forEach(function(obj) { obj.sub.unsubscribe(); }); this._subscribers = []; } this.stateSubject.next(false); this._connStat.unsubscribe(); this._mqttClt.disconnect(); this._isConnected = false; this.logMsg('Verbindung getrennt.'); } else { this.logMsg('Verbindung ist bereits getrennt!'); } }

public addSubscriber(state: string) { //wenn noch noch nicht verbunden -- Connect if (!this._isConnected) { this.connect(); } //wenn schon subscribed Zaehler hochzaehlen let alreadySubscribed: boolean = false; this._subscribers.map((x, index) => { if (x.id === state) { alreadySubscribed = true; } }); if (!alreadySubscribed) { let sub: Subscription = this._mqttClt.observeRetained(state).subscribe((message) => { this.logMsg('Topic: ' + state +': ' + message.payload.toString()); this.getterDataSubject.next({id: state, val: message.payload}); }); this._subscribers.push({id: state, sub: sub}); } }

public delSubscriber(state: string) { this._subscribers.map((x, index) => { if (x.id === state) { x.sub.unsubscribe(); this._subscribers.splice(index, 1); } }); }

public async GetVal(topicname: string): Promise { const source$: Observable = this._mqttClt.observe(topicname, {qos: 1}).pipe( map((msg) => msg.payload.toString()), take(1) ); let val = await lastValueFrom(source$); this.logMsg('GetVal >' + topicname + '< Value: >' + val + '<'); return val; }

public sendVal(obj: IiobObject): void { if(this._isConnected) { //this._mqttClt.unsafePublish(state, val, {qos: 1, retain: true}); //this._mqttClt.unsafePublish(state + '/set', val, {qos: 1, retain: true}); let obs: Observer = { next: x => { this.logMsg('SendVal >' + obj.topicname + '< Value: >' + obj.val + '<'); }, error: err => { this.logMsg('Fehler bei Value setzen!' + err.toString()); }, complete: () => { this.logMsg('Value erfolgreich gesetzt.'); } }; this._mqttClt.publish(obj.topicname + '/set', obj.val, {qos: 1, retain: true}).subscribe(obs); } else { this.logMsg('Value nicht gesetzt, da keine Verbindung!'); } }

public ngOnDestroy() { this.onDestroy.next(); this.onDestroy.complete(); this.disconnect(true); }

private logMsg(msg: string) { let date = new Date(); console.log(date.toISOString() + ':' + msg); } } `

iob.objects.ts: export interface IiobObject { topicname: string; val?: string | undefined; }

sclausen commented 1 year ago

Hello Thomas, could you please put this in a runnable project?

From the issue template:

[…] https://codesandbox.io/ or in a separated github repository

PS: with triple backticks and the language

``typescript class Foo {} \``
you can format your code in github markdown

th3112 commented 1 year ago

Sorry, next try...

app.components.ts:

import { Component } from '@angular/core';
import { IobConnSvc } from './iob-conn-svc.service';
import { IiobObject } from './iob-object';

https://github.com/component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.scss']
})
export class AppComponent {
title = 'my1stApp';
cltOnline: boolean;
public obj: IiobObject = { topicname: 'hue-extended/0/lights/001-wohn_vorn/action/on', val: 'n/a' };

constructor(public iobConn: IobConnSvc) {
this.Connect();
this.iobConn.state$.subscribe((stat: boolean) => {
this.cltOnline = stat;
});
}

Connect(): void {
this.iobConn.connect();
}

Disconnect(): void {
this.iobConn.disconnect(true);
}

GetVal(obj: IiobObject): void {
this.iobConn.GetVal(obj.topicname).then((val) => {
obj.val = val;
});
}

SetVal(obj: IiobObject): void {
this.iobConn.sendVal(obj);
}
}

iob-conn-svc.service.ts:

// MQTT-Client
// publiziert Status und bietet Connect/Disconnect
// subscribed zu den angeforderten Objekten und fuehrt eine Liste
// Objekte, die subscribed sind werden publiziert
// Anmerkung: Shelly's haben einen Bug. deswegen muss zwingend zum Setzen
// von werten '/set' an den Topic-Namen angehaengt werden und im MQTT-Server
// der Haken bei 'unterschiedliche Topic-Namen für setzen und lesen nutzen' gesetzt sein

import { Injectable, OnDestroy } from '@angular/core';
import { MqttConnectionState, MqttService, IMqttServiceOptions } from 'ngx-mqtt';
import { Subject, BehaviorSubject, Observable, Observer, Subscription, pipe, lastValueFrom } from 'rxjs';
import { map, subscribeOn, take, takeLast } from 'rxjs/operators';
import { IiobObject } from './iob-object';

@Injectable({
providedIn: 'root'
})
export class IobConnSvc implements OnDestroy {
private readonly onDestroy = new Subject();
private _isConnected: boolean;
private stateSubject = new BehaviorSubject(false);
private getterDataSubject = new BehaviorSubject(0);
private _connStat: Subscription;
private _subscribers: any[] = [];

public state$ = this.stateSubject.asObservable();
public getterData$ = this.getterDataSubject.asObservable();

constructor(private _mqttClt: MqttService) {
this._mqttClt = _mqttClt;

}

init() {
this._mqttClt.connect();
this._connStat = this._mqttClt.state.subscribe((errmsg: MqttConnectionState) => {
if(MqttConnectionState[errmsg] === 'CONNECTED') {
this._isConnected = true;
} else {
this._isConnected = false;
}
this.logMsg('MQTTClient Status ' + MqttConnectionState[errmsg]);
this.stateSubject.next(this._isConnected);
});
};

get isConnected() {
return this._isConnected;
}

public connect() {
if (!this._isConnected) {
this.logMsg('Verbindung herstellen...');
this.init();
} else {
this.logMsg('Client ist bereits verbunden!');
}
}

public disconnect(unsub: boolean) {
if(this._isConnected) {
this.logMsg('Verbindung trennen...');
if (unsub === true) {
this._subscribers.forEach(function(obj) {
obj.sub.unsubscribe();
});
this._subscribers = [];
}
this.stateSubject.next(false);
this._connStat.unsubscribe();
this._mqttClt.disconnect();
this._isConnected = false;
this.logMsg('Verbindung getrennt.');
} else {
this.logMsg('Verbindung ist bereits getrennt!');
}
}

public addSubscriber(state: string) {
//wenn noch noch nicht verbunden -- Connect
if (!this._isConnected) {
this.connect();
}
//wenn schon subscribed Zaehler hochzaehlen
let alreadySubscribed: boolean = false;
this._subscribers.map((x, index) => {
if (x.id === state) {
alreadySubscribed = true;
}
});
if (!alreadySubscribed) {
let sub: Subscription = this._mqttClt.observeRetained(state).subscribe((message) => {
this.logMsg('Topic: ' + state +': ' + message.payload.toString());
this.getterDataSubject.next({id: state, val: message.payload});
});
this._subscribers.push({id: state, sub: sub});
}
}

public delSubscriber(state: string) {
this._subscribers.map((x, index) => {
if (x.id === state) {
x.sub.unsubscribe();
this._subscribers.splice(index, 1);
}
});
}

public async GetVal(topicname: string): Promise {
const source$: Observable = this._mqttClt.observe(topicname, {qos: 1}).pipe(
map((msg) => msg.payload.toString()),
take(1)
);
let val = await lastValueFrom(source$);
this.logMsg('GetVal >' + topicname + '< Value: >' + val + '<');
return val;
}

public sendVal(obj: IiobObject): void {
if(this._isConnected) {
//this._mqttClt.unsafePublish(state, val, {qos: 1, retain: true});
//this._mqttClt.unsafePublish(state + '/set', val, {qos: 1, retain: true});
let obs: Observer = {
next: x => {
this.logMsg('SendVal >' + obj.topicname + '< Value: >' + obj.val + '<');
},
error: err => {
this.logMsg('Fehler bei Value setzen!' + err.toString());
},
complete: () => {
this.logMsg('Value erfolgreich gesetzt.');
}
};
this._mqttClt.publish(obj.topicname + '/set', obj.val, {qos: 1, retain: true}).subscribe(obs);
} else {
this.logMsg('Value nicht gesetzt, da keine Verbindung!');
}
}

public ngOnDestroy() {
this.onDestroy.next();
this.onDestroy.complete();
this.disconnect(true);
}

private logMsg(msg: string) {
let date = new Date();
console.log(date.toISOString() + ':' + msg);
}
}

iob-objects.ts:

export interface IiobObject {
    topicname: string;
    val?: string | undefined;
}

app.component.html:

<h1 class="th-title">{{title}}</h1>

<ng-template
  *ngIf="cltOnline ;then showBlock; else notShowBlock">
</ng-template>
<ng-template #showBlock>
  <div>
    <table>
      <tr>
        <td class="th-subtitle">Verbindungs-Status:</td>
        <td><button mat-icon-button color="primary" aria-label="Connection Status" matTooltip="Verbindungs-Status" (click)="Disconnect()">
          <mat-icon>done</mat-icon>
        </button></td>
      </tr>
    </table>
  </div>
</ng-template>

<ng-template #notShowBlock>
  <div>
    <table>
      <tr>
        <td>Verbindungs-Status:</td>
        <td><button mat-icon-button color="warn" aria-label="Connection Status" matTooltip="Verbindungs-Status" (click)="Connect()">
          <mat-icon>close</mat-icon>
        </button></td>
      </tr>
    </table>
  </div>
</ng-template>

<table>
  <tr>
    <td><mat-form-field appearance="outline" style="width:100px;font-size:small;"><mat-label>Token</mat-label><input matInput [(ngModel)]="obj.topicname" type="text" /></mat-form-field></td>
    <td><mat-form-field appearance="outline" style="width:100px;font-size:small;"><mat-label>Wert</mat-label><input matInput [(ngModel)]="obj.val" type="text" /></mat-form-field></td>
    <td><button mat-stroked-button type="button" style="background-color:lightgrey;" (click)="GetVal(obj)">Hole Wert</button></td>
  </tr>
</table>
sclausen commented 1 year ago

Oh man, I'm very sorry, but the essential thing is still missing.

could you please put this in a runnable project? Doing this by myself not only leads to a source of errors, which you don't have so it decreases reproducability of your issue and furthermore, it consumes time on my side so, please help me helping you 😉

From the issue template:

[…] https://codesandbox.io/ or in a separated github repository