Open gustavoschewinski opened 3 months ago
You just need to checkout the github repo with the extras: https://github.com/apache/plc4x-extras/ and build that repo ... it should automatically build with the latest drivers.
Hello @chrisdutz,
I've built the folder (https://github.com/apache/plc4x-extras/tree/develop/plc4j/integrations/apache-kafka) using Maven (mvn clean install
), and in the target directory, I got two files: original-plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
and plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
.
When I try to use only plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
, the connector doesn't appear in Kafka.
When I try to use only original-plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
, I get the following error: "java.lang.NoClassDefFoundError: org/apache/plc4x/java/DefaultPlcDriverManager."
And if I use both (original-plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
and plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
), I get this error: "org.apache.kafka.common.config.ConfigException: Connection String format is incorrect: sources.machineA.connectionString=opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false".
Do you know what might be causing these issues? According to the repo, I should only need the plc4j-apache-kafka-0.13.0-SNAPSHOT.jar
, but Kafka doesn't recognize it. I suspect the problem could be related to how I built this.
Is it possible to have the plugin in a folder like the ones found here: https://repo1.maven.org/maven2/org/apache/plc4x/plc4j-apache-kafka/0.12.0/?
The "originial-*" probably is the code of the kafka adapter without any dependencies. The other is the only including the dependencies. Why it doesn't appear in Kafka, I currently have no idea ... I could possibly have a look on one of my next PLC4X friday afternoons ... however will be 2-3 weeks I guess before I have time for that.
Hopefully someone else will be able to be of assistance.
The plugin will appear at a location like that once it's released ... till then the only option you have is to get it from the SNAPSHOT repo: https://repository.apache.org/content/groups/snapshots/org/apache/plc4x/plc4j-apache-kafka/0.13.0-SNAPSHOT/
Hello @chrisdutz,
Thanks for the quick response. I understand, I'll test it here and try to figure it out.
Currently, I have managed to make it appear in Kafka by changing the command from mvn install
to mvn package
.
However, if I may, I have one more question for you. I'm encountering the following error when running a source connector:
Does this error look familiar to you, or do you have any idea what might be wrong? I tried adding other dependencies related to com.fasterxml.jackson
that could be missing, but that didn't help.
[2024-08-26 10:52:34,176] INFO AbstractConfig values:
bufferSize = 1000
connection-name = machineA
connectionString = opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false
pollReturnInterval = 500
queries = [jobA|plc-topic-plc4x|1000|counter#ns=3;i=1002]
(org.apache.kafka.common.config.AbstractConfig)
[2024-08-26 10:52:34,191] ERROR WorkerSourceTask{id=plc-1-source-13-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/plc4x/java/scraper/config/ScraperConfiguration.fromYaml(Ljava/lang/String;Ljava/lang/Class;)Ljava/lang/Object; @11: invokespecial
Reason:
Type 'com/fasterxml/jackson/dataformat/yaml/YAMLFactory' (current frame, stack[2]) is not assignable to 'com/fasterxml/jackson/core/JsonFactory'
Current Frame:
bci: @11
flags: { }
locals: { 'java/lang/String', 'java/lang/Class' }
stack: { uninitialized 0, uninitialized 0, 'com/fasterxml/jackson/dataformat/yaml/YAMLFactory' }
Bytecode:
0000000: bb00 0a59 bb00 0c59 b700 0eb7 0012 4d2c
0000010: 2a2b b600 15b0 4ebb 0018 5912 1a2d b700
0000020: 1cbf
Exception Handler Table:
bci [15, 21] => handler: 22
Stackmap Table:
full_frame(@22,{Object[#46],Object[#48],Object[#10]},{Object[#31]})
at org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder.build(ScraperConfigurationTriggeredImplBuilder.java:43)
at org.apache.plc4x.kafka.Plc4xSourceTask.start(Plc4xSourceTask.java:152)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:283)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2024-08-26 10:52:34,195] WARN Failed to close source task with type org.apache.kafka.connect.runtime.AbstractWorkerSourceTask$$Lambda$1177/0x0000000101f8b040 (org.apache.kafka.common.utils.Utils)
java.lang.NullPointerException
at org.apache.plc4x.kafka.Plc4xSourceTask.stop(Plc4xSourceTask.java:243)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1263)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.close(AbstractWorkerSourceTask.java:317)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:208)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.doClose(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:241)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Thank you in advance for your help!
First of all ... all important plugin executions are bound to the package phase ... so switcing from intall to package should just skip the installation goal step, which should have no effect on what's in "target", but if it works, that's good ;-)
The error you are getting comes from stopping the scraper without having successfully started it. This could be because it never was started, or there was an error starting it ... if you find any log output "Error starting the scraper" that should provide some more information on that.
Hello @chrisdutz,
I've tested everything with DEBUG mode enabled, but I still don't see any errors related to the scraper. It's always the same issue as mentioned before.
Could the problem be related to the environment I'm using to build it? Currently, I am using Apache Maven 3.6.3 and Java version 17.0.12. In Kafka, I am using the .zip file produced in target/components/packages
.
Is there a better way to improve logging in the PLC4x connector?
Thank you!
Hello @chrisdutz,
I discovered something interesting. I'm currently running the connector code locally to test it, and the error remains the same.
The issue is triggered when executing:
scraperConfig = builder.build()
in Plc4xSourceTaskpublic ScraperConfigurationTriggeredImpl build() { return new ScraperConfigurationTriggeredImpl(sources, jobConfigurations); }
@JsonCreator public ScraperConfigurationTriggeredImpl(@JsonProperty(value = "sources", required = true) Map<String, String> sources, @JsonProperty(value = "jobs", required = true) List<JobConfigurationImpl> jobConfigurations) { checkNoUnreferencedSources(sources, jobConfigurations); this.sources = sources; this.jobConfigurations = jobConfigurations; }
Somehow, this trigger is not functioning correctly, and it always produces the error: "Exception in thread 'main' java.lang.NoClassDefFoundError: com/fasterxml/jackson/dataformat/yaml/YAMLFactory." I suspect this might be related to the use of the @JsonCreator
annotation, which relies on FasterXML's Jackson library.
Interestingly, if I remove all test scopes when running the code, the previous FasterXML error disappears (because jackson-dataformat is just in test scope), the scraping starts, it attempts to poll but then the buffer is empty, not pooling anything. (Obs. It is possible to see in Prosys that it connected with it and opened a channel, however it doesen't poll anything)
24-08-28 10:44:53 INFO o.a.p.java.DefaultPlcDriverManager - Registering driver for Protocol simulated (Simulated PLC4X Datasource)
2024-08-28 10:44:53 INFO o.a.p.j.s.c.t.ScraperConfigurationTriggeredImpl - Assuming job as triggered job because triggerConfig has been set
2024-08-28 10:44:53 DEBUG o.a.p.j.s.t.t.TriggerConfiguration - Strategy: SCHEDULED, scheduled ms: 1000
Starting scraper...
2024-08-28 10:44:53 INFO o.a.p.j.s.t.TriggeredScraperImpl - Starting jobs...
2024-08-28 10:44:53 DEBUG o.a.p.j.s.t.TriggeredScraperImpl - Register task for job simulated-dashboard for conn machineA (opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false) at rate 1000 ms
2024-08-28 10:44:53 DEBUG o.a.p.j.s.t.t.TriggerConfiguration - Strategy: SCHEDULED, scheduled ms: 1000
2024-08-28 10:44:53 INFO o.a.p.j.s.t.TriggeredScraperImpl - Task TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@672872e1, jobName='simulated-dashboard', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@32910148[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$115/0x00000008400dcc40@3f56875e, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@2b4bac49} added to scheduling
2024-08-28 10:44:53 DEBUG o.a.p.j.s.t.TriggeredScraperTask - Trigger for job simulated-dashboard and device machineA is met ... scraping desired data
Scraper started.
Polling...
2024-08-28 10:44:53 DEBUG o.a.p.j.u.c.CachedPlcConnectionManager - Creating new connection
Buffer empty. Waiting...500[]
2024-08-28 10:44:54 DEBUG o.a.p.j.s.t.TriggeredScraperImpl - Job=org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl@fd07cbb Task=TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@672872e1, jobName='simulated-dashboard', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@32910148[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$115/0x00000008400dcc40@3f56875e, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@2b4bac49}
2024-08-28 10:44:54 DEBUG o.a.p.j.s.t.TriggeredScraperImpl - Task=Trigg
After some time, the scraper just stops:
2024-08-28 10:44:54 INFO o.a.p.j.s.t.TriggeredScraperImpl - Stopping scraper...
2024-08-28 10:44:54 DEBUG o.a.p.j.s.t.TriggeredScraperImpl - Stopping task TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@672872e1, jobName='simulated-dashboard', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@32910148[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$115/0x00000008400dcc40@3f56875e, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@2b4bac49}...
2024-08-28 10:44:54 WARN o.a.p.j.s.t.TriggeredScraperTask - Exception during scraping of Job simulated-dashboard, Connection-Alias machineA: Error-message: null - for stack-trace change logging to DEBUG
2024-08-28 10:44:54 DEBUG o.a.p.j.s.t.TriggeredScraperTask - Detailed exception occurred at scraping
org.apache.plc4x.java.api.exceptions.PlcRuntimeException: java.lang.InterruptedException
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:334)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask.run(TriggeredScraperTask.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:330)
... 7 common frames omitted
Do you have any ideas on what might be causing this issue? :)
Thanks a lot in advance.
Best,
Gustavo.
Yould you please add which version of Kafka you are using?
Hello @chrisdutz,
I am using Confluent Platform 7.6.1 with Apache Kafka 3.6.x.
However, I don't think that is the problem, because I took that into consideration when compiling the connector. And I am also experiencing the same issue even when using the connector without Kafka.
Below is the code example where I am testing the connector outside of Kafka:
package org.apache.plc4x.kafka;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.*;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.plc4x.kafka.Plc4xSourceConnector;
import org.apache.plc4x.kafka.Plc4xSourceTask;
import org.apache.plc4x.kafka.config.Constants;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectorTest {
private static final Logger log = LoggerFactory.getLogger(ConnectorTest.class);
public static void main(String[] args) {
// Simulate Kafka Connect properties
Map<String, String> props = new HashMap<>();
props.put("tasks.max", "1");
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("default.topic", "DefaultTopic");
props.put("sources", "machineA");
props.put("sources.machineA.connectionString", "opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false");
props.put("sources.machineA.pollReturnInterval", "500");
props.put("sources.machineA.bufferSize", "1000");
props.put("sources.machineA.jobReferences.simulated-dashboard.topic", "simulated-dashboard-topic");
props.put("jobs", "simulated-dashboard");
props.put("jobs.simulated-dashboard.interval", "1000");
props.put("jobs.simulated-dashboard.tags", "running");
props.put("jobs.simulated-dashboard.tags.running", "ns=3;i=1002");
props.put("sources.machineA.jobReferences", "simulated-dashboard");
log.debug("Starting PLC4X Kafka Connector with properties: {}", props);
// Instantiate your connector
Plc4xSourceConnector connector = new Plc4xSourceConnector();
connector.start(props);
// Get task configurations
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
if (taskConfigs.isEmpty()) {
log.error("No task configurations found. Please check the configuration.");
return;
}
// Instantiate task
Plc4xSourceTask task = new Plc4xSourceTask();
System.out.println("task" + taskConfigs.get(0));
task.start(taskConfigs.get(0));
// Simulate polling
List<SourceRecord> records = task.poll();
if (records != null && !records.isEmpty()) {
for (SourceRecord record : records) {
System.out.println("Polled Record: " + record);
}
} else {
System.out.println("No records polled.");
}
// Stop task and connector
task.stop();
connector.stop();
}
}
Thanks! I hope we can find where this problem comes from :)
Best, Gustavo.
Hello @chrisdutz,
I would like to add the following point: I tried changing the versions of PLC4x to 0.12.0 in the pom.xml
, but I am encountering the same error, which is as follows:
Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/dataformat/yaml/YAMLFactory
at org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder.build(ScraperConfigurationTriggeredImplBuilder.java:43)
at org.apache.plc4x.kafka.Plc4xSourceTask.start(Plc4xSourceTask.java:184)
at org.apache.plc4x.kafka.ConnectorTest.main(ConnectorTest.java:70)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.dataformat.yaml.YAMLFactory
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 3 more
UPDATE: By adding all missing libraries, the connector does work with the release 0.12.0. However, it does not work with the 0.13.0-SNAPSHOT.
Right now, iI guess that the issue might be related to the compilation or creation of the connector. It does not make any sense
Do you have any suggestions on how to fix this? Or any prerequisite to compile the connector.
Thanks again for all your support!
Hello @chrisdutz,
Sorry for continuing this thread...
I've tracked down the problem, and it appears to be coming from the scraper when it tries to connect:
Timeout error acquiring connection to: opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false&username=admin&password=admin&security-policy=Basic256Sha256&message-security=SIGN_ENCRYPT&server-certificate-file=/usr/certificates/SimulationServer@Yoshi_2048.der&key-store-file=/usr/certificates/client_cert.p12&key-store-password=gustavo&tcp.keep-alive=true&channel-lifetime=30000&session-timeout=10000, error: null
java.util.concurrent.TimeoutException
This occurs exactly at the part where it tries to establish the connection, and it doesn't work:
public static PlcConnection getPlcConnection(PlcConnectionManager plcConnectionManager,
String connectionString,
ExecutorService executorService,
long requestTimeoutMs,
String info) throws InterruptedException, ExecutionException, TimeoutException {
if (!info.isEmpty() && LOGGER.isTraceEnabled()) {
LOGGER.trace("Additional Info from caller {}", info);
}
CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Attempting to get a connection to: " + connectionString);
PlcConnection connection = plcConnectionManager.getConnection(connectionString);
System.out.println("Connection object created successfully: " + connection);
return connection;
} catch (Exception e) {
LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
System.out.println("Unable to instantiate connection to: " + connectionString + ", exception: " + e);
e.printStackTrace(); // Print the full stack trace to stdout for debugging
throw new PlcRuntimeException(e);
}
}, executorService);
It gets stuck on this specific line:
PlcConnection connection = plcConnectionManager.getConnection(connectionString);
The strange part is that with release 0.12.0, this error does not occur, and everything works fine. Another odd thing is that I can see in Prosys that the connection is opened, and it also seems to renew the channel, but it does not pull the data.
At this point, I still don't know how to fix the issue. If you have any suggestions, I would greatly appreciate it :)
All the best,
Gustavo
Hello @chrisdutz,
I’ve found some additional logs that may be useful, but I still haven’t identified the root cause of the problem.
As you can see, a request is made and a channel is successfully created:
[2024-09-09 15:56:00,606] DEBUG reading field maxRequestMessageSize (org.apache.plc4x.java.spi.codegen.fields.FieldReaderSimple)
[2024-09-09 15:56:00,606] DEBUG Sending activate session request to opc.tcp://Yoshi:53530/OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] TRACE Using Endpoint Yoshi.mshome.net 53530 /OPCUA/SimulationServer (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,607] DEBUG Submitting Transaction to TransactionManager 3, security channel 7, token 1 (org.apache.plc4x.java.opcua.context.Conversation)
[2024-09-09 15:56:00,607] DEBUG write field receiveBufferSize (org.apache.plc4x.java.spi.codegen.fields.FieldWriterSimple)
[2024-09-09 15:56:00,607] TRACE Overflow by 15 chars (org.apache.plc4x.java.spi.utils.ascii.AsciiBoxWriter)
However, after a lot of logging, the connection is established but no data is received. Eventually, it times out:
[2024-09-09 15:56:00,632] DEBUG done reading field results (org.apache.plc4x.java.spi.codegen.fields.FieldReaderArray)
[2024-09-09 15:56:00,632] DEBUG reading field noOfDiagnosticInfos (org.apache.plc4x.java.spi.codegen.fields.FieldReaderSimple)
[2024-09-09 15:56:00,632] DEBUG reading field diagnosticInfos. Count: -1 (org.apache.plc4x.java.spi.codegen.fields.FieldReaderArray)
[2024-09-09 15:56:00,632] DEBUG done reading field diagnosticInfos (org.apache.plc4x.java.spi.codegen.fields.FieldReaderArray)
[2024-09-09 15:56:00,632] DEBUG Scheduling session keep alive to happen within 2700s (org.apache.plc4x.java.opcua.context.SecureChannel)
[2024-09-09 15:56:00,632] INFO Established connection to server (org.apache.plc4x.java.opcua.protocol.OpcuaProtocolLogic)
[2024-09-09 15:56:00,632] TRACE Firing Connected! (org.apache.plc4x.java.spi.internal.DefaultConversationContext)
[2024-09-09 15:56:01,019] DEBUG Job statistics (jobA, machineA) number of requests: 2 (0 success, 100.0 % failed, NaN % too slow), min latency: NaN ms, mean latency: NaN ms, median: NaN ms (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-09-09 15:56:01,117] DEBUG Job=org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl@60290d60 Task=TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@55f4dc44, jobName='jobA', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@4e3f133f[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$1172/0x0000000101f9a440@3f54d16d, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@13ccf5a1} (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-09-09 15:56:01,118] DEBUG Task=TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@55f4dc44, jobName='jobA', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@4e3f133f[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$1172/0x0000000101f9a440@3f54d16d, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@13ccf5a1} Future=java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4e65db69[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@40a22d0d[Wrapped task = TriggeredScraperTask{connectionManager=org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager@55f4dc44, jobName='jobA', connectionAlias='machineA', connectionString='opcua:tcp://Yoshi:53530/OPCUA/SimulationServer?discovery=false', requestTimeoutMs=2000, executorService=java.util.concurrent.ThreadPoolExecutor@4e3f133f[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1], resultHandler=org.apache.plc4x.kafka.Plc4xSourceTask$$Lambda$1172/0x0000000101f9a440@3f54d16d, triggerHandler=org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl@13ccf5a1}]] (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-09-09 15:56:02,018] DEBUG Job statistics (jobA, machineA) number of requests: 2 (0 success, 100.0 % failed, NaN % too slow), min latency: NaN ms, mean latency: NaN ms, median: NaN ms (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-09-09 15:56:02,050] TRACE Additional Info from caller acquiring data collecting connection to (machineA,jobA) (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:330)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask.run(TriggeredScraperTask.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2798)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
... 3 more
[2024-09-09 15:56:02,053] WARN Exception during scraping of Job jobA, Connection-Alias machineA: Error-message: java.util.concurrent.TimeoutException - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
[2024-09-09 15:56:02,053] DEBUG Detailed exception occurred at scraping (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
org.apache.plc4x.java.api.exceptions.PlcRuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:334)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask.run(TriggeredScraperTask.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:330)
... 7 more
Caused by: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2798)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
... 3 more
[2024-09-09 15:56:02,054] TRACE Check condition for task of job jobA for connection machineA (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
[2024-09-09 15:56:02,054] DEBUG Trigger for job jobA and device machineA is met ... scraping desired data (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
[2024-09-09 15:56:02,054] TRACE Start new scrape of task of job jobA for connection machineA (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
Do you think this issue could be related to an update, or is there something else I might be overlooking? If you have any suggestions, just let me know :)
Thanks!
Hello @chrisdutz,
I have some updates regarding the issue.
I could find the problem and do a quick fix. In TriggeredScraperImpl.java
, the getPlcConnection
function doesn’t get the correct requestTimeoutMs
(as shown below), and it doesn’t have the default value mentioned on the website and in the OPC UA driver configuration. This is what was causing the timeout:
[2024-09-12 14:54:33,911] INFO SCRAPER DEBUG: request timeout before: 2000 (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-09-12 14:54:33,911] INFO SCRAPER DEBUG: request timeout after: 30000 (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl) --> This value is hard-coded.
I fixed it by hard-coding the request timeout, and it worked for about 5 minutes, but then it started creating sessions every 2 seconds (see the image below). This happens in both with and without security.
Do you know why the values from the OPC UA driver don’t seem to pass to the scraper? I’d like to fix this for the community, but I’m still a bit lost with all the dependencies, which makes testing difficult.
Also, do you have any idea what’s causing the session issue? I don’t see this problem when using OPC UA Java by itself, only when using it with the Kafka connector. I’d like to resolve this for the community too, but I’m still unsure where the issue lies.
Thanks, and sorry for the continued messages, reply whenever you have time :)
Best regards,
Gustavo
Hey, @gustavoschewinski , my Kafka connector has the same issue when compiling like u did. Did u find any solutions? thx
Hello @fernandokendy,
unfortunately it's still not working. I found a fix for the timeout issue as said before, but now there's another session bug occurring. At the moment, I don't have many clues about where the problem might be
Thanks for all of your work on the issue ... Unfortunately I'm not a pro using Kafka so I'm a bit hesitant to invest the 4h per week that I get to work on this ... perhaps there's someone on the community that has the expertiese to help?
To update everyone, I recently got it working, but it's not the ideal solution :)
I used version 0.12.0 from github to compile a working Kafka connector and implemented the necessary updates myself, mainly adding/changing some channel renewal features for OPCUA. The rest of the protocols seem to work fine with the "old" version.
However, something is causing some issues when compiling with the 0.13.0-SNAPSHOT version, and I haven't been able to find a fix. I still make myself available to help fix the new version with some guidance, since I'm quite lost...
head should compile again, although there are test failures related to NIFI
I didn't express myself clearly earlier, sorry.
The project compiles normally in the 0.13.0-SNAPSHOT, but the connector doesn't work because of a problem in "TriggeredScraperImpl" when running it. As I said before the requestTimeoutMs
was not being set correctly, which I fixed, but other problems then happened
maybe @splatch has an idea
However, something is causing some issues when compiling with the 0.13.0-SNAPSHOT version, and I haven't been able to find a fix. I still make myself available to help fix the new version with some guidance, since I'm quite lost...
@gustavoschewinski can you push your changes and submit PR, so I can try building your changes and see how it fails? I think it will be most effective way. I believe that we should be able to get a snapshot version of driver within plc4x-extras, once we configure ASF snapshot repository.
@splatch sorry for the delay, but it does compile. However, it doesn’t work when running in kafka. The error seems to start in the scrapper, but I couldn’t identify the exact issue. I shared more of my findings earlier in this thread, which might help finding the problem
@gustavoschewinski no worries. I see three reports - first about signature/runtime linkage errors, second about timeout errors in scraper, third about re-connections. I am not entirely sure which we need to address in order to solve initial kafka connectivity. Maybe it will be best to address them one by one?
Sounds good.
I just built the latest PLC4X for Kafka and got it running using the package from components/package/apache-kafka-connect-plc4x-plc4j-0.13.0-SNAPSHOT
. But then I ran into this error:
[2024-10-16 12:00:30,250] INFO AbstractConfig values:
bufferSize = 1000
connection-name = machineA
connectionString = opcua:tcp://Yoshi.lab.mtu-digilab.io:53530/OPCUA/SimulationServer?discovery=false&security-policy=NONE&message-security=NONE
pollReturnInterval = 500
queries = [jobA|plc-topic-plc4x|1000|counter#ns=3;i=1002]
(org.apache.kafka.common.config.AbstractConfig)
[2024-10-16 12:00:30,265] ERROR WorkerSourceTask{id=plc-1-source-13-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/plc4x/java/scraper/config/ScraperConfiguration.fromYaml(Ljava/lang/String;Ljava/lang/Class;)Ljava/lang/Object; @11: invokespecial
Reason:
Type 'com/fasterxml/jackson/dataformat/yaml/YAMLFactory' (current frame, stack[2]) is not assignable to 'com/fasterxml/jackson/core/JsonFactory'
Current Frame:
bci: @11
flags: { }
locals: { 'java/lang/String', 'java/lang/Class' }
stack: { uninitialized 0, uninitialized 0, 'com/fasterxml/jackson/dataformat/yaml/YAMLFactory' }
Bytecode:
0000000: bb00 0a59 bb00 0c59 b700 0eb7 0012 4d2c
0000010: 2a2b b600 15b0 4ebb 0018 5912 1a2d b700
0000020: 1cbf
Exception Handler Table:
bci [15, 21] => handler: 22
Stackmap Table:
full_frame(@22,{Object[#46],Object[#48],Object[#10]},{Object[#31]})
at org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder.build(ScraperConfigurationTriggeredImplBuilder.java:43)
at org.apache.plc4x.kafka.Plc4xSourceTask.start(Plc4xSourceTask.java:152)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:283)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2024-10-16 12:00:30,269] WARN Failed to close source task with type org.apache.kafka.connect.runtime.AbstractWorkerSourceTask$$Lambda$1324/0x0000000100ac8440 (org.apache.kafka.common.utils.Utils)
java.lang.NullPointerException
at org.apache.plc4x.kafka.Plc4xSourceTask.stop(Plc4xSourceTask.java:243)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1263)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.close(AbstractWorkerSourceTask.java:317)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:208)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.doClose(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:241)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
It’s weird because this issue didn’t happen with version 0.12.0. To fix it, I added this dependency:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.17.2</version>
</dependency>
After rebuilding and running Kafka again, I got another error:
[2024-10-16 13:35:58,609] DEBUG Job statistics (jobA, machineA) number of requests: 31 (0 success, 100.0 % failed, NaN % too slow), min latency: NaN ms, mean latency: NaN ms, median: NaN ms (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl)
[2024-10-16 13:35:58,698] WARN Exception during scraping of Job jobA, Connection-Alias machineA: Error-message: java.util.concurrent.TimeoutException - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
[2024-10-16 13:35:58,698] DEBUG Detailed exception occurred at scraping (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask)
org.apache.plc4x.java.api.exceptions.PlcRuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:334)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask.run(TriggeredScraperTask.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl.getPlcConnection(TriggeredScraperImpl.java:330)
... 7 more
Caused by: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2798)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
... 3 more
Same error as I got before. In this thread while looking at how to fix this problem, I found this way of getting away with it but it works just some time and crashes (https://github.com/apache/plc4x-extras/issues/125#issuecomment-2346601805).
This is the same error I saw earlier. After digging into the issue, I also found a possible workaround, but it only works for some time and then crashes again. You can check out this message of the thread where I found the workaround: link
Hi Gustavo, from the line throwing the error it seems that you are passing invalid data-structures to the scraper ... however from my perspective I can't really see which one is bad (or if both source and job-configurations are bad) ... I could add some code to possibly do some more checks and output an error message that is easier to understand.
@chrisdutz, that’s possible, but I’m using the same configuration for the connector that works with versions v0.12.0 and v0.13.0-SNAPSHOT from the Maven repository. The errors only occur when rebuilding from the current HEAD branch and using the connector in Kafka. Is it possible that the connector configuration has changed?
Obs: Ive already got the connector working with a updated v0.12.0 and everything I need, so there’s no rush. However, I’m happy to help in finding a solution for the community
What happened?
Hello,
I have a question rather than a bug report.
Is it possible to compile plc4j-apache-kafka with newer versions of the OPC-UA and S7 drivers? If so, is there any documentation available on how to do this? I'm willing to compile it and make the updated version of the Kafka connector available for everyone to use.
The reason I ask is that I need to use version 0.13.0-SNAPSHOT for both drivers.
Thank you!
Best regards, Gustavo.
Version
v.0.12.0
Programming Languages
Protocols