Read HTTP Responses given input HTTP request configuration options and produce them to Fluvio topics.
This connector can be configured to operate in three modes.
interval
config option. Each response will be produced as an individual Fluvio record.stream
config option is provided, the HTTP response will be processed as a data stream. A record will be produced to Fluvio every time a delimiter
segment is encountered, which is set to \n
by default.endpoint
config option is prefixed with ws://
, a WebSocket connection will be established, and each incoming message will be produced.Supports HTTP/1.0, HTTP/1.1, HTTP/2.0 protocols.
See docs here. Tutorial for HTTP to SQL Pipeline.
Option | default | type | description |
---|---|---|---|
interval | 10s | String | Interval between each HTTP Request. This is in the form of "1s", "10ms", "1m", "1ns", etc. |
method | GET | String | GET, POST, PUT, HEAD |
endpoint | - | String | HTTP URL endpoint. Use ws:// for websocket URLs. |
headers | - | Array\<String> | Request header(s) "Key:Value" pairs |
body | - | String | Request body e.g. in POST |
user-agent | "fluvio/http-source 0.1.0" | String | Request user-agent |
output_type | text | String | text = UTF-8 String Output, json = UTF-8 JSON Serialized String |
output_parts | body | String | body = body only, full = all status, header and body parts |
stream | false | bool | Flag to indicate HTTP streaming mode |
delimiter | '\n' | String | Delimiter to separate records when producing from an HTTP streaming endpoint |
websocket_config | {} | Object | WebSocket configuration object. See below. |
Matrix | Output |
---|---|
output_type = text (default), output_parts = body (default) | Only the body of the HTTP Response |
output_type = text (default), output_parts = full | The full HTTP Response |
output_type = json, output_parts = body (default) | Only the "body" in JSON struct |
output_type = json, output_parts = full | HTTP "status", "body" and "header" JSON |
Option | default | type | description |
---|---|---|---|
subscription_messages | [] | Array\<String> | List of messages to send to the server after connection is established. |
ping_interval_ms | 10000 | int | Interval in milliseconds to send ping messages to the server. |
subscription_message | - | String | (deprecated) Message to send to the server after connection is established. If provided with subscription_messages, subscription_message will be sent first. |
This is an example of simple connector config file for polling an endpoint:
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.4.3
name: cat-facts
type: http-source
topic: cat-facts
create-topic: true
secrets:
- name: AUTHORIZATION_TOKEN
http:
endpoint: "https://catfact.ninja/fact"
interval: 10s
headers:
- "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
- "Cache-Control: no-cache"
The produced record in Fluvio topic will be:
{
"fact": "The biggest wildcat today is the Siberian Tiger. It can be more than 12 feet (3.6 m) long (about the size of a small car) and weigh up to 700 pounds (317 kg).",
"length": 158
}
Fluvio HTTP Source Connector supports Secrets in the endpoint
and in the headers
parameters:
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.4.3
name: cat-facts
type: http-source
topic: cat-facts
create-topic: true
secrets:
- name: MY_SECRET_URL
- name: MY_AUTHORIZATION_HEADER
http:
endpoint:
secret:
name: MY_SECRET_URL
headers:
- "Authorization: ${{ secrets.MY_AUTHORIZATION_HEADER }}
interval: 10s
Fluvio HTTP Source Connector supports Transformations. Records can be modified before sending to Fluvio topic.
The previous example can be extended to add extra transformations to outgoing records:
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.4.3
name: cat-facts
type: http-source
topic: cat-facts
create-topic: true
http:
endpoint: "https://catfact.ninja/fact"
interval: 10s
transforms:
- uses: infinyon/jolt@0.1.0
with:
spec:
- operation: default
spec:
source: "http-connector"
- operation: remove
spec:
length: ""
In this case, additional transformation will be performed before records are sent to Fluvio topic: field length
will be removed and
field source
with string value http-connector
will be added.
Now produced records will have a different shape, for example:
{
"fact": "A cat has more bones than a human; humans have 206, and the cat - 230.",
"source": "http-connector"
}
Read more about JSON to JSON transformations.
Provide the stream
configuration option to enable streaming mode with delimiter
to determine how the incoming records are separated.
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.4.3
name: wiki-updates
type: http-source
topic: wiki-updates
http:
endpoint: "https://stream.wikimedia.org/v2/stream/recentchange"
method: GET
stream: true
delimiter: "\n\n"
Connect to a websocket endpoint using a ws://
URL. When reading text messages, they are emitted as equivalent records. Binary messages are initially attempted to be converted into strings.
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.4.3
name: websocket-connector
type: http-source
topic: websocket-updates
http:
endpoint: ws://websocket.example/websocket