TheThingsNetwork / lorawan-stack

The Things Stack, an Open Source LoRaWAN Network Server
https://www.thethingsindustries.com/stack/
Apache License 2.0

Event Pipelines Registry #4936

Closed by htdvisser 2 years ago

htdvisser commented 2 years ago

Summary

I propose to add an "Event Pipelines Registry" to the Identity Server.

The goal of the "Event Pipelines Registry" is to provide a central API and storage model for configuring event pipelines at different scopes: Entity (Application/Gateway to start with), Organization, and User. Organizations and users could then configure event pipelines for all of their applications/gateways at once, instead of having to configure them per entity (application/gateway), on different servers (GS/NS/AS/JS), in different clusters.

Additionally, the API, the storage, and the (web) UI should be flexible enough to support "plugins" similar to what I described in https://github.com/TheThingsNetwork/lorawan-stack/issues/4935. This would allow (enterprise) plugins to provide event pipeline types that the "Event Server" understands, together with a (web) UI for each type that the Console can load (dynamically).

How do you propose to implement this?

The API for a Pipeline will be as follows:

message GatewayEventPipeline {
  string pipeline_id = 1; // Generated by the server.
  string pipeline_type = 2;
  google.protobuf.Timestamp created_at = 3;
  google.protobuf.Any configuration = 4;
}

The pipeline_type is the name of the "plugin" that provides the pipeline. The Event Server uses this name to load the plugin, which is then configured with the contents of the configuration field. The Console uses the same name to load the web UI for configuring the pipeline.
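To illustrate how an Event Server might resolve pipeline_type to a plugin, here is a minimal Go sketch. Everything in it is hypothetical: the Pipeline interface, the Factory type, and the Registry are names made up for this example, and raw bytes stand in for the google.protobuf.Any configuration. It is not the proposed implementation, only one way such a lookup could work.

```go
package main

import (
	"errors"
	"fmt"
)

// Pipeline is a hypothetical interface for a configured event pipeline.
type Pipeline interface {
	Handle(event string) string
}

// Factory builds a Pipeline from its configuration. The proposal uses a
// google.protobuf.Any here; raw bytes stand in for it in this sketch.
type Factory func(config []byte) (Pipeline, error)

// ErrUnknownType is returned when no plugin is registered for a pipeline_type.
var ErrUnknownType = errors.New("unknown pipeline type")

// Registry maps pipeline_type names to the factories registered by plugins.
type Registry struct {
	factories map[string]Factory
}

func NewRegistry() *Registry {
	return &Registry{factories: make(map[string]Factory)}
}

// Register makes a plugin available under the given pipeline_type.
func (r *Registry) Register(pipelineType string, f Factory) {
	r.factories[pipelineType] = f
}

// Load looks up the plugin by pipeline_type and configures it.
func (r *Registry) Load(pipelineType string, config []byte) (Pipeline, error) {
	f, ok := r.factories[pipelineType]
	if !ok {
		return nil, fmt.Errorf("%w: %q", ErrUnknownType, pipelineType)
	}
	return f(config)
}

// prefixPipeline is a toy plugin that prefixes every event with its config.
type prefixPipeline struct{ prefix string }

func (p prefixPipeline) Handle(event string) string { return p.prefix + event }

func main() {
	reg := NewRegistry()
	reg.Register("prefix", func(config []byte) (Pipeline, error) {
		return prefixPipeline{prefix: string(config)}, nil
	})

	p, err := reg.Load("prefix", []byte("alert: "))
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Handle("gateway disconnected"))
}
```

An unknown pipeline_type yields an error rather than a silent no-op, so a misconfigured or missing plugin would be visible to whoever loads the pipeline.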

The API for the EventPipelineRegistry would look like this:

message CreateGatewayEventPipelineRequest {
  oneof scope {
    GatewayIdentifiers gateway_ids = 1;
    OrganizationIdentifiers organization_ids = 2;
    UserIdentifiers user_ids = 3;
  }
  GatewayEventPipeline pipeline = 4;
}

// etc. and similar for Applications...

service EventPipelineRegistry {
  // ... Application pipelines

  rpc CreateGatewayEventPipeline(CreateGatewayEventPipelineRequest) returns (CreateGatewayEventPipelineResponse) {
    option (google.api.http) = {
      post: "/gateways/{gateway_ids.gateway_id}/event-pipelines"
      body: "*"
      additional_bindings {
        post: "/organizations/{organization_ids.organization_id}/gateways/event-pipelines"
        body: "*"
      }
      additional_bindings {
        post: "/users/{user_ids.user_id}/gateways/event-pipelines"
        body: "*"
      }
    };
  }

  // List event pipelines for the gateway.
  // For the gateway_ids scope, this will also return pipelines for the organization_ids and user_ids scopes.
  rpc ListGatewayEventPipelines(ListGatewayEventPipelinesRequest) returns (ListGatewayEventPipelinesResponse) {
    option (google.api.http) = {
      get: "/gateways/{gateway_ids.gateway_id}/event-pipelines"
      additional_bindings {
        get: "/organizations/{organization_ids.organization_id}/gateways/event-pipelines"
      }
      additional_bindings {
        get: "/users/{user_ids.user_id}/gateways/event-pipelines"
      }
    };
  }

  rpc GetGatewayEventPipeline(GetGatewayEventPipelineRequest) returns (GatewayEventPipeline) {
    option (google.api.http) = {
      get: "/gateways/{gateway_ids.gateway_id}/event-pipelines/{pipeline_id}"
      additional_bindings {
        get: "/organizations/{organization_ids.organization_id}/gateways/event-pipelines/{pipeline_id}"
      }
      additional_bindings {
        get: "/users/{user_ids.user_id}/gateways/event-pipelines/{pipeline_id}"
      }
    };
  }

  // maybe Update?

  rpc DeleteGatewayEventPipeline(DeleteGatewayEventPipelineRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/gateways/{gateway_ids.gateway_id}/event-pipelines/{pipeline_id}"
      additional_bindings {
        delete: "/organizations/{organization_ids.organization_id}/gateways/event-pipelines/{pipeline_id}"
      }
      additional_bindings {
        delete: "/users/{user_ids.user_id}/gateways/event-pipelines/{pipeline_id}"
      }
    };
  }
}
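As a rough illustration of how the three scopes map onto the HTTP bindings above, the following Go sketch builds the collection path for a given scope. The Scope struct and CollectionPath function are hypothetical helpers for this example. Rejecting requests that set zero or several scopes is an assumption; the proposal does not say how an empty oneof should be handled.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// Scope mirrors the oneof in CreateGatewayEventPipelineRequest.
// Hypothetical type: in generated code this would be a oneof wrapper.
type Scope struct {
	GatewayID      string
	OrganizationID string
	UserID         string
}

// ErrScope is returned unless exactly one scope is set (an assumption of
// this sketch, not something the proposal specifies).
var ErrScope = errors.New("exactly one scope must be set")

// CollectionPath returns the path of the gateway event pipelines collection
// for the given scope, following the HTTP bindings in the proposal.
func CollectionPath(s Scope) (string, error) {
	set := 0
	for _, id := range []string{s.GatewayID, s.OrganizationID, s.UserID} {
		if id != "" {
			set++
		}
	}
	if set != 1 {
		return "", ErrScope
	}
	switch {
	case s.GatewayID != "":
		return "/gateways/" + url.PathEscape(s.GatewayID) + "/event-pipelines", nil
	case s.OrganizationID != "":
		return "/organizations/" + url.PathEscape(s.OrganizationID) + "/gateways/event-pipelines", nil
	default:
		return "/users/" + url.PathEscape(s.UserID) + "/gateways/event-pipelines", nil
	}
}

func main() {
	for _, s := range []Scope{
		{GatewayID: "my-gateway"},
		{OrganizationID: "my-org"},
		{UserID: "my-user"},
	} {
		path, err := CollectionPath(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(path)
	}
}
```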

Since ListGatewayEventPipelines would also return pipelines configured on the Organization and User scopes, this would need to be implemented in the Identity Server so that we can derive memberships.
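The scope merging could look roughly like the Go sketch below. It assumes an in-memory Store with maps for pipelines per scope and for gateway memberships; all of these names are hypothetical stand-ins for whatever the Identity Server would actually query, and the sketch assumes pipeline IDs are unique across scopes.

```go
package main

import "fmt"

// Pipeline is a trimmed-down stand-in for GatewayEventPipeline.
type Pipeline struct {
	ID   string
	Type string
}

// Store is a hypothetical in-memory stand-in for Identity Server storage.
type Store struct {
	ByGateway      map[string][]Pipeline // gateway ID -> pipelines
	ByOrganization map[string][]Pipeline // organization ID -> pipelines
	ByUser         map[string][]Pipeline // user ID -> pipelines

	// Memberships: which organizations and users a gateway belongs to.
	GatewayOrganizations map[string][]string
	GatewayUsers         map[string][]string
}

// ListForGateway returns the pipelines configured on the gateway itself,
// followed by those inherited from its organizations and users.
func (s *Store) ListForGateway(gatewayID string) []Pipeline {
	out := append([]Pipeline(nil), s.ByGateway[gatewayID]...)
	for _, orgID := range s.GatewayOrganizations[gatewayID] {
		out = append(out, s.ByOrganization[orgID]...)
	}
	for _, userID := range s.GatewayUsers[gatewayID] {
		out = append(out, s.ByUser[userID]...)
	}
	return out
}

func main() {
	s := &Store{
		ByGateway:            map[string][]Pipeline{"gw1": {{ID: "p1", Type: "webhook"}}},
		ByOrganization:       map[string][]Pipeline{"org1": {{ID: "p2", Type: "email"}}},
		ByUser:               map[string][]Pipeline{"alice": {{ID: "p3", Type: "webhook"}}},
		GatewayOrganizations: map[string][]string{"gw1": {"org1"}},
		GatewayUsers:         map[string][]string{"gw1": {"alice"}},
	}
	for _, p := range s.ListForGateway("gw1") {
		fmt.Println(p.ID, p.Type)
	}
}
```

The pipeline type names "webhook" and "email" are placeholders; the proposal does not name any concrete pipeline types.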

The initial implementation may only support the Entity (Application/Gateway) scope.

Event Servers would fetch and cache event pipelines for each gateway/application for which they process events (https://github.com/TheThingsIndustries/lorawan-stack/issues/3001 may also apply here).

htdvisser commented 2 years ago

Re-assigning to @johanstokking and @adriansmares because they're also assigned on https://github.com/TheThingsIndustries/lorawan-stack/issues/1804

johanstokking commented 2 years ago

We'll revisit this later as needed. Many things have changed in the meantime.

I think that triggering alerts for gateway disconnection, for example, should be handled by the NOC.