ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0
138 stars 56 forks

Revamp listener support in Salesforce connector #6177

Open aashikam opened 3 months ago

aashikam commented 3 months ago

Description: The Salesforce connector currently supports the listener implementation through the ballerinax/trigger.salesforce package. Ideally, this should be supported through the ballerinax/salesforce package itself.

aashikam commented 3 months ago

Update

  1. The Salesforce listener has been moved to the Salesforce package with minor updates to the existing codebase. It is supported under the salesforce.events module. A couple of build script issues still need to be fixed.

  2. There is a new API available called Pub/Sub API, which provides a single interface for publishing and subscribing to platform events.

Streaming API (Existing one)

Ref: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_stream.htm

Lets you receive updates or events from Salesforce in real time without polling for them. Instead of your application repeatedly asking Salesforce whether there are any updates, Salesforce sends the updates directly to your application as they happen.

Pub/Sub API

Ref: https://developer.salesforce.com/docs/platform/pub-sub-api/guide/intro.html

Lets you publish and subscribe to messages about events happening in a system through a single interface. These events can range from real-time updates to changes in data.

Similarities:

  1. Real-time event handling: Both APIs support real-time event handling, allowing applications to receive updates as they occur without needing to constantly poll for them.
  2. Subscription mechanism: Both APIs provide a subscription mechanism where clients can subscribe to specific types of events they're interested in.
  3. Reduction in unnecessary requests: Both APIs aim to reduce unnecessary requests to the server by delivering updates only when there is new information available.
  4. Efficiency: Both APIs are designed to be efficient in delivering updates to clients, improving the overall performance of applications that rely on real-time data.

Differences:

  1. Technology stack: The Pub/Sub API is based on gRPC and HTTP/2, while the Streaming API uses HTTP/1.1 and the Bayeux protocol (CometD implementation). This difference in underlying technologies can affect implementation details and integration with other systems.
  2. Event types: The Pub/Sub API primarily focuses on publishing and subscribing to platform events and Change Data Capture events, whereas the Streaming API supports a broader range of event types, including PushTopic events, generic events, platform events, and Change Data Capture events.
  3. Message reliability: The Pub/Sub API provides reliable message delivery by enabling replay of past events through durable streaming. The Streaming API offers similar functionality for clients subscribed with API version 37.0 or later.
  4. Message durability: Salesforce stores different types of events for different durations, and the mechanisms for retrieving missed events differ between the two APIs. The Pub/Sub API allows retrieval of events within the retention window through durable streaming, whereas the Streaming API stores events for a certain period and allows clients to retrieve missed events when they reconnect.

Given the implementation and integration complexity, this migration would take some time, as this is entirely new. We can't guarantee that there won't be any breaking changes either.

I tried out the event listener of the Pub/Sub API here. It works as expected and is pretty straightforward.

aashikam commented 2 months ago

Streaming API vs Pub/Sub API

Streaming API

The Salesforce Streaming API utilizes the Bayeux Protocol and the CometD framework to enable real-time event streaming and communication between Salesforce servers and client applications.

The Salesforce Streaming API primarily operates using HTTP/1.1, with support for HTTP/2 in some cases, depending on the client and server configurations.

Pub/Sub API

A newer API that offers capabilities similar to the Streaming API but is based on gRPC and HTTP/2. It delivers binary event messages in the Apache Avro format, which makes publishing and delivery efficient.

Considerations

In summary, if the application requires real-time updates and continuous streaming of events, the Streaming API might be a better choice. However, if you need more flexibility in event filtering and control over the types of events received, the Pub/Sub API could be a better fit.

aashikam commented 1 month ago

Design Review: Ballerina Salesforce Listener

Overview

The ballerinax/salesforce.events module provides a Listener to capture events triggered in a Salesforce org. This functionality is provided by the Salesforce Streaming API. (We decided to use the Streaming API after an internal discussion weighing the pros and cons.)

Type Definitions

```ballerina
# Salesforce listener configuration.
#
# + username - Salesforce login username
# + password - Salesforce login password appended with the security token (<password><security token>)
# + channelName - The channel name to which a client can subscribe to receive event notifications
# + replayFrom - The replay ID to change the point in time when events are read
#   - `-1` - Get all new events sent after subscription. This option is the default
#   - `-2` - Get all new events sent after subscription and all past events within the retention window
#   - `Specific number` - Get all events that occurred after the event with the specified replay ID
# + environment - The type of Salesforce environment
#   - `PRODUCTION` - Production environment
#   - `SANDBOX` - Sandbox environment
#   - `DEVELOPER` - Developer environment
@display{label: "Listener Config"}
public type ListenerConfig record {|
    @display{label: "Username", "description": "Salesforce login username"}
    string username;
    @display{label: "Password", "description": "Salesforce login password appended with the security token (<password><security token>)"}
    string password;
    @display{label: "Channel Name", "description": "The channel name to which a client can subscribe to receive event notifications"}
    string channelName;
    @display{label: "Replay ID", "description": "The replay ID to change the point in time when events are read"}
    int replayFrom = REPLAY_FROM_TIP;
    @display{label: "Environment", "description": "The type of Salesforce environment"}
    string environment = PRODUCTION;
|};

# The type of Salesforce environment
# + PRODUCTION - Production environment
# + SANDBOX - Sandbox environment
# + DEVELOPER - Developer environment
public enum Organization {
    PRODUCTION = "Production",
    DEVELOPER = "Developer",
    SANDBOX = "Sandbox"
}

# Replay ID `-1` to get all new events sent after subscription. This option is the default
public const REPLAY_FROM_TIP = -1;
# Replay ID `-2` to get all new events sent after subscription and all past events within the retention window
public const REPLAY_FROM_EARLIEST = -2;

# Contains data returned from a Change Data Event.
#
# + changedData - A JSON map which contains the changed data
# + metadata - Header fields that contain information about the event
public type EventData record {
    map<json> changedData;
    ChangeEventMetadata metadata?;
};

# Header fields that contain information about the event.
#
# + commitTimestamp - The date and time when the change occurred, represented as the number of milliseconds
#                     since January 1, 1970 00:00:00 GMT
# + transactionKey - Uniquely identifies the transaction that the change is part of
# + changeOrigin - Origin of the change. Use this field to find out what caused the change
# + changeType - The operation that caused the change
# + entityName - The name of the standard or custom object for this record change
# + sequenceNumber - Identifies the sequence of the change within a transaction
# + commitUser - The ID of the user that ran the change operation
# + commitNumber - The system change number (SCN) of a committed transaction
# + recordId - The record ID for the changed record
public type ChangeEventMetadata record {
    int commitTimestamp?;
    string transactionKey?;
    string changeOrigin?;
    string changeType?;
    string entityName?;
    int sequenceNumber?;
    string commitUser?;
    int commitNumber?;
    string recordId?;
};
```
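As a rough illustration of how a change event maps onto these types, here is a sketch that converts a hypothetical JSON payload into EventData using the langlib cloneWithType function. The records are local copies of the definitions above so that the snippet compiles without the connector, and the payload values are made up; the actual listener may construct EventData differently.

```ballerina
import ballerina/io;

// Local copies of ChangeEventMetadata and EventData from the definitions above,
// so that this sketch compiles without the connector.
type ChangeEventMetadata record {
    int commitTimestamp?;
    string transactionKey?;
    string changeOrigin?;
    string changeType?;
    string entityName?;
    int sequenceNumber?;
    string commitUser?;
    int commitNumber?;
    string recordId?;
};

type EventData record {
    map<json> changedData;
    ChangeEventMetadata metadata?;
};

public function main() returns error? {
    // Hypothetical payload; all values are made up for illustration.
    json raw = {
        changedData: {Name: "WSO2 Inc", Industry: "Technology"},
        metadata: {changeType: "CREATE", entityName: "Account", recordId: "001XXXXXXXXXXXX", sequenceNumber: 1}
    };
    // cloneWithType converts the JSON value into the EventData record type.
    EventData eventData = check raw.cloneWithType();
    // Optional fields are read with optional field access (?.).
    string? entity = eventData?.metadata?.entityName;
    io:println("Entity: ", entity ?: "unknown");
    io:println("Changed fields: ", eventData.changedData.keys());
}
```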

Prerequisites for using the listener

Salesforce listener usage

To use the Salesforce listener in your Ballerina application, update the .bal file as follows:

Step 1: Import listener

Import the ballerinax/salesforce.events module as shown below.

```ballerina
import ballerinax/salesforce.events as sfdc;
```

Step 2: Create a new listener instance

Create an sfdc:ListenerConfig record with your Salesforce username, your Salesforce password with the security token appended, and the name of the channel to subscribe to. Then, initialize the listener with it.

Notes:

```ballerina
sfdc:ListenerConfig configuration = {
    username: "USER_NAME",
    // The security token is appended to the password
    password: "PASSWORD" + "SECURITY_TOKEN",
    channelName: "CHANNEL_NAME"
};
listener sfdc:Listener sfdcListener = new (configuration);
```

Step 3: Implement a listener remote function

The Ballerina Salesforce Listener is designed to subscribe to Salesforce events, filter them based on specified criteria, and react to these events accordingly. Below is a generalized skeleton of the design. Now, implement a listener service with remote functions to handle specific event types.

```ballerina
import ballerinax/salesforce.events as sfdc;

service sfdc:RecordService on sfdcListener {
    # Triggers on a record update event.
    remote function onUpdate(sfdc:EventData event) returns error? {
    }
    # Triggers on a new record create event.
    remote function onCreate(sfdc:EventData event) returns error? {
    }
    # Triggers on a record delete event.
    remote function onDelete(sfdc:EventData event) returns error? {
    }
    # Triggers on a record restore event.
    remote function onRestore(sfdc:EventData event) returns error? {
    }
}
```
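To make the mapping between incoming events and these remote functions concrete, the sketch below shows one way a listener could pick the handler from the changeType header field. It assumes the Change Data Capture changeType values CREATE, UPDATE, DELETE, and UNDELETE; routing UNDELETE to onRestore is an assumption, and handlerFor is a hypothetical helper rather than part of the connector.

```ballerina
import ballerina/io;

// Trimmed local copies of the types defined above (only the fields used here).
type ChangeEventMetadata record {
    string changeType?;
    string entityName?;
    string recordId?;
};

type EventData record {
    map<json> changedData;
    ChangeEventMetadata metadata?;
};

// Hypothetical dispatcher: returns the name of the remote function that should
// handle the event, based on the changeType header value.
function handlerFor(EventData evt) returns string? {
    string? changeType = evt?.metadata?.changeType;
    match changeType {
        "CREATE" => {
            return "onCreate";
        }
        "UPDATE" => {
            return "onUpdate";
        }
        "DELETE" => {
            return "onDelete";
        }
        "UNDELETE" => {
            // Assumption: a restored record is reported as UNDELETE.
            return "onRestore";
        }
    }
    // Any other (or missing) changeType value is not routed in this sketch.
    return ();
}

public function main() {
    EventData created = {changedData: {Name: "WSO2 Inc"}, metadata: {changeType: "CREATE", entityName: "Account"}};
    io:println(handlerFor(created));
}
```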

Step 4: Subscribe to events

To subscribe to events using the Ballerina Salesforce Listener:

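Step 5: Filter events

Events can be filtered based on specific criteria within the service functions. For example, the following handler reacts only to newly created Account records:

```ballerina
remote function onCreate(sfdc:EventData payload) returns error? {
    // Handle creation events only for Account records
    if payload?.metadata?.entityName == "Account" {
        io:println("Account created: ", payload.changedData);
    }
}
```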
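When the same criteria are needed in several handlers, the check can be factored into a helper. The matches function below is hypothetical and uses trimmed local copies of the types; it checks the entity name and the value of one changed field, similar to the Name check in the sample that follows.

```ballerina
import ballerina/io;

// Trimmed local copies of the types defined above (only the fields used here).
type ChangeEventMetadata record {
    string entityName?;
};

type EventData record {
    map<json> changedData;
    ChangeEventMetadata metadata?;
};

// Hypothetical filter: true when the event belongs to `entity` and its changed
// data contains `fieldName` with the value `expected`.
function matches(EventData evt, string entity, string fieldName, json expected) returns boolean {
    if evt?.metadata?.entityName != entity {
        return false;
    }
    // hasKey distinguishes a missing field from a field explicitly set to null.
    return evt.changedData.hasKey(fieldName) && evt.changedData[fieldName] == expected;
}

public function main() {
    EventData accountUpdate = {changedData: {Name: "WSO2 Inc"}, metadata: {entityName: "Account"}};
    if matches(accountUpdate, "Account", "Name", "WSO2 Inc") {
        io:println("Matched: ", accountUpdate.changedData);
    }
}
```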

Sample

```ballerina
import ballerina/io;
import ballerinax/salesforce.events as sfdc;

configurable string username = ?;
configurable string password = ?;

sfdc:ListenerConfig listenerConfig = {
    username: username,
    password: password,
    channelName: "/data/ChangeEvents"
};
listener sfdc:Listener eventListener = new (listenerConfig);

service sfdc:RecordService on eventListener {
    remote function onCreate(sfdc:EventData payload) {
        io:println("Created " + payload.toString());
    }

    remote function onUpdate(sfdc:EventData payload) {
        // Member access returns () instead of panicking when "Name" is absent
        json accountName = payload.changedData["Name"];
        if accountName == "WSO2 Inc" {
            io:println("Updated " + payload.toString());
        } else {
            io:println(payload.toString());
        }
    }

    remote function onDelete(sfdc:EventData payload) {
        io:println("Deleted " + payload.toString());
    }

    remote function onRestore(sfdc:EventData payload) {
        io:println("Restored " + payload.toString());
    }
}
```

Discussion Points

aashikam commented 1 month ago

The changes proposed at the design review

  1. Move the listener into the default salesforce package, so that the listener is available as salesforce:Listener.
  2. Changes to the listener configuration and init:

```ballerina
# Configurations related to authentication.
#
# + username - Username to use for authentication
# + password - Password/secret/token to use for authentication
public type Credentials record {|
    string username;
    string password;
|};

# Salesforce listener configuration.
#
# + auth - Configurations related to username/password authentication
# + replayFrom - The replay ID to change the point in time when events are read
# + isSandbox - Whether the Salesforce environment is a sandbox environment
public type ListenerConfig record {|
    Credentials auth;
    int|ReplayOptions replayFrom?;
    boolean isSandbox = false;
|};

public enum ReplayOptions {
    REPLAY_FROM_TIP,
    REPLAY_FROM_EARLIEST
}
```

  3. Move the subscription to the service level. Currently, there is one topic subscription per listener; this can be changed to one subscription per service, so that a single listener can be shared among services. The topic/channel name can be given as the service name.

```ballerina
service "/data/ChangeEvents" on eventListener {
    remote function onCreate(EventData payload) {
        io:println("Created " + payload.toString());
    }
}
```

  4. Changes to remote functions. These are the generic structures of the event types:
    • create

```json
{
  "eventType": "created",
  "objectType": "Account",
  "recordId": "001XXXXXXXXXXXX",
  "fields": {
    "Name": {
      "oldValue": null,
      "newValue": "New Account Name"
    },
    "Industry": {
      "oldValue": null,
      "newValue": "Technology"
    }
    // Other fields that were modified
  }
}
```

    • update

```json
{
  "eventType": "updated",
  "objectType": "Contact",
  "recordId": "003XXXXXXXXXXXX",
  "fields": {
    "FirstName": {
      "oldValue": "John",
      "newValue": "Jane"
    },
    "LastName": {
      "oldValue": "Doe",
      "newValue": "Smith"
    }
    // Other fields that were modified
  }
}
```

    • delete

```json
{
  "eventType": "deleted",
  "objectType": "Opportunity",
  "recordId": "006XXXXXXXXXXXX",
  "fields": {
    // No fields will be present in a delete event
  }
}
```
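To show how a handler might consume the proposed generic structure, here is a sketch that converts the update payload (without the comment line) into records and lists each field change. GenericEvent, FieldChange, and describeChanges are hypothetical names used only for illustration; they are not part of the proposal.

```ballerina
import ballerina/io;

// Hypothetical record types for the proposed generic event shape.
type FieldChange record {
    json oldValue;
    json newValue;
};

type GenericEvent record {
    string eventType;
    string objectType;
    string recordId;
    // Empty for delete events, per the structure above.
    map<FieldChange> fields;
};

// Builds one "Field: old -> new" line per entry in the event's fields map.
function describeChanges(GenericEvent evt) returns string[] {
    string[] lines = [];
    foreach var [name, change] in evt.fields.entries() {
        lines.push(string `${name}: ${change.oldValue.toJsonString()} -> ${change.newValue.toJsonString()}`);
    }
    return lines;
}

public function main() returns error? {
    // The update payload from above, minus the comment line.
    json payload = {
        eventType: "updated",
        objectType: "Contact",
        recordId: "003XXXXXXXXXXXX",
        fields: {
            FirstName: {oldValue: "John", newValue: "Jane"},
            LastName: {oldValue: "Doe", newValue: "Smith"}
        }
    };
    GenericEvent evt = check payload.cloneWithType();
    foreach string line in describeChanges(evt) {
        io:println(line);
    }
}
```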
niveathika commented 1 month ago

+1 to the above changes. I have added a few more suggestions:

  1. Listener initialization using an included record parameter of ListenerConfig:

```ballerina
// Listener init accepting ListenerConfig as an included record parameter
public function init(*ListenerConfig config)
```

Let's update the listener initialization as above. This ensures that users only have to provide the auth details:

```ballerina
listener sfdc:Listener sfdcListener = new (auth = {username: "", password: ""});
```

  2. Rename the Credentials record to CredentialsConfig to be consistent with other libraries.
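Putting the proposed configuration changes together, the following sketch shows how an included record parameter lets callers pass only auth as a named argument and rely on the defaults for everything else. MockListener is a plain class standing in for the real listener, toReplayId is a hypothetical helper that maps replayFrom to the -1/-2 replay IDs defined earlier in the thread, and the records are local copies using the suggested CredentialsConfig name (with a default added for replayFrom for illustration).

```ballerina
import ballerina/io;

// Hypothetical local copies of the proposed types; the published connector may differ.
type CredentialsConfig record {|
    string username;
    string password;
|};

// Enum members declared without values take their own names as string values.
enum ReplayOptions {
    REPLAY_FROM_TIP,
    REPLAY_FROM_EARLIEST
}

type ListenerConfig record {|
    CredentialsConfig auth;
    int|ReplayOptions replayFrom = REPLAY_FROM_TIP;
    boolean isSandbox = false;
|};

// Maps replayFrom to a numeric replay ID: -1 for the tip, -2 for the earliest
// retained event, or the explicit ID when an int is given.
function toReplayId(int|ReplayOptions replayFrom) returns int {
    if replayFrom is int {
        return replayFrom;
    }
    return replayFrom == REPLAY_FROM_EARLIEST ? -2 : -1;
}

// Stand-in for the listener: init takes an included record parameter, so callers
// can pass ListenerConfig fields directly as named arguments.
class MockListener {
    final string username;
    final int replayId;
    final boolean isSandbox;

    function init(*ListenerConfig config) {
        self.username = config.auth.username;
        self.replayId = toReplayId(config.replayFrom);
        self.isSandbox = config.isSandbox;
    }
}

public function main() {
    // Only auth is supplied; replayFrom and isSandbox keep their defaults.
    MockListener defaults = new (auth = {username: "user@example.com", password: "secret"});
    io:println(defaults.replayId, " ", defaults.isSandbox);

    // Any other field can still be overridden by name.
    MockListener replaying = new (auth = {username: "user@example.com", password: "secret"},
        replayFrom = REPLAY_FROM_EARLIEST, isSandbox = true);
    io:println(replaying.replayId, " ", replaying.isSandbox);
}
```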
aashikam commented 1 day ago

This was de-prioritized due to support tasks and other feature requests. We have planned this for the upcoming sprint. Pending tasks: