streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

CSV file, filter with list of columns #482

Open ypynda opened 1 year ago

ypynda commented 1 year ago

Hi team, and thanks for a great product!

I'm using FilePulseSourceConnector with the CSVFilter type. My source files don't have any headers.

When I use the following configuration, everything works fine:

  "filters.ParseCSVLine.type"       : "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.ParseCSVLine.seperator"  : ",",
  "filters.ParseCSVLine.ignore.leading.whitespace": "true",
  "filters.ParseCSVLine.auto.generate.column.names": "true",

When I try to specify explicit name:type pairs for the columns, I get unexpected results: the column list appears to be sorted alphabetically, so the column names no longer match the values:

  "filters.ParseCSVLine.type"       : "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.ParseCSVLine.seperator"  : ",",
  "filters.ParseCSVLine.ignore.leading.whitespace": "true",
  "filters.ParseCSVLine.auto.generate.column.names": "false",
  "filters.ParseCSVLine.columns"    : "DataID1:string;TextField1:string; DataID2:string;TextField2:string;Date1:string;Date2:string;diffDate:string",

Can you please check it out? Thank you

mjseid commented 1 year ago

We are experiencing this as well on the latest 2.12.0 release

fhussonnois commented 1 year ago

Hi @ypynda, @mjseid, thank you for reporting this issue. I will check the filter and release a new version if necessary.

abgoswami commented 1 year ago

Hi @fhussonnois, can you please assign this bug to me? Or add me as a collaborator? I have forked the repository and want to submit a pull request for the fix, but I want to make sure that someone else is not working on this issue simultaneously. Thanks

abgoswami commented 1 year ago

@fhussonnois, thanks. I'll submit a PR and drop a note here.

thutch commented 1 year ago

I'm seeing the same issue. Has this been resolved? The latest version that does not have this issue is 2.8.0.

kirito73 commented 1 year ago

The code indeed takes the columns in the order they are provided in the config and doesn't reorder them based on their actual position in the CSV. So if the column order in your config matches the order in your CSV, it will work as expected. But I think this should be handled in the code itself.
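For example (hypothetical file and column names, assuming the behaviour described above): if each line is physically laid out as

  orderId,customerName,amount

then declaring the columns in exactly that same order should map correctly:

  "filters.ParseCSVLine.columns": "orderId:string;customerName:string;amount:string",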

I can raise a PR for the fix, if that's fine with @fhussonnois and @abgoswami.

pmizenin commented 1 year ago

I'm facing a similar issue: with the "filters.ParseCSVLine.extract.column.name": "headers" setting, the columns of the CSV come out of order.

Where this bites me is when I try to publish in Protobuf using "value.converter": "io.confluent.connect.protobuf.ProtobufConverter". In this case the schema I provide doesn't match the structure that gets produced by the filter.
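For reference, the converter settings look roughly like this (the schema-registry URL is a placeholder):

  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",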

Is somebody looking into this problem? If not, should I attempt to?

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

goyaltu-deshaw commented 3 months ago

Does anyone know how to handle files that are already present and continuously updated? Essentially, new records appended to these existing files are not being moved to the Kafka topic. Does some explicit config need to be specified?

polasanthoshkumar commented 2 months ago

Hi, is there any solution to the issue that was raised? I'm facing a similar issue using FilePulseSourceConnector to read a CSV file:

"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector", "file.filter.regex.pattern": ".*\\.csv", "filters": "ParseDelimitedRow", "filters.ParseDelimitedRow.columns": "FirstName:STRING;lastFourPhoneNum:STRING;date:STRING;countryCode:STRING;availableBal:STRING", "filters.ParseDelimitedRow.separator": ",", "filters.ParseDelimitedRow.trimColumn": false, "filters.ParseDelimitedRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter", "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing", "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",

I'm able to parse the CSV file with the File Pulse connector at version 2.9.0, but after updating to 2.13.0 the same file shows a mismatch when reading and mapping the column values. I'm not using headers in my CSV.

Please provide an update if anyone has faced a similar issue and there is a resolution or fix.