confluentinc / terraform-provider-confluent

Terraform Provider for Confluent
Apache License 2.0

Incompatible schemas, and Client.Timeout while contacting schema registry corrupt tfstate file during apply #337

Open andwarnbros opened 10 months ago

andwarnbros commented 10 months ago

Hello,

We noticed two Confluent provider issues that cause the schema in our state file to be updated, but not the schema in the cloud Schema Registry. This results in an out-of-sync tf state file that can no longer successfully apply changes to Confluent.

Case 1. Any incompatible schema change results in a failed Confluent Cloud schema update but a successful tf state file update.

Case 2. A network issue caused a (Client.Timeout exceeded while awaiting headers) error while contacting our cloud Schema Registry. This resulted in a failed schema update but a successful tf state file update.

We found that manually rolling back the schema to the last working version fixes the problem in both cases. But this is time-consuming and prolongs our downtime.

From the two cases above, it seems the tf state file is updated before our schema change has been successfully applied. When the schema update fails, our apply stops, but the tf state file is not rolled back.

Expected Behavior: The tf state file should be updated only after a successful update in Confluent Cloud.
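To make the expected ordering concrete, here is a minimal sketch in Python. It is not the provider's actual code: `state` is a plain dict standing in for the tf state file, and `register` is a hypothetical callable standing in for the Schema Registry request. The point is only that the recorded state changes after the remote call succeeds, and stays untouched when it fails.

```python
class RemoteError(Exception):
    """Raised when the (simulated) Schema Registry rejects a schema or times out."""


def apply_schema_change(state, subject, new_schema, register):
    """Register new_schema remotely, then record it in state only on success.

    `state` is a dict standing in for the tf state file; `register` is a
    hypothetical callable that returns the new version or raises RemoteError.
    """
    try:
        version = register(subject, new_schema)
    except RemoteError:
        # Remote update failed: leave the recorded state untouched.
        return False
    state[subject] = {"schema": new_schema, "version": version}
    return True


def failing_register(subject, schema):
    # Simulates Case 2 (the same path applies to an incompatible schema).
    raise RemoteError("Client.Timeout exceeded while awaiting headers")


def ok_register(subject, schema):
    # Simulates a successful registration that yields version 2.
    return 2


state = {"demo-value": {"schema": "v1", "version": 1}}
apply_schema_change(state, "demo-value", "v2", failing_register)
print(state["demo-value"]["version"])  # unchanged after the failed call
apply_schema_change(state, "demo-value", "v2", ok_register)
print(state["demo-value"]["version"])  # updated after the successful call
```

With this ordering, a failed apply leaves the state describing the last schema that actually exists remotely, so the next plan still shows the pending change.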

confluent = {
  source  = "confluentinc/confluent"
  version = "1.54.0"
}

linouk23 commented 10 months ago

@andwarnbros thanks for reporting the issue!

Could you also provide the TF config for a minimal reproducible example for both scenarios?

andwarnbros commented 9 months ago

Hello, we are using a somewhat custom config. Any schema change that the compatibility API below reports as incompatible will cause the issue: https://docs.confluent.io/platform/current/schema-registry/develop/api.html#sr-api-compatibility

I don't think it's possible to reproduce the network problem that caused the second case.
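As a possible way to catch Case 1 before running apply, the compatibility endpoint linked above can be queried directly. The sketch below builds a `POST /compatibility/subjects/{subject}/versions/latest` request and reads the `is_compatible` field of the response. The base URL and subject are placeholders, authentication (for example an API key sent as HTTP basic auth) is omitted, and schemas with references would also need a `references` field in the body, which is not shown here.

```python
import json
import urllib.parse
import urllib.request


def build_compat_request(base_url, subject, schema, version="latest"):
    """Build a compatibility-check request for a schema given as a dict.

    base_url and subject are caller-supplied placeholders; auth headers
    are intentionally left out of this sketch.
    """
    quoted = urllib.parse.quote(subject, safe="")
    url = f"{base_url.rstrip('/')}/compatibility/subjects/{quoted}/versions/{version}"
    # The API expects the schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


def is_compatible(response_body):
    """Return True only if the response explicitly reports compatibility."""
    return bool(json.loads(response_body).get("is_compatible", False))
```

A caller would send the request with `urllib.request.urlopen(req, timeout=...)`, pass the response body to `is_compatible`, and skip the apply when it returns False.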

Schema Example:

{
  "namespace": "com.rbauction.demo.events.models",
  "type": "record",
  "name": "DemoEvent",
  "doc": "Test event schema created by terraform. No References",
  "props": { "register": true },
  "fields": [
    {
      "name": "sourceSystem",
      "type": [ "null", "string" ],
      "default": null,
      "doc": "Source system field"
    },
    {
      "name": "id",
      "type": "string",
      "doc": "some id field",
      "props": { "defaultKey": true }
    },
    {
      "name": "someSubSchema",
      "type": [ "null", "com.rbauction.demo.events.models.sub.DemoEventSub" ],
      "default": null,
      "doc": "sub schema reference"
    },
    {
      "name": "description",
      "type": [ "null", "string" ],
      "default": null,
      "doc": "Description for the demo"
    }
  ]
}

Configs:

module "schemas" {
  source                             = "./modules/schema_module"
  identifier                         = var.identifier
  include_only_environment_resources = var.include_only_environment_resources
  topic_schemas                      = module.topics.topic_schemas
}

%{ for schema_name, schema_data in schemas ~}
resource "schemaregistry_schema" "${schema_data.identifier}" {
  subject = "${schema_name}"
  schema  = ${jsonencode(schema_data.schema)}
  %{ for reference in schema_data.references }
  %{ if reference != jsondecode(schema_data.schema).name ~}
  reference {
    name    = "%{ if length(split(".", reference)) == 1 }${jsondecode(schema_data.schema).namespace}.${reference}%{ else }${reference}%{ endif }"
    subject = "%{ if length(split(".", reference)) == 1 }${jsondecode(schema_data.schema).namespace}.${reference}%{ else }${reference}%{ endif }"
    version = resource.schemaregistry_schema.${replace("%{ if length(split(".", reference)) == 1 }${jsondecode(schema_data.schema).namespace}.${reference}%{ else }${reference}%{ endif }", ".", "")}.version
  }
  %{ endif ~}
  %{ endfor }
}

%{~ endfor }
locals {
  schema_versions = {
    %{ for schema_name, schema_data in schemas ~}
    ${schema_data.identifier} : resource.schemaregistry_schema.${schema_data.identifier}.version

    %{~ endfor }
  }
}

resource "schemaregistry_schema" "topic_key_schemas" {
  for_each = var.topic_schemas

  subject = "${each.key}-key"
  schema  = "\"string\""
}

locals {
  schema_files = fileset("../enterprise-library/build/expanded_schemas", "*/.avsc")
  schemas_raw = flatten([
    for schema_file in local.schema_files :
    jsondecode(file("../enterprise-library/build/expanded_schemas/${schema_file}"))
  ])

  schemas_to_register = {
    for item in local.schemas_raw :
    "${item.namespace}.${item.name}" => {
      identifier : format("%s%s", replace(item.namespace, ".", ""), item.name)
      schema : jsonencode(item)
      references : []
      register : try(item.props.register, false)
    }
  }

  schemas = {
    for key, item in local.schemas_to_register :
    key => item if item.register == true
  }

  varfile = get_env("TG_VAR_FILE", "")
  vardata = local.varfile != "" ? jsondecode(file(local.varfile)) : null
}

generate "record-schemas" {
  path      = "modules/schema_module/confluent-record-schemas.tf"
  if_exists = "overwrite_terragrunt"
  contents = templatefile("modules/schema_module/confluent-record-schemas.tf.tmpl", {
    schemas = local.schemas
  })
}