streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Unsupported node type '10' #207

Closed: ikemeneghetti closed this issue 2 years ago

ikemeneghetti commented 2 years ago

Hi! First of all, thanks for maintaining this awesome plugin.

I'm getting an "Unsupported node type '10'" error when I try to read XML records in the following format:

<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE myCDR>
<myCDR version="21.0">
    <cdrData>
        <headerModule>
            <recordId>
                <info1>value</info1>
                <info2>value</info2>
                <info3>value</info3>
                <info4>value</info4>
            </recordId>
            <type>value</type>
        </headerModule>
    </cdrData>
</myCDR>

Error output:

2021-12-13 14:17:34,573 INFO Opening new iterator for: file:/tmp/xml/input/file_31.xml (io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator) [task-thread-xml-sa04-0]
2021-12-13 14:17:34,577 INFO Closed iterator for: [uri=file:/tmp/xml/input/file_31.xml, name='file_31.xml', contentLength=6501, lastModified=1639405046972, contentDigest=[digest=3456093430, algorithm='CRC32'], userDefinedMetadata={system.inode=6460595, system.hostname=connect-cluster-2-connect-d969dc856-fvdzc}]  (io.streamthoughts.kafka.connect.
2021-12-13 14:17:34,577 ERROR Error while processing source file '[uri=file:/tmp/xml/input/file_31.xml, name='file_31.xml', contentLength=6501, lastModified=1639405046972, contentDigest=[digest=3456093430, algorithm='CRC32'], userDefinedMetadata={system.inode=6460595, system.hostname=connect-cluster-2-connect-d969dc856-fvdzc}]' (io.streamthought
io.streamthoughts.kafka.connect.filepulse.reader.ReaderException: Failed to convert XML document to connect struct object: [partition=null, metadata=[uri=file:/tmp/xml/input/file_31.xml, name='file_31.xml', contentLength=6501, lastModified=1639405046972, contentDigest=[digest=3456093430, algorithm='CRC32'], userDefinedMetadata={system.inode=6460595,
        at io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator.next(XMLFileInputIterator.java:188)
        at io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator.next(DelegateFileInputIterator.java:137)
        at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:175)
        at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.streamthoughts.kafka.connect.filepulse.reader.ReaderException: Unsupported node type '10'
        at io.streamthoughts.kafka.connect.filepulse.xml.XMLNodeToStructConverter.readObjectNodeValue(XMLNodeToStructConverter.java:170)
        at io.streamthoughts.kafka.connect.filepulse.xml.XMLNodeToStructConverter.convertObjectTree(XMLNodeToStructConverter.java:131)
        at io.streamthoughts.kafka.connect.filepulse.xml.XMLNodeToStructConverter.apply(XMLNodeToStructConverter.java:113)
        at io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator.next(XMLFileInputIterator.java:186)
        ... 12 more

As expected, when I remove the DOCTYPE line, the record is processed normally. I would like to process these files without having to remove the DOCTYPE. I don't need any XML validation, so the DOCTYPE can simply be ignored. However, I couldn't find a way to ignore this declaration or to extract what I need via an XPath expression.
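Node type 10 is the W3C DOM constant Node.DOCUMENT_TYPE_NODE, i.e. the <!DOCTYPE myCDR> declaration, so the converter apparently reached that node while walking the document. The following is a minimal sketch using plain JAXP/DOM (not the plugin's XMLNodeToStructConverter, and not its actual fix) of how a converter could skip that node and keep only the root element; the class DoctypeSkipper and its method are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper, not the plugin's converter: lists the names of a
// document's top-level nodes while skipping the DOCTYPE declaration.
final class DoctypeSkipper {

    static List<String> topLevelNodeNames(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setValidating(false); // no DTD validation, as in the issue
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.ISO_8859_1)));

            List<String> names = new ArrayList<>();
            NodeList children = doc.getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                // Node.DOCUMENT_TYPE_NODE == 10: the <!DOCTYPE ...> declaration
                if (child.getNodeType() == Node.DOCUMENT_TYPE_NODE) {
                    continue;
                }
                names.add(child.getNodeName());
            }
            return names;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>\n"
                + "<!DOCTYPE myCDR>\n"
                + "<myCDR version=\"21.0\"><cdrData/></myCDR>";
        System.out.println(Node.DOCUMENT_TYPE_NODE); // 10
        System.out.println(topLevelNodeNames(xml));  // [myCDR]
    }
}
```

Skipping the node instead of rejecting it lets documents with and without a DOCTYPE follow the same code path; with validation off, nothing inside the DOCTYPE is needed to build the record.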

Here are some of the settings I am using for this connector:

spec:
  class: io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
  config:
    tasks.max: 1
    connector.class: io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
    fs.listing.class: io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing 
    fs.listing.filters: io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
    fs.listing.directory.path: /tmp/xml/input
    file.filter.regex.pattern: "^.*xml$"
    fs.cleanup.policy.class: io.streamthoughts.kafka.connect.filepulse.fs.clean.LocalMoveCleanupPolicy
    tasks.reader.class: io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalXMLFileInputReader
    tasks.empty.poll.wait.ms: 2000
    tasks.file.status.storage.class: io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore
    cleaner.output.failed.path: /tmp/xml/reject
    cleaner.output.succeed.path: /tmp/xml/archive
    value.converter: io.confluent.connect.avro.AvroConverter

Thanks in advance!

fhussonnois commented 2 years ago

Hi @ikemeneghetti, thank you for opening this issue; I will investigate it. Here is the documentation about the XML reader: https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/file-readers/#xxxxmlfileinputreader

You can configure an XPath expression using the reader.xpath.expression property. Also, XML validation is not enabled by default.
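To preview which nodes a given XPath expression selects, here is a minimal sketch that evaluates it with the JDK's javax.xml.xpath API against the sample document, DOCTYPE included. It only shows what the expression matches; how the XML reader then turns the selected nodes into a record is not modeled, and the class XPathPreview is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

// Hypothetical helper: returns the names of the nodes matched by an XPath expression.
final class XPathPreview {

    static List<String> matchedNodeNames(String xml, String expression) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate(expression, doc, XPathConstants.NODESET);

            List<String> names = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                names.add(nodes.item(i).getNodeName());
            }
            return names;
        } catch (ParserConfigurationException | SAXException | IOException
                | XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate XPath expression", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>\n"
                + "<!DOCTYPE myCDR>\n"
                + "<myCDR version=\"21.0\"><cdrData><headerModule/></cdrData></myCDR>";
        System.out.println(matchedNodeNames(xml, "/myCDR"));         // [myCDR]
        System.out.println(matchedNodeNames(xml, "/myCDR/cdrData")); // [cdrData]
    }
}
```

Selecting /myCDR returns the myCDR element itself; whether the connector then emits that element's name as the top-level field depends on its own conversion logic, which this sketch does not reproduce.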

ikemeneghetti commented 2 years ago

Hi! Thanks for the quick response!

I've tried some XPath expressions, but the issue is that I can't get "myCDR" as the root of my record.

Here are excerpts of the records I can read from the topic.

When I remove the DOCTYPE line manually and do not define an XPath expression:

{"myCDR":{"myCDR":{"cdrData":{"array":[{"io.confluent.connect.avro.CdrData":{"headerModule":{"io.confluent.connect.avro.HeaderModule":{"recordId":{  

When I keep the original XML and use an XPath expression:

{"cdrData":{"array":[{"CdrData":{"basicModule":null,"centrexModule":null,"headerModule":{"io.confluent.connect.avro.HeaderModule":{"recordId": 

I tried the following expressions:

I validated these expressions with xmllint, and they seem to include "myCDR" in the output.

fhussonnois commented 2 years ago

Hi @ikemeneghetti, a fix was pushed for this issue. You can try it using the following Docker image:

docker pull streamthoughts/kafka-connect-file-pulse:master

ikemeneghetti commented 2 years ago

Hi @fhussonnois!

I just got a chance to test the update.

Worked perfectly!

I noticed some WARN messages in the log, but they don't seem to be a problem, since the data showed up in the topic normally. Here is an excerpt of the log just to demonstrate; the problem can already be considered solved.

2021-12-16 17:39:18,593 INFO [task-thread-connect-file-pulse-quickstart-xml-0] Started FilePulse source task (io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask)
2021-12-16 17:39:18,593 INFO [task-thread-connect-file-pulse-quickstart-xml-0] WorkerSourceTask{id=connect-file-pulse-quickstart-xml-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
2021-12-16 17:39:18,598 INFO [task-thread-connect-file-pulse-quickstart-xml-0] WorkerSourceTask{id=connect-file-pulse-quickstart-xml-0} Executing source task (org.apache.kafka.connect.runtime.WorkerSourceTask)
2021-12-16 17:39:18,601 INFO [kafka-producer-network-thread | connector-producer-connect-file-pulse-quickstart-xml-0] [Producer clientId=connector-producer-connect-file-pulse-quickstart-xml-0] Cluster ID: GD8uTB8GQECRME_IRvRnIw (org.apache.kafka.clients.Metadata)
2021-12-16 17:39:18,617 INFO [task-thread-connect-file-pulse-quickstart-xml-0] Opening new iterator for: file:/tmp/kafka-connect/examples/file_1.xml (io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "myCDR" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "cdrData" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "headerModule" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "recordId" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "info1" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "info2" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "info3" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "info4" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,620 WARN [task-thread-connect-file-pulse-quickstart-xml-0] Handled XML parser error on file file:/tmp/kafka-connect/examples/file_1.xml. Error: Element type "type" must be declared. (io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator)
2021-12-16 17:39:18,662 INFO [task-thread-connect-file-pulse-quickstart-xml-0] Closed iterator for: [uri=file:/tmp/kafka-connect/examples/file_1.xml, name='file_1.xml', contentLength=302, lastModified=1639676337000, contentDigest=[digest=2787635386, algorithm='CRC32'], userDefinedMetadata={system.inode=67563658, system.hostname=786e7e695127}]  (io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator)
2021-12-16 17:39:18,662 INFO [task-thread-connect-file-pulse-quickstart-xml-0] Completed all object files. FilePulse source task is transitioning to IDLE state while waiting for new reconfiguration request from source connector. (io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask)
2021-12-16 17:39:22,713 INFO [FileSystemMonitorThread] Cleaning up completed object files '1' (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
2021-12-16 17:39:22,716 INFO [FileSystemMonitorThread] Finished cleaning all completed object files (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
2021-12-16 17:39:22,716 INFO [FileSystemMonitorThread] Scheduled files still being processed: 1. Skip filesystem listing while waiting for tasks completion (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
2021-12-16 17:39:22,716 INFO [FileSystemMonitorThread] Completed filesystem monitoring iteration in 3 ms (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread)
2021-12-16 17:39:27,821 INFO [FileSystemMonitorThread] Starting to list object files using: LocalFSDirectoryListing (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
2021-12-16 17:39:27,821 INFO [FileSystemMonitorThread] Completed object files listing. '0' object files found in 0ms (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
2021-12-16 17:39:27,822 INFO [FileSystemMonitorThread] Finished lookup for new object files: '0' files can be scheduled for processing (io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor)
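The WARN lines above report messages such as Element type "myCDR" must be declared, which is the kind of recoverable error a validating parser raises when a DOCTYPE declares no elements; the log suggests the fixed reader now handles such errors as warnings. Below is a minimal sketch, assuming a JAXP DocumentBuilder with validation turned on and a custom org.xml.sax.ErrorHandler, of how recoverable errors can be collected for logging while fatal (well-formedness) errors still abort. It is not the plugin's actual implementation, and LenientXmlParser is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.xml.sax.ErrorHandler;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;

// Hypothetical sketch: parse with DTD validation on, but record recoverable
// validation errors instead of failing the whole file.
final class LenientXmlParser {

    // Collects warnings and recoverable errors; fatal errors still abort parsing.
    static final class CollectingErrorHandler implements ErrorHandler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void warning(SAXParseException e) {
            messages.add("warning: " + e.getMessage());
        }

        @Override
        public void error(SAXParseException e) {
            // Recoverable validation errors, e.g. an undeclared element type
            messages.add("error: " + e.getMessage());
        }

        @Override
        public void fatalError(SAXParseException e) throws SAXException {
            throw e; // malformed XML cannot be recovered
        }
    }

    static Document parse(String xml, CollectingErrorHandler handler) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setValidating(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            builder.setErrorHandler(handler);
            return builder.parse(new InputSource(new StringReader(xml)));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<!DOCTYPE myCDR>\n<myCDR version=\"21.0\"><cdrData/></myCDR>";
        CollectingErrorHandler handler = new CollectingErrorHandler();
        Document doc = parse(xml, handler);
        handler.messages.forEach(System.out::println); // logged instead of thrown
        System.out.println(doc.getDocumentElement().getNodeName());
    }
}
```

Leaving validation off (setValidating(false), the JAXP default) avoids these messages entirely; the handler approach only matters when validation is enabled.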

Thanks!