Aiven-Open / opensearch-connector-for-apache-kafka

Aiven's OpenSearch® Connector for Apache Kafka®
Apache License 2.0

Not working with AWS MSK Connect #195

Open rsharma1980 opened 1 year ago

rsharma1980 commented 1 year ago

I am trying to create the connector in AWS MSK Connect. I took the latest zip file and created a custom plugin from it. It gives me the error "There is an issue with the connector, Code: UnknownError.Unknown Message: The last operation failed. Retry the operation". I passed these configuration values: connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector, connection.url=<> (the port used was 9200), and tasks.max=1. I am not sure what is causing this. Is the connector expected to work with AWS?

gharris1727 commented 1 year ago

@rsharma1980 That error is very generic, and you will need to investigate further. A variety of things could be wrong: networking, permissions, outages, etc. I am not aware of any incompatibility on the connector's side that would prevent it from working with MSK Connect, but if you do find one, please submit a more detailed bug report.

HaroonSaid commented 1 year ago

@rsharma1980 - We use Terraform to create connectors and don't have any issues connecting MSK with OpenSearch. Here is a sample Terraform fragment, if anyone is interested:


resource "aws_mskconnect_connector" "connectors" {
  for_each = {
    for entry in var.kafkaelasticsearch_connector : "${entry.data_source_tenant}.${entry.data_source_environment}.${replace(entry.index_name, "_", "-")}" => entry if entry.enabled == true
  }

  name                 = "sink-enriched-${replace(each.value.index_name, "_", "-")}-${each.value.data_source_tenant}-${each.value.data_source_environment}"
  kafkaconnect_version = "2.7.1"
  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  connector_configuration = {
    "connector.class"                                 = "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector"
    "tasks.max"                                       = "2"
    "topics"                                          = each.value.topic_name
    "behavior.on.malformed.documents"                 = "FAIL"
    "behavior.on.null.values"                         = each.value.on_null_values
    "behavior.on.version.conflict"                    = "ignore"
    "connection.url"                                  = each.value.elastic_search_url
    "errors.deadletterqueue.context.headers.enable"   = "true"
    "errors.deadletterqueue.topic.name"               = each.value.topic_dlq
    "errors.deadletterqueue.topic.replication.factor" = "1"
    "errors.tolerance"                                = "all"
    "key.converter"                                   = "org.apache.kafka.connect.storage.StringConverter"
    "key.ignore"                                      = "false"
    "name"                                            = "sink-enriched-${replace(each.value.index_name, "_", "-")}-${each.value.data_source_tenant}-${each.value.data_source_environment}"
    "schema.ignore"                                   = "true"
    "transforms"                                      = "addSuffix"
    "transforms.addSuffix.regex"                      = each.value.topic_name
    "transforms.addSuffix.replacement"                = each.value.index_name
    "transforms.addSuffix.type"                       = "org.apache.kafka.connect.transforms.RegexRouter",
    "type.name"                                       = "_doc",
    "value.converter"                                 = "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"                  = "false"
    "read.timeout.ms"                                 = "60000"
    "flush.timeout.ms"                                = "60000"
    "write.method"                                    = "UPSERT"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.msk_bootstrap_brokers

      vpc {
        security_groups = [var.msk_sg.id]
        subnets         = slice(tolist(data.aws_subnets.data.ids), 0, 3)
      }
    }
  }

  kafka_cluster_client_authentication {
    authentication_type = "NONE"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "PLAINTEXT"
  }

  plugin {
    custom_plugin {
      arn      = data.aws_mskconnect_custom_plugin.opensearch.arn
      revision = data.aws_mskconnect_custom_plugin.opensearch.latest_revision
    }
  }
  log_delivery {
    worker_log_delivery {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.connector.name
      }
    }
  }
  service_execution_role_arn = aws_iam_role.kafka_connector.arn
}
resource "aws_cloudwatch_log_group" "connector" {
  name              = "/${replace(var.tenant, "_", "-")}/${var.environment}/kafka/connectors"
  retention_in_days = var.log_retention_in_days
}
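
The fragment above references a custom plugin data source (data.aws_mskconnect_custom_plugin.opensearch) and an execution role (aws_iam_role.kafka_connector) that are defined elsewhere. As a minimal sketch of what those pieces might look like, assuming the connector zip has already been registered as a custom plugin named "opensearch-sink" (the names and trust policy below are illustrative, not taken from the original configuration):

data "aws_mskconnect_custom_plugin" "opensearch" {
  # Look up the custom plugin that was already created from the connector zip.
  # The plugin name here is an assumed, illustrative value.
  name = "opensearch-sink"
}

resource "aws_iam_role" "kafka_connector" {
  # Service execution role that MSK Connect assumes to run the connector tasks.
  name = "kafka-connector-execution-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Action    = "sts:AssumeRole"
      Principal = { Service = "kafkaconnect.amazonaws.com" }
    }]
  })
}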
rsharma1980 commented 1 year ago

Thank you, @HaroonSaid. This is helpful.

leopoloc0 commented 1 year ago

@HaroonSaid how did you create your custom plugin? I'm facing the following error: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.aiven.kafka.connect.opensearch.OpensearchSinkConnector, but in the logs I also see the installation of the custom plugin:

[Worker-0fac0a20b870f596d] [2023-10-19 22:50:08,619] INFO Loading plugin from: KAFKA_DIR/plugins/OpenSearchSinkPlugIn.zip_inflated (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[Worker-0fac0a20b870f596d] [2023-10-19 22:50:08,844] INFO Registered loader: PluginClassLoader{pluginLocation=file:KAFKA_DIR/plugins/OpenSearchSinkPlugIn.zip_inflated/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
HaroonSaid commented 1 year ago

This sounds like you haven't uploaded the plugin zip to S3 and created the custom plugin via the AWS Console.
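
For reference, a rough Terraform equivalent of that step (uploading the connector zip to S3 and registering it as a custom plugin); the bucket, key, and plugin names here are illustrative assumptions:

resource "aws_s3_object" "opensearch_plugin_zip" {
  # Upload the connector distribution zip to an existing bucket.
  bucket = "my-connect-plugins-bucket"
  key    = "plugins/opensearch-connector-for-apache-kafka.zip"
  source = "opensearch-connector-for-apache-kafka.zip"
}

resource "aws_mskconnect_custom_plugin" "opensearch" {
  # Register the uploaded zip as an MSK Connect custom plugin.
  name         = "opensearch-sink"
  content_type = "ZIP"

  location {
    s3 {
      bucket_arn = "arn:aws:s3:::my-connect-plugins-bucket"
      file_key   = aws_s3_object.opensearch_plugin_zip.key
    }
  }
}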

leopoloc0 commented 1 year ago

@HaroonSaid I actually did, but I just downloaded the latest zip and uploaded it to S3... probably there's something wrong with it. (screenshot attached)

HaroonSaid commented 1 year ago

Unzip it locally and compare it with an older zip file.

HaroonSaid commented 1 year ago

Try an older release; there aren't many changes between them.

HaroonSaid commented 1 year ago

@leopoloc0 keep a close eye on the cost of running the MSK connector. It's not cheap once you're running more than a couple, compared with running the same connectors in a container.

leopoloc0 commented 1 year ago

@HaroonSaid thanks for the tip man! Also, you were right, the .zip was the issue. It is now working as an MSK connector.

HaroonSaid commented 1 year ago

@leopoloc0 - we liked the Terraform integration, but found that the cost became prohibitive at scale.

We run several hundred connectors per production environment region.