tabular-io / iceberg-kafka-connect

Apache License 2.0

Connector is not producing anything on AWS ECS #245

Open farbodahm opened 2 months ago

farbodahm commented 2 months ago

Hey everyone, for a POC we are trying to run the connector on AWS ECS.

The connector is deployed and in a RUNNING state, and it is not printing any errors or exceptions. However, it is not writing anything to Glue/S3, and it has a consumer lag of 3000 messages. Do you have any suggestions on how we can debug this? Thanks.

Deployed task configuration:

{
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "TOPIC_NAME",
        "iceberg.tables": "hvr_temp_investigation_database.oeheader",
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3://S3_BUCKET",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.client.region": "us-east-1",
        "iceberg.tables.default-id-columns": "PK1,PK2,PK3",
        "iceberg.tables.upsert-mode-enabled": "true",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.evolve-schema-enabled": "true",
        "iceberg.tables.schema-case-insensitive": "true"
}

Status of the connector:

[Screenshot: connector status, 2024-04-26 12:20:16]

Logs: log-events-viewer-result(2).csv

Docker Image we have created to run the connector on ECS:

FROM confluentinc/cp-server-connect-base:7.6.1

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
# TODO is it sufficient?
ENV CONNECT_GROUP_ID="Commerce-Salesforce-kc-iceberg-connect"

ENV CONNECT_CONFIG_STORAGE_TOPIC="_kc-iceberg-connect-configs"
ENV CONNECT_OFFSET_STORAGE_TOPIC="_kc-iceberg-connect-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="_kc-iceberg-connect-status"
ENV CONNECT_REPLICATION_FACTOR=3
ENV CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
ENV CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
ENV CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
ENV CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter"

# Schema Registry
ENV CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="true"

ENV CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE="USER_INFO"
ENV CONNECT_REST_ADVERTISED_HOST_NAME="connect-1"

# Worker
ENV CONNECT_LOG4J_ROOT_LOGLEVEL="INFO"
ENV CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
ENV CONNECT_REQUEST_TIMEOUT_MS="20000"
ENV CONNECT_RETRY_BACKOFF_MS="500"

# Connect
ENV CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https"
ENV CONNECT_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_SASL_MECHANISM="PLAIN"

# Producer
ENV CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_PRODUCER_SASL_MECHANISM="PLAIN"
ENV CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https"
ENV CONNECT_PRODUCER_REQUEST_TIMEOUT_MS="20000"
ENV CONNECT_PRODUCER_RETRY_BACKOFF_MS="500"

# Consumer
ENV CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_CONSUMER_SASL_MECHANISM="PLAIN"
ENV CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https"
ENV CONNECT_CONSUMER_REQUEST_TIMEOUT_MS="20000"
ENV CONNECT_CONSUMER_RETRY_BACKOFF_MS="500"

# Needed to install unzip and the plugin
USER root
RUN yum install -y unzip 
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN ./aws/install --bin-dir /usr/local/bin --install-dir /usr/local/aws-cli --update

ENV SINK_PLUGIN=iceberg-kafka-connect-runtime-0.6.15.zip
ADD https://github.com/tabular-io/iceberg-kafka-connect/releases/download/v0.6.15/$SINK_PLUGIN /tmp/
RUN unzip /tmp/$SINK_PLUGIN -d /usr/share/java/
RUN rm /tmp/$SINK_PLUGIN

# To set REST advertised host name
COPY set_env.sh /etc/set_env.sh
COPY binaries/cli/jq /usr/bin/

USER root
RUN chmod +x /etc/set_env.sh
USER appuser

CMD ["/etc/confluent/docker/run"]

We are also setting these variables at runtime: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL, CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO, CONNECT_SASL_JAAS_CONFIG, CONNECT_CONSUMER_SASL_JAAS_CONFIG, CONNECT_PRODUCER_SASL_JAAS_CONFIG, CONNECT_BOOTSTRAP_SERVERS.

tabmatfournier commented 2 months ago

To confirm:

farbodahm commented 2 months ago

@tabmatfournier Thanks for the answer:

farbodahm commented 2 months ago

@tabmatfournier We found the issue: a firewall rule in the VPC where the ECS cluster is deployed was blocking the route to Confluent Schema Registry.

However, it was really difficult for us to find the cause, as nothing was being written to the sink and no errors or warnings about the connection issue were logged.

If you agree, I can work on a PR that first checks the connection to both the Kafka broker and the Schema Registry, to avoid this confusion and make debugging easier.

Please let me know what you think. Thanks.
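
A rough sketch of what such a pre-flight check could look like as a shell step run before the worker starts. The function names (parse_bootstrap_servers, check_tcp, check_schema_registry, preflight) are hypothetical and not part of the connector. It assumes bash, the coreutils timeout command, and curl are available in the image, and it only tests reachability, so an HTTP 401 from the Schema Registry still counts as reachable.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check; all function names here are illustrative only.

# Split a comma-separated bootstrap server list into one "host port" pair per line.
parse_bootstrap_servers() {
  printf '%s\n' "$1" | tr ',' '\n' | tr -d ' ' | while IFS= read -r entry; do
    [ -n "$entry" ] || continue
    # Host is everything before the last colon, port is everything after it.
    printf '%s %s\n' "${entry%:*}" "${entry##*:}"
  done
}

# Return 0 if a TCP connection to host:port opens within timeout_s seconds.
# Uses bash's /dev/tcp pseudo-device, so the probe itself runs under bash.
check_tcp() {
  local host="$1" port="$2" timeout_s="${3:-5}"
  timeout "$timeout_s" bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Return 0 if the Schema Registry sends back any HTTP response in time.
check_schema_registry() {
  local url="$1" timeout_s="${2:-5}"
  curl --silent --output /dev/null --max-time "$timeout_s" "${url%/}/subjects"
}

# Check every broker and the Schema Registry; return non-zero if any is unreachable.
preflight() {
  local servers="$1" registry_url="$2" host port failed=0
  while read -r host port; do
    [ -n "$host" ] || continue
    if check_tcp "$host" "$port"; then
      echo "OK   broker $host:$port"
    else
      echo "FAIL broker $host:$port is unreachable" >&2
      failed=1
    fi
  done <<EOF
$(parse_bootstrap_servers "$servers")
EOF
  if check_schema_registry "$registry_url"; then
    echo "OK   schema registry $registry_url"
  else
    echo "FAIL schema registry $registry_url is unreachable" >&2
    failed=1
  fi
  return "$failed"
}
```

It could be called from set_env.sh before the worker is started, for example `preflight "$CONNECT_BOOTSTRAP_SERVERS" "$CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL" || exit 1`. Packets dropped by a firewall rule would then show up as an explicit FAIL line once the timeout expires.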

tabmatfournier commented 2 months ago

I'm surprised there were no ERROR level logs indicating this already.

farbodahm commented 2 months ago

It is probably related to the underlying Kafka client library used for connecting to Kafka. It accepts various timeout and backoff configs (like CONNECT_REQUEST_TIMEOUT_MS and CONNECT_RETRY_BACKOFF_MS) and stays silent during the retry phase.

I think it would make sense to first check whether the execution environment has access to the broker and the Schema Registry before initiating the connection phase.

What do you think? If it makes sense to you, I can add it.
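
To illustrate the difference from the silent retry phase described above, here is a minimal sketch of a retry loop that logs every failed attempt. It is not how the Kafka client behaves internally; retry_with_log is a hypothetical helper, and the attempt count and backoff in the usage note are placeholder values.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command up to max_attempts times,
# writing a WARN line to stderr for every failure instead of retrying silently.
retry_with_log() {
  local max_attempts="$1" backoff_s="$2" attempt=1
  shift 2
  while [ "$attempt" -le "$max_attempts" ]; do
    if "$@"; then
      return 0
    fi
    echo "WARN attempt $attempt/$max_attempts failed: $*" >&2
    attempt=$((attempt + 1))
    # Back off only if another attempt will follow.
    [ "$attempt" -le "$max_attempts" ] && sleep "$backoff_s"
  done
  echo "ERROR giving up after $max_attempts attempts: $*" >&2
  return 1
}
```

For example, `retry_with_log 5 1 curl --silent --output /dev/null --max-time 20 "$CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL/subjects"` would write one WARN line per blocked attempt and a final ERROR line, which is the kind of signal that was missing in our case.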

shubhamsawantsjsu commented 1 month ago

Hey @farbodahm, can you please share your set_env.sh file?

farbodahm commented 1 month ago

Hey @shubhamsawantsjsu, it's just for running the image on ECS:

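#!/usr/bin/env bash
set -Eeuo pipefail

# Fetch the task metadata from the ECS container metadata endpoint.
JSON=$(curl --silent --show-error --fail "${ECS_CONTAINER_METADATA_URI}/task")
echo "$JSON"
# Extract the first IPv4 address of the first container in the task.
TASK=$(echo "$JSON" | jq -r '.Containers[0].Networks[0].IPv4Addresses[0]')
echo "$TASK"

# Advertise that address as the Connect REST host name, then start the worker.
CONNECT_REST_ADVERTISED_HOST_NAME="$TASK" /etc/confluent/docker/run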