The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors for Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.
CloudPubSubSinkConnector
is a sink connector that reads records from Kafka
and publishes them to Cloud Pub/Sub.CloudPubSubSourceConnector
is a source connector that reads messages from
Cloud Pub/Sub and writes them to Kafka.PubSubLiteSinkConnector
is a sink connector that reads records from Kafka
and publishes them to Pub/Sub Lite.PubSubLiteSourceConnector
is a source connector that reads messages from
Pub/Sub Lite and writes them to Kafka.You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite.
Follow these setup steps for Pub/Sub before doing the quickstart.
Follow these setup steps for Pub/Sub Lite before doing the quickstart.
For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, please visit Provide credentials for Application Default Credentials.
In this quickstart, you will learn how to send data from a Kafka topic to a Pub/Sub or Pub/Sub Lite topic and vice versa, using Kafka Connect running locally in standalone mode (single process).
Follow the Kafka quickstart to download Kafka, start the Kafka environment, and create a Kafka topic.
Note: Please use the same Kafka API major version as that used by the connector. Otherwise, the connector may not work properly. Check the Kafka version used by the connector in pom.xml.
Acquire the connector jar.
Update your Kafka Connect configurations.
Open /config/connect-standalone.properties
in the Kafka download folder.
Add the filepath of the downloaded connector jar to plugin.path
and
uncomment the line if needed. In addition, because the connector is using
Kafka Connect in standalone mode, include offset.storage.file.filename
with a valid filename to store offset data in.
Create a pair of Pub/Sub or Pub/Sub Lite topic and subscription.
CloudPubSubSinkConnector
and CloudPubSubSourceConnector
PubSubLiteSinkConnector
and PubSubLiteSourceConnector
Update the connector configurations.
Open the connector configuration files at /config. Update
variables labeled TODO (developer)
with appropriate input.
CloudPubSubSinkConnector
cps-sink-connector.properties
.topics
, cps.project
, and cps.topic
.CloudPubSubSourceConnector
cps-source-connector.properties
.kafka.topic
, cps.project
, and cps.subscription
.PubSubLiteSinkConnector
pubsub-lite-sink-connector.properties
.topics
, pubsublite.project
, pubsublite.location
and pubsublite.topic
.PubSubLiteSourceConnector
pubsub-lite-source-connector.properties
.kafka.topic
, pubsublite.project
, pubsublite.location
and pubsublite.subscription
.Run the following command to start the appropriate sink or source connector. You can run multiple connector tasks at the same time.
> bin/connect-standalone.sh \
config/connect-standalone.properties \
path/to/pubsub/sink/connector.properties [source.connector.properties ...]
Test the connector.
CloudPubSubSinkConnector
CloudPubSubSourceConnector
PubSubLiteSinkConnector
PubSubLiteSourceConnector
The connector is available from Maven Central repository. Select the latest version of the connector, then download "jar" from the Downloads dropdown menu.
You can also build the connector from head.
Please refer to Kafka User Guide for general information on running connectors using Kafka Connect.
To run this connector using Kafka Connect in standalone mode, follow these steps:
Copy the connector jar where you will run Kafka Connect.
Create a configuration file for your Kafka Connect instance. Make sure
to include the filepath to the connector jar in plugin.path
.
Make a copy of the connector configuration files at /config and update the configuration options accordingly.
Start Kafka Connect with your connector with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
If running the Kafka Connect behind a proxy, export the KAFKA_OPTS
variable with options for connecting around the proxy.
> export KAFKA_OPTS="-Dhttp.proxyHost=<host> -Dhttp.proxyPort=<port> -Dhttps.proxyHost=<host> -Dhttps.proxyPort=<port>"
When running the connector on a Kafka cluster in distributed mode, "the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors" (Kafka User Guide).
In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub connector supports the following configurations:
Config | Value Range | Default | Description |
---|---|---|---|
cps.subscription | String | REQUIRED (No default) | The Pub/Sub subscription ID, e.g. "baz" for subscription "/projects/bar/subscriptions/baz". |
cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub subscription, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from the Pub/Sub subscription. |
cps.maxBatchSize | Integer | 100 | The maximum number of messages per batch in a pull request to Pub/Sub. |
cps.makeOrderingKeyAttribute | Boolean | false | When true, copy the ordering key to the set of attributes set in the Kafka message. |
kafka.key.attribute | String | null | The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
kafka.partition.count | Integer | 1 | The number of Kafka partitions for the Kafka topic in which messages will be published to. NOTE: this parameter is ignored if partition scheme is "kafka_partitioner". |
kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. The scheme "round_robin" assigns partitions in a round robin fashion, while the schemes "hash_key" and "hash_value" find the partition by hashing the message key and message value respectively. "kafka_partitioner" scheme delegates partitioning logic to Kafka producer, which by default detects number of partitions automatically and performs either murmur hash based partition mapping or round robin depending on whether message key is provided or not. "ordering_key" uses the hash code of a message's ordering key. If no ordering key is present, uses "round_robin". |
gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used.If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes. |
cps.streamingPull.enabled | Boolean | false | Whether to use streaming pull for the connector to connect to Pub/Sub. If provided, cps.maxBatchSize is ignored. |
cps.streamingPull.flowControlMessages | Long | 1,000 | The maximum number of outstanding messages per task when using streaming pull. |
cps.streamingPull.flowControlBytes | Long | 100L 1024 1024 (100 MiB) | The maximum number of outstanding message bytes per task when using streaming pull. |
cps.streamingPull.parallelStreams | Integer | 1 | The maximum number of outstanding message bytes per task when using streaming pull. |
cps.streamingPull.maxAckExtensionMs | Long | 0 | The maximum number of milliseconds the subscribe deadline will be extended to in milliseconds when using streaming pull. A value of 0 implies the java-pubsub library default value. |
cps.streamingPull.maxMsPerAckExtension | Long | 0 | The maximum number of milliseconds to extend the subscribe deadline for at a time when using streaming pull. A value of 0 implies the java-pubsub library default value. |
Config | Value Range | Default | Description |
---|---|---|---|
cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
enableCompression | Boolean | false | When true, enable publish-side compression in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress. |
In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub Lite connector supports the following configurations:
Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.subscription | String | REQUIRED (No default) | The Pub/Sub Lite subscription ID, e.g. "baz" for the subscription "/projects/bar/locations/europe-south7-q/subscriptions/baz". |
pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite subscription, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite subscription, e.g. "europe-south7-q" from above. |
kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from Pub/Sub Lite. |
pubsublite.partition_flow_control.messages | Long | Long.MAX_VALUE | The maximum number of outstanding messages per Pub/Sub Lite partition. |
pubsublite.partition_flow_control.bytes | Long | 20,000,000 | The maximum number of outstanding bytes per Pub/Sub Lite partition. |
Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.topic | String | REQUIRED (No default) | The Pub/Sub Lite topic ID, e.g. "foo" for topic "/projects/bar/locations/europe-south7-q/topics/foo". |
pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite topic, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite topic, e.g. "europe-south7-q" from above. |
These following configurations are shared by the Pub/Sub and Pub/Sub Lite connectors.
Config | Value Range | Default | Description |
---|---|---|---|
gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, the environment variable GOOGLE_APPLICATION_CREDENTIALS is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
The message data field of PubSubMessage
is a ByteString
object that translates well to and from the byte[]
bodies of Kafka messages.
We recommend using a converter that produces primitive data types (i.e. integer,
float, string, or bytes types) where possible to avoid deserializing and
re-serializing the same message body.
Additionally, a Pub/Sub message cannot exceed 10 MB. We recommend checking your
message.max.bytes
configuration to prevent possible errors.
The sink connector handles message conversion in the following way:
toString()
method is called on
objects passed in as the key or value for a map and the value for a struct.
messageBodyName
configuration to a struct field or map
key, the value of the structure field or the map value (of integer, byte,
float, and array type) will be stored in the Pub/Sub message body as bytes.Note: Pub/Sub message attributes have the following limitations:
- Attributes per message: 100
- Attribute key size: 256 bytes
- Attribute value size: 1024 bytes
The connector will transform Kafka record-level message headers that meet these limitations and ignore those that don't.
The source connector handles the conversion from a Pub/Sub message into a Kafka
SourceRecord
in a similar way:
kafka.key.attribute
in the attributes of
a Pub/Sub message. If found, it will be used as the Kafka message key
as a
string. Otherwise, the Kafka message key
will be set to null.value
.kafka.key.attribute
,
they will be assigned a struct schema. The message attribute keys will become
struct field names, and the corresponding attribute values will become values
of those struct fields. The message body will be transformed into a struct
field of name message
and of type bytes.
Pub/Sub Lite messages have the following structure:
class Message {
ByteString key;
ByteString data;
ListMultimap<String, ByteString> attributes;
Optional<Timestamp> eventTime;
}
This table shows how each field in a Kafka SinkRecord
maps to a Pub/Sub
Lite message by the sink connector:
SinkRecord | Message |
---|---|
key{Schema} | key |
value{Schema} | data |
headers | attributes |
topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
kafkaPartition | attributes["x-goog-pubsublite-source-kafka-partition"] |
kafkaOffset | attributes["x-goog-pubsublite-source-kafka-offset"] |
timestamp | eventTime |
timestampType | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
When a key, value or header value with a schema is encoded as a ByteString, the following logic will be used:
Schema.STRING_SCHEMA
copyFromUtf8
copyFromUtf8(Long.toString( x.longValue()))
copyFromUtf8( Double.toString(x.doubleValue()))
Long.toString(x.longValue())
Double.toString( x.doubleValue())
The source connector performs a one-to-one mapping from
SequencedMessage
fields to their Kafka SourceRecord
counterparts.
Pub/Sub Lite message of empty message.key
fields will have their field values
be converted to null
, and they will be assigned to Kafka partitions using the
round-robin scheme. Messages with identical, non-empty keys will be routed to
the same Kafka partition.
SequencedMessage | SourceRecord field | SourceRecord schema |
---|---|---|
message.key | key | BYTES |
message.data | value | BYTES |
message.attributes | headers | BYTES |
<source topic> |
sourcePartition["topic"] | String field in map |
<source partition> |
sourcePartition["partition"] | Integer field in map |
cursor.offset | sourceOffset["offset"] | Long field in map |
message.event_time | timestamp | long milliseconds since unix epoch if present |
publish_time | timestamp | long milliseconds since unix epoch if no event_time exists |
These instructions assume you are using Maven.
Clone this repository:
> git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
Package the connector jar:
> mvn clean package -DskipTests=True
You should see the resulting jar at target/pubsub-group-kafka-connector-${VERSION}-SNAPSHOT.jar
on success.
This library follows Semantic Versioning.
Contributions to this library are always welcome and highly encouraged.
See CONTRIBUTING for more information how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.