streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Allow Special Character Handling #557

Open e160842 opened 11 months ago

e160842 commented 11 months ago

Describe the issue: Using this connector to transfer data fails for data that includes special characters ($, %, ^, &, etc.), especially when those special characters are in the header (field names) of the data :(

Describe the solution you'd like: Enable the connector to handle special characters.

Describe alternatives you've considered: We have to create an extra step to "cleanse" special characters before we use the connector (roughly the sketch below), which is not efficient.
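For context, the cleansing step we run today is roughly the sketch below. It rewrites the header row of each pipe-delimited file so that field names contain only alphanumerics and underscores before the file is handed to the connector; the file paths, the delimiter argument, and the exact replacement rule are illustrative only, not our production code.

import csv
import re
import sys

def sanitize(name: str) -> str:
    """Replace every run of non-alphanumeric characters with a single underscore
    and trim leading/trailing underscores (illustrative rule only)."""
    return re.sub(r"[^0-9A-Za-z]+", "_", name).strip("_")

def cleanse_header(src_path: str, dst_path: str, delimiter: str = "|") -> None:
    """Copy src_path to dst_path, rewriting only the header row."""
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader = csv.reader(src, delimiter=delimiter)
        writer = csv.writer(dst, delimiter=delimiter)
        header = next(reader)
        # e.g. "AMS: % Electric/Hybrid Cars" becomes "AMS_Electric_Hybrid_Cars"
        writer.writerow([sanitize(col) for col in header])
        for row in reader:
            writer.writerow(row)

if __name__ == "__main__":
    cleanse_header(sys.argv[1], sys.argv[2])

It is invoked per file before upload, e.g. python cleanse_header.py raw.csv cleaned.csv (the script name is arbitrary).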

Additional context: Please let me know if any clarification is needed :)

fhussonnois commented 11 months ago

Hi @e160842, could you please provide the connector configuration you used to process the data? Thanks

e160842 commented 10 months ago

@fhussonnois, thank you for your support. Below is a sample connector config, with proprietary data obscured. Please let me know if any parts are unclear.

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: XXXXXXXXXXXXXXXXXXXXX
  namespace: ${namespace}
spec:
  name: XXXXXXXXXXXXXXXXXXXXXXXXXX
  class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
  taskMax: 1
  restartPolicy:
    type: "OnFailure"
    maxRetry: 10
  connectRest:
    endpoint: https://connector.${nspace}.xxx.cluster.local:8XXX
    authentication:
      type: bearer
      bearer:
        secretRef: v2-connectors-apikeys
  configs:
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    connector.class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
    tasks.max: 1
    fs.listing.class: "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing"
    aws.credentials.provider.class: "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    aws.s3.region: "us-east-1"
    aws.s3.bucket.name: "XXXXXXXXXXXXXXXXXXXXXX"
    aws.s3.bucket.prefix: "XXXXXXXXXXXXXXXXXX"
    aws.s3.default.object.storage.class: "STANDARD"
    fs.listing.filters: "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter"
    fs.listing.interval.ms: "300000"
    tasks.reader.class: "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader"
    topic: "${env}.stage.XXXXXXXXXXXXXXXX"
    offset.policy.class: "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy"
    filters: "ParseCSVLine,SetKey,ProvideId,AnnotateWithType"
    filters.ParseCSVLine.type: "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter"
    filters.ParseCSVLine.trim.column: "true"
    filters.ParseCSVLine.separator: "|"
    filters.ParseCSVLine.columns:
      "AMS: # Alternative - Hybrid Tru|AMS: % Alternative - Hybrid Tru|AMS: $ Electric/Hybrid Cars|AMS: % Electric/Hybrid Cars|AMS: # Alternative - Natural_Gas|AMS: Alternative Power>Natural Ga|Property/Realty: Home Stories|Property/Realty: Home Bath|Property/Realty: Home Bedrooms|Property/Realty: Home Total Rooms|Property/Realty: Home Exterior Wall T|BehaviorBank: Hi-tech owner|BehaviorBank: Internet/online subscri|SRVY:HH Acty/Int:Socl Caus/Con:Enviro"
    filters.SetKey.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.SetKey.field: "$key"
    filters.SetKey.value: "$value.Number"
    filters.AnnotateWithType.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.AnnotateWithType.field: "$._t"
    filters.AnnotateWithType.value: "XXXXXXXXXX"
    filters.ProvideId.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.ProvideId.field: "$._id"
    filters.ProvideId.value: "$value.Number"
    fs.cleanup.policy.class: "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy"
    fs.cleanup.policy.triggered.on: "COMMITTED"
    fs.cleanup.policy.move.success.aws.bucket.name: "SUCCESS_XXXXXXXX"
    fs.cleanup.policy.move.success.aws.prefix.path: "SUCCESS/processed"
    fs.cleanup.policy.move.failure.aws.bucket.name: "FAILURE_XXXXXX"
    fs.cleanup.policy.move.failure.aws.prefix.path: "FAILURE/error"
    file.filter.regex.pattern: ".*\\.csv|.*\\.txt$"
    offset.attributes.string: "name"
    skip.headers: "1"
    tasks.file.status.storage.class: "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore"
    tasks.file.status.storage.bootstrap.servers: "kafka.${namespace}.svc.cluster.local:9071"
    tasks.file.status.storage.topic: "${env}.prep.XXXXXXXXData-Status"
    tasks.file.status.storage.topic.partitions: "1"
    tasks.file.status.storage.topic.replication.factor: "3"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter.schemas.enable: "false"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    tasks.file.status.storage.producer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.producer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.producer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secrets/v2-connectors-apikeys/plain.txt:username}\" password =\"$${file:/mnt/secrets/apikeys/plain.txt:password}\";"
    tasks.file.status.storage.producer.request.timeout.ms: "20000"
    tasks.file.status.storage.consumer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.consumer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.consumer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secretkeys/plain.txt:username}\" password=\"$${file:/mnt/secretkeys/plain.txt:password}\";"
    tasks.file.status.storage.consumer.request.timeout.ms: "20000"
    principal.service.name: "$${file:/mnt/secrets/connector-configs/s3-src:username}"
    principal.service.password: "$${file:/mnt/secrets/src:password}"
github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.