DataDog / java-dogstatsd-client

Java statsd client library
MIT License
176 stars 101 forks source link

Could not load FFI provider jnr.ffi.provider.jffi.Provider #258

Open shubhamb12 opened 2 days ago

shubhamb12 commented 2 days ago

Hi!,

We are using "com.datadoghq" % "java-dogstatsd-client" % "4.2.0", as SBT dependency for our Flink application. Suddenly during HPA rescaling or any general redeployment we are seeing the following jnr related error from StatsDClientBuilder

Flink version: 1.18 DatadogClient: 3.33.0

java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider
    at jnr.ffi.provider.InvalidRuntime.newLoadError(InvalidRuntime.java:101)
    at jnr.ffi.provider.InvalidRuntime.findType(InvalidRuntime.java:42)
    at jnr.ffi.Struct$NumberField.<init>(Struct.java:872)
    at jnr.ffi.Struct$Unsigned16.<init>(Struct.java:1240)
    at jnr.unixsocket.SockAddrUnix$DefaultSockAddrUnix.<init>(SockAddrUnix.java:209)
    at jnr.unixsocket.SockAddrUnix.create(SockAddrUnix.java:174)
    at jnr.unixsocket.UnixSocketAddress.<init>(UnixSocketAddress.java:53)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder$1.call(NonBlockingStatsDClientBuilder.java:261)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder$1.call(NonBlockingStatsDClientBuilder.java:259)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder.staticAddressResolution(NonBlockingStatsDClientBuilder.java:283)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder.staticStatsDAddressResolution(NonBlockingStatsDClientBuilder.java:299)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder.resolve(NonBlockingStatsDClientBuilder.java:217)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder.build(NonBlockingStatsDClientBuilder.java:193)
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)

Caused by: java.lang.UnsatisfiedLinkError: could not get native definition for type `POINTER`, original error message follows: java.io.IOException: Unable to write jffi binary stub to `/tmp`. Set `TMPDIR` or Java property `java.io.tmpdir` to a read/write path that is not mounted "noexec".
    at com.kenai.jffi.internal.StubLoader.tempReadonlyError(StubLoader.java:414)
    at com.kenai.jffi.internal.StubLoader.loadFromJar(StubLoader.java:399)
    at com.kenai.jffi.internal.StubLoader.load(StubLoader.java:278)
    at com.kenai.jffi.internal.StubLoader.<clinit>(StubLoader.java:487)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at com.kenai.jffi.Init.load(Init.java:68)
    at com.kenai.jffi.Foreign$InstanceHolder.getInstanceHolder(Foreign.java:49)
    at com.kenai.jffi.Foreign$InstanceHolder.<clinit>(Foreign.java:45)
    at com.kenai.jffi.Foreign.getInstance(Foreign.java:103)
    at com.kenai.jffi.Type$Builtin.lookupTypeInfo(Type.java:242)
    at com.kenai.jffi.Type$Builtin.getTypeInfo(Type.java:237)
    at com.kenai.jffi.Type.resolveSize(Type.java:155)
    at com.kenai.jffi.Type.size(Type.java:138)
    at jnr.ffi.provider.jffi.NativeRuntime$TypeDelegate.size(NativeRuntime.java:178)
    at jnr.ffi.provider.AbstractRuntime.<init>(AbstractRuntime.java:48)
    at jnr.ffi.provider.jffi.NativeRuntime.<init>(NativeRuntime.java:57)
    at jnr.ffi.provider.jffi.NativeRuntime.<init>(NativeRuntime.java:41)
    at jnr.ffi.provider.jffi.NativeRuntime$SingletonHolder.<clinit>(NativeRuntime.java:53)
    at jnr.ffi.provider.jffi.NativeRuntime.getInstance(NativeRuntime.java:49)
    at jnr.ffi.provider.jffi.Provider.<init>(Provider.java:29)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at java.base/java.lang.Class.newInstance(Unknown Source)
    at jnr.ffi.provider.FFIProvider$SystemProviderSingletonHolder.getInstance(FFIProvider.java:68)
    at jnr.ffi.provider.FFIProvider$SystemProviderSingletonHolder.<clinit>(FFIProvider.java:57)
    at jnr.ffi.provider.FFIProvider.getSystemProvider(FFIProvider.java:35)
    at jnr.ffi.Runtime$SingletonHolder.<clinit>(Runtime.java:82)
    at jnr.ffi.Runtime.getSystemRuntime(Runtime.java:67)
    at jnr.unixsocket.SockAddrUnix.<init>(SockAddrUnix.java:46)
    at jnr.unixsocket.SockAddrUnix$DefaultSockAddrUnix.<init>(SockAddrUnix.java:208)
    at jnr.unixsocket.SockAddrUnix.create(SockAddrUnix.java:174)
    at jnr.unixsocket.UnixSocketAddress.<init>(UnixSocketAddress.java:53)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder$1.call(NonBlockingStatsDClientBuilder.java:261)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder$1.call(NonBlockingStatsDClientBuilder.java:259)
    at com.timgroup.statsd.NonBlockingStatsDClientBuilder.staticAddressResolution(NonBlockingStatsDClientBuilder.java:283)
    at 
    at scala.Option.foreach(Option.scala:257)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.endBlocking(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.size(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.transferFrom(Unknown Source)
    at com.kenai.jffi.internal.StubLoader.loadFromJar(StubLoader.java:392)
    ... 74 more

    at com.kenai.jffi.Type$Builtin.lookupTypeInfo(Type.java:253)
    at com.kenai.jffi.Type$Builtin.getTypeInfo(Type.java:237)
    at com.kenai.jffi.Type.resolveSize(Type.java:155)
    at com.kenai.jffi.Type.size(Type.java:138)
    at jnr.ffi.provider.jffi.NativeRuntime$TypeDelegate.size(NativeRuntime.java:178)
    at jnr.ffi.provider.AbstractRuntime.<init>(AbstractRuntime.java:48)
    at jnr.ffi.provider.jffi.NativeRuntime.<init>(NativeRuntime.java:57)
    at jnr.ffi.provider.jffi.NativeRuntime.<init>(NativeRuntime.java:41)
    at jnr.ffi.provider.jffi.NativeRuntime$SingletonHolder.<clinit>(NativeRuntime.java:53)
    at jnr.ffi.provider.jffi.NativeRuntime.getInstance(NativeRuntime.java:49)
    at jnr.ffi.provider.jffi.Provider.<init>(Provider.java:29)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at java.base/java.lang.Class.newInstance(Unknown Source)
    at jnr.ffi.provider.FFIProvider$SystemProviderSingletonHolder.getInstance(FFIProvider.java:68)
    at jnr.ffi.provider.FFIProvider$SystemProviderSingletonHolder.<clinit>(FFIProvider.java:57)
    at jnr.ffi.provider.FFIProvider.getSystemProvider(FFIProvider.java:35)
    at jnr.ffi.Runtime$SingletonHolder.<clinit>(Runtime.java:82)
    at jnr.ffi.Runtime.getSystemRuntime(Runtime.java:67)
    at jnr.unixsocket.SockAddrUnix.<init>(SockAddrUnix.java:46)
    at jnr.unixsocket.SockAddrUnix$DefaultSockAddrUnix.<init>(SockAddrUnix.java:208)
    ... 43 more

We have already tried upgrading to latest 4.4.3 but no luck.

Thanks for any help

vickenty commented 1 day ago

Hi,

This is the relevant part of the traceback that also explains how to fix this:

java.io.IOException: Unable to write jffi binary stub to /tmp. Set TMPDIR or Java property java.io.tmpdir to a read/write path that is not mounted "noexec".

If you don't mind me asking, I'd like to know a bit more about the environment where it happens:

The answers wouldn't change anything in the advice offered by the error message, but it would help us understand when the issue happens.

shubhamb12 commented 1 day ago

Hi, Thanks for the response. The problem is we have around 400 pipelines and this issue has never creeped up for any of them. Infact for this particular pipeline this issue is transient and occur sometime when either deploying or new pods coming up. Without changing any configuration the issue doesnt creep many of times as well. So I wonder if there is a race condition which is causing the need for /tmp folder. We dont have /tmp configured for any of other pipelines. We have quite complex as well as simple flink pipelines who have never had this issue whatsoever that makes it a little special for us. This pipeline is only different by the fact that it has a very high message throughput.

Any guidance to permanently fix it would be appreciated. OS : Ubuntu 22.04.4

Our base build dockerfile

FROM docker.io/library/flink:1.18.1-scala_2.12-java11

Thanks

shubhamb12 commented 1 day ago

Btw if I disable the statsd client with noopsstatsD there is no error. So this error specifically originates from this lib

vickenty commented 20 hours ago

I looked a bit into what JFFI is doing, and it appears the library tries to unpack the native library into two different locations, first in /tmp, and then in the current working directory if that fails - and glues errors from the two attempts together in reverse order. The error about the non-writeable directory would come from the attempt in the current directory, and the other exception in the traceback is this:

Caused by: java.nio.channels.ClosedByInterruptException
    at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.endBlocking(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.size(Unknown Source)
    at java.base/sun.nio.ch.FileChannelImpl.transferFrom(Unknown Source)
    at com.kenai.jffi.internal.StubLoader.loadFromJar(StubLoader.java:392)

It appears that either Flink or something else is trying to interrupt the thread, and the JFFI library does not handle this gracefully. This would explain the transient nature of the error and why it happens on some deployments but not others. I'm not familar with Flink enough to offer more targeted advice, but perhaps the statsd client initialization could be moved to a different phase in the task lifecycle that would not be interrupted.

shubhamb12 commented 8 hours ago

The statsD is already as init step on job graph creation. I am not sure where else we can init it.