redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Salesforce Object Query Language (SOQL) support #1588

Open loicalleyne opened 1 year ago

loicalleyne commented 1 year ago

I'd like to suggest adding support for Salesforce Object Query Language (SOQL) as an input/output. I'm willing to take a stab at implementing this if I get the spare time to do it.

mihaitodor commented 1 year ago

Hey @loicalleyne, that sounds useful! I took a stab at using https://github.com/simpleforce/simpleforce for an internal demo, but I don't have much experience with Salesforce, so I didn't spend much time refining it. I'm also not sure what kind of SOQL queries people would like to run, so I hope you have some more familiarity with this to test it. Here's the demo code I put together:

package input

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "net/url"

    "github.com/benthosdev/benthos/v4/public/service"
    "github.com/simpleforce/simpleforce"
)

var salesforceInputConfigSpec = service.NewConfigSpec().
    Summary("Creates an input that reads Salesforce data.").
    Field(service.NewStringField("url")).
    Field(service.NewStringField("user")).
    Field(service.NewStringField("password")).
    Field(service.NewStringField("token")).
    Field(service.NewStringField("query"))

func newSalesforceInput(conf *service.ParsedConfig) (service.Input, error) {
    input := salesforceInput{}
    var err error
    if input.url, err = conf.FieldString("url"); err != nil {
        return nil, err
    }

    if _, err = url.ParseRequestURI(input.url); err != nil {
        return nil, fmt.Errorf("invalid URL %q: %s", input.url, err)
    }

    if input.user, err = conf.FieldString("user"); err != nil {
        return nil, err
    }

    if input.password, err = conf.FieldString("password"); err != nil {
        return nil, err
    }

    if input.token, err = conf.FieldString("token"); err != nil {
        return nil, err
    }

    if input.query, err = conf.FieldString("query"); err != nil {
        return nil, err
    }

    return service.AutoRetryNacks(&input), nil
}

func init() {
    err := service.RegisterInput(
        "salesforce_select", salesforceInputConfigSpec,
        func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
            return newSalesforceInput(conf)
        })
    if err != nil {
        panic(err)
    }
}

//------------------------------------------------------------------------------

type salesforceInput struct {
    url      string
    user     string
    password string
    token    string
    query    string

    client        *simpleforce.Client
    iterator      *simpleforce.QueryResult
    currentRecord int
}

// Connect logs in to Salesforce and eagerly runs the configured SOQL query,
// buffering the full result set in memory.
func (si *salesforceInput) Connect(ctx context.Context) error {
    si.client = simpleforce.NewClient(si.url, simpleforce.DefaultClientID, simpleforce.DefaultAPIVersion)
    if si.client == nil {
        return errors.New("failed to create client")
    }

    err := si.client.LoginPassword(si.user, si.password, si.token)
    if err != nil {
        return fmt.Errorf("failed to login: %s", err)
    }

    si.iterator, err = si.client.Query(si.query) // Note: for Tooling API, use client.Tooling().Query(q)
    if err != nil {
        return fmt.Errorf("failed to run SOQL query: %s", err)
    }

    si.currentRecord = 0

    return nil
}

// Read emits the buffered records one at a time as JSON messages.
func (si *salesforceInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
    if si.currentRecord >= len(si.iterator.Records) {
        return nil, nil, service.ErrEndOfInput
    }

    record := si.iterator.Records[si.currentRecord]

    b, err := json.Marshal(record)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to marshal record: %s", err)

    }

    si.currentRecord++

    return service.NewMessage(b), func(ctx context.Context, err error) error {
        // Nacks are retried automatically when we use service.AutoRetryNacks
        return nil
    }, nil
}

func (si *salesforceInput) Close(ctx context.Context) error {
    return nil
}

Note that the above code is quite crude and one might want to add an args_mapping field similar to the sql_* components, such as https://www.benthos.dev/docs/components/inputs/sql_select#args_mapping. I guess it's also missing pagination...
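
For illustration, here's a rough sketch of how pagination might look, written as a drop-in replacement for the Read method above (so it reuses that file's imports). It assumes simpleforce's QueryResult exposes Done and NextRecordsURL and that client.Query also accepts the next-records path, which is worth verifying against the simpleforce source:

// Sketch: when the buffered page is exhausted, follow NextRecordsURL to fetch
// the next page before signalling end of input. The Done/NextRecordsURL usage
// is an assumption about simpleforce's QueryResult.
func (si *salesforceInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
    if si.currentRecord >= len(si.iterator.Records) {
        if si.iterator.Done || si.iterator.NextRecordsURL == "" {
            return nil, nil, service.ErrEndOfInput
        }

        next, err := si.client.Query(si.iterator.NextRecordsURL)
        if err != nil {
            return nil, nil, fmt.Errorf("failed to fetch next page: %s", err)
        }
        si.iterator = next
        si.currentRecord = 0
    }

    record := si.iterator.Records[si.currentRecord]

    b, err := json.Marshal(record)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to marshal record: %s", err)
    }

    si.currentRecord++

    return service.NewMessage(b), func(ctx context.Context, err error) error {
        // Nacks are retried automatically when we use service.AutoRetryNacks
        return nil
    }, nil
}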

Also, this simpleforce library doesn't do certain things properly; for example, it emits logs using the log package from the standard library, which I think can't be suppressed.

Finally, it would be nice to have a few integration tests for it. This Salesforce API is HTTP-based (not sure if they have any alternative ones), so maybe the https://pkg.go.dev/net/http/httptest package can help.
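
For instance, a minimal sketch with httptest might look like the following test (the JSON shape and routing are assumptions, and it doesn't cover simpleforce's login exchange, which would also need stubbing):

package input

import (
    "net/http"
    "net/http/httptest"
    "testing"
)

// TestSalesforceQueryStub spins up a fake HTTP server that returns a canned
// SOQL query result.
func TestSalesforceQueryStub(t *testing.T) {
    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        _, _ = w.Write([]byte(`{"totalSize":1,"done":true,"records":[{"Id":"001xx0000000001","Name":"Acme"}]}`))
    }))
    defer ts.Close()

    // A real test would point the input's url field at ts.URL, then drive
    // Connect/Read against the fake server and assert on the emitted messages.
    _ = ts.URL
}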

loicalleyne commented 1 year ago

How should a package be emitting logs in a way that makes it ok to use in Benthos?

Someone submitted an issue last year to add an option to disable logging in that package: https://github.com/simpleforce/simpleforce/issues/35

mihaitodor commented 1 year ago

How should a package be emitting logs in a way that makes it ok to use in Benthos?

Have a look at any of the existing implementations under internal/impl. For example here. You can see how that logger object is hooked up.
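
As a self-contained sketch of that wiring (using a hypothetical logged_example input), the constructor receives a *service.Resources and can hold on to mgr.Logger(), so the component logs through Benthos rather than the standard library:

package input

import (
    "context"

    "github.com/benthosdev/benthos/v4/public/service"
)

// loggedInput is a hypothetical input that keeps the Benthos-managed logger
// handed to it at registration time.
type loggedInput struct {
    log *service.Logger
}

func (l *loggedInput) Connect(ctx context.Context) error {
    // Logs emitted here respect the configured Benthos log level and format.
    l.log.Debugf("connecting")
    return nil
}

func (l *loggedInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
    return nil, nil, service.ErrEndOfInput
}

func (l *loggedInput) Close(ctx context.Context) error { return nil }

func init() {
    err := service.RegisterInput(
        "logged_example", service.NewConfigSpec(),
        func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
            // mgr.Logger() exposes the logger Benthos wires up for this component.
            return &loggedInput{log: mgr.Logger()}, nil
        })
    if err != nil {
        panic(err)
    }
}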

Someone submitted an issue last year to add an option to disable logging in that package: https://github.com/simpleforce/simpleforce/issues/35

Yeah, the package doesn't seem to be great or well-maintained. It also doesn't contain that much code, so I wonder if it's really useful to import it. It might be better to leverage the Benthos http_client directly and write templates, similar to how the Discord input is implemented.

loicalleyne commented 1 year ago

☹️ I looked at the simpleforce code again: Salesforce has yet another custom auth flow that doesn't seem to fit the current http_client auth options, so templates alone might not make this work out of the box.

This is turning into another one of those 'feature request to enable another feature request' situations...

What about a custom auth option for http_client that lets you define a stream within a stream to go retrieve tokens in a custom auth flow and put them in auth variables accessible to the http_client? Maybe two config options, auth flow and refresh flow. That could remove a lot of the complexity of working with non-standard or rarely used auth flows.

mihaitodor commented 1 year ago

What about a custom auth option for http_client that lets you define a stream within a stream to go retrieve tokens in a custom auth flow and put them in auth variables accessible to the http_client? Maybe two config options, auth flow and refresh flow.

Could that be done perhaps using a separate http_client and an in-memory cache to store the auth variables? Then, before calling the main http_client, one would first query this cache for the auth variables in a branch processor and place them in the metadata (or, if they don't exist, trigger this auxiliary http_client, place the variables in the metadata, and cache them), and then call the main http_client. If not, I guess some advanced fields can be added, but it would be best to check with @Jeffail as well before putting in the effort.

loicalleyne commented 1 year ago

Is what you're thinking of a sequence input of http_client inputs, with one input's result getting put in a variable and then discarded? How would that work with tokens that expire while the stream is still running?

mihaitodor commented 1 year ago

No, it doesn't require a sequence input and you can leverage the memory cache default_ttl to expire tokens. Here's how I'd do it:

input:
  generate:
    interval: 10s # Trigger the pipeline every 10s
    mapping: root = ""
    count: 0 # Never stop

  processors:
    - cache: # Try to fetch the auth token from the cache
        resource: token_cache
        operator: get
        key: token
    - catch:
        - http: {} # Fetch the auth token from the API
        - catch:
            - log:
                level: WARN
                message: "Failed to get token: ${! error() }"
            - bloblang: root = deleted() # Prevent the rest of the pipeline from running
        - mapping: |
            meta token = ... # Do some processing to extract the actual token and place it in the metadata
        - branch:
            request_map: |
              root = meta("token")
            processors:
              - cache: # Cache the current auth token
                  resource: token_cache
                  operator: set
                  key: token
                  value: "${! content() }"
    - http: {} # Fetch some data using meta("token")

output:
  stdout: {}

cache_resources:
  - label: token_cache
    memory:
      default_ttl: 1h # Expire auth tokens after 1h

Note that the above is just a sketch config and it might have bugs...