Closed: robshep closed this issue 1 year ago.
A log file showing the same issue encountered just now.
A restart of the application recovered it, but it was seemingly unable to recover on its own.
@robshep Could you please provide more information about the schema configuration? Which schema compatibility policy are you using, and what does the schema of the topic look like now? (You can use `pulsar-admin schemas get` to retrieve the current schema of the topic.)
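For example, with the Java admin client it would look roughly like this (a sketch; the admin URL and topic name are placeholders for your deployment):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.SchemaInfo;

public class CheckCurrentSchema {
    public static void main(String[] args) throws Exception {
        // Placeholder admin URL; use your broker's HTTP service URL.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Fetch the latest schema currently registered for the topic.
            SchemaInfo info = admin.schemas()
                    .getSchemaInfo("persistent://myorg/myapp/ingress");
            System.out.println(info);
        }
    }
}
```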
Thank you,
I'm afraid I don't know how to get the schema compatibility policy of a topic - please can you advise? I don't see a CLI command to show this.
On the few occasions this has happened, it has typically worked OK after restarting the application. On one occasion it failed to recover, so I used a fresh broker cluster (testing environment, so no data was lost).
You can see in the original post the NULL schema that is returned as a symptom of this problem, whereas below is a proper one as logged upon connection.
"name": "myorg/myapp/ingress",
"schema": {
"type": "record",
"name": "IngressMessageV1",
"namespace": "io.myapp.shared.pl",
"fields": [
{
"name": "attributes",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "rawMessage",
"type": [
"null",
{
"type": "bytes",
"java-class": "[B"
}
]
}
]
},
"type": "AVRO",
"properties": {
"__alwaysAllowNull": "true",
"__jsr310ConversionEnabled": "false"
}
}```
@robshep Thanks. It seems the schema definition is not the problem; I think the issue may be the reader cache in the StructSchema. @congbobo184 Could you please help take a look at this issue?
@robshep Hello, I looked at your log. Do you consume more than one topic in your consumer? It looks like one topic does not have a schema.
Hi, thanks for taking a look. There are 3 topics in use in this deployment, and all have schemas. None of this code, or the payload objects, has changed for many months.
I notice that in App1 there is another topic with a `-RETRY` suffix, created presumably to handle redelivery; I don't maintain a schema for it.
The log above is from restarting App1 (listed below), but I have also seen the same "Null Schema" issue when restarting App3.
App1: 1 x consumer, 1 x producer
```java
@Bean(destroyMethod = "close")
@Autowired
public Consumer<UpdateMessageV1> updateListener(PulsarUpdateListener pulsarUpdateListener) throws PulsarClientException {
    return pulsarClient().newConsumer(AvroSchema.of(UpdateMessageV1.class))
            .messageListener(pulsarUpdateListener)
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionName("app-tp-updater")
            .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // try again after 20 seconds
            .topic("persistent://myorg/myappB/update")
            .enableRetry(true)
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
}

@Bean(destroyMethod = "close")
public Producer<ExpiryNotificationV1> notifyExpiryProducer() throws PulsarClientException {
    return pulsarClient().newProducer(AvroSchema.of(ExpiryNotificationV1.class))
            .topic("persistent://myorg/myappB/notify:expiry")
            .create();
}
```
App 2: 1 x producer
```java
final Producer<IngressMessageV1> producer = pulsarClient.newProducer(AvroSchema.of(IngressMessageV1.class))
        .topic("persistent://myorg/myappA/ingress")
        .create();
```
App3: 1 x producer, 2 x consumer
```java
producer = pulsarClient.newProducer(AvroSchema.of(UpdateMessageV1.class))
        .producerName("myapp-tp-update-consumer")
        .topic("persistent://myorg/myappB/update")
        .create();

consumerTPNotifsV1 = pulsarClient.newConsumer(AvroSchema.of(ExpiryNotificationV1.class))
        .subscriptionName("app-tp-expiry")
        .topic("persistent://myorg/appB/notify:expiry")
        .subscribe();

consumerIngressV1 = pulsarClient.newConsumer(AvroSchema.of(IngressMessageV1.class))
        .subscriptionName("myapp-ingress")
        .topic("persistent://myorg/myappA/ingress")
        .subscribe();
```
Thanks
Rob
@robshep Could you please provide more detailed information from App3: the log, and for which topic the consumer reports `Cannot parse <null> schema`?
Thank you,
Attached is the application log from the first time this happened.
It isn't clear exactly for which topic the consumer finds a null schema, but you can see in this example it was both consumers.
On this occasion a plain restart recovered one of them (persistent://myorg/appB/notify:expiry), but the other (persistent://myorg/myappA/ingress) did not recover (same Null schema error on each message received), so I just trashed the standalone Pulsar and started a new instance.
It has happened since, but a restart recovered them.
Thanks again.
If, after a restart, the topic persistent://myorg/myappA/ingress does not recover, you can use the admin API to check whether the schema exists.
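For example, something along these lines (a sketch with the Java admin client, assuming `getAllSchemas` is available in your admin client version; the admin URL is a placeholder):

```java
import java.util.List;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.schema.SchemaInfo;

public class CheckSchemaExists {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build()) {
            String topic = "persistent://myorg/myappA/ingress";
            try {
                // Every schema version ever registered for this topic.
                List<SchemaInfo> versions = admin.schemas().getAllSchemas(topic);
                System.out.println("Registered schema versions: " + versions.size());
            } catch (PulsarAdminException.NotFoundException e) {
                System.out.println("No schema registered for " + topic);
            }
        }
    }
}
```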
Hi,
This is still happening, and we still need to keep restarting connected producers/consumers after updating our application. Occasionally, if 20+ restarts do not clear it, we destroy the (Docker standalone) broker and rebuild.
@congbobo184 All the expected topics appear using pulsar-admin at any stage in the process: before, during a failure, after, etc.
@robshep Sorry to hear that.
AvroSchema has been in production environments for a long time; it is a stable feature.
I think you can test with a single topic and use `./pulsar-admin schemas get persistent://tenant/namespace/topic --version` to find that version of the schema in the broker, or provide a stable way for me to reproduce it and I will reproduce it, including the schema compatibility setting, all schema versions, the producer/consumer update order, and the schema update.
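In code, fetching a specific version looks roughly like this (a sketch; the topic and version number are placeholders):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.SchemaInfo;

public class GetSchemaByVersion {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build()) {
            // Equivalent of `pulsar-admin schemas get <topic> --version <n>`.
            SchemaInfo v0 = admin.schemas()
                    .getSchemaInfo("persistent://myorg/myappA/ingress", 0);
            System.out.println(v0);
        }
    }
}
```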
I don't think this problem should block you from going to production; we can try to solve it together. You can ping me in the Pulsar Slack general channel; my Slack name is CongBo. Hope to hear from you there :)
Thank you,
If the components are generally stable then there could be issues with my setup. In that case I am grateful for your support, and will look you up on Slack. With thanks, R
Hey,
I am facing a similar issue. I am running Pulsar 2.7.1 and I have a topic with an AVRO schema, where my producers push Avro messages and a consumer receives and processes the Avro data. My consumer application receives messages through the Pulsar client's MessageListener. All my topic messages are tagged with the latest Avro schema version, and I can see that schema in the Pulsar cluster's schema registry.
But I am facing the following issue when my consumer application starts and consumes messages:
```
org.apache.pulsar.shade.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:86)
at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:60)
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143)
at com.kohia.galaxy.listener.MessageListenerImpl.received(MessageListenerImpl.kt:20)
at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:882)
at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$5(ConsumerBase.java:862)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
at org.apache.pulsar.shade.org.apache.avro.Schema.parse(Schema.java:1597)
at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1396)
at org.apache.pulsar.shade.org.apache.avro.Schema$Parser.parse(Schema.java:1384)
at org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema(SchemaUtil.java:46)
at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:53)
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at org.apache.pulsar.shade.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
... 18 common frames omitted
```
The above exception occurs for every message received by the consumer. But when I restart my consumer application, it starts working fine and the exception no longer occurs. The issue occurs randomly when I restart my consumer application, and after another restart it works fine and all messages are processed properly. Here is my subscription code:
```kotlin
pulsarClient
    .newConsumer(AvroSchema.of(MyMetric::class.java))
    .topic(topicName).subscriptionName(subscriptionName)
    .subscriptionType(SubscriptionType.Shared)
    .receiverQueueSize(localQueueMsgCount)
    .messageListener(myMessageListener)
    .subscribe()
```
Here is my listener code (short version):

```kotlin
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageListener

class MyMessageListener<T> : MessageListener<T> {

    override fun received(consumer: Consumer<T>, msg: Message<T>) {
        try {
            process(msg.value)
            consumer.acknowledge(msg)
        } catch (ex: Exception) {
            consumer.negativeAcknowledge(msg)
        }
    }

    fun process(metric: T) {}
}
```
When I debug, I can see the following log when my consumer application is working perfectly:
```
Load schema reader for version(2),
schema is : {"type":"record","name":"MyMetric","namespace":"com.example","fields":[{"name":"assetId","type":"long"},{"name":"utctime","type":{"type":"string","avro.java.string":"String"}}]},
schemaInfo: {
  "name": "mytenant/mynamesapce/mytopic",
  "schema": {
    "type": "record",
    "name": "MyMetric",
    "namespace": "com.example",
    "fields": [
      {
        "name": "assetId",
        "type": "long"
      },
      {
        "name": "utctime",
        "type": {
          "type": "string",
          "avro.java.string": "String"
        }
      }
    ]
  },
  "type": "AVRO",
  "properties": {
    "__alwaysAllowNull": "true",
    "__jsr310ConversionEnabled": "false"
  }
}
```
But when my consumer application misbehaves, I see the following log, which doesn't have any schema info:
```
Load schema reader for version(2),
schema is : ,
schemaInfo: {
  "name": "",
  "schema": "",
  "type": "NONE",
  "properties": {}
}
```
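For illustration only, a startup check along these lines would at least detect the empty-schema state shown in the bad log before subscribing (a sketch; the admin URL and topic are placeholders, and it is not verified to prevent the issue):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class SchemaStartupCheck {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://mytenant/mynamespace/mytopic"; // placeholder topic
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build()) {
            SchemaInfo info = admin.schemas().getSchemaInfo(topic);
            // Fail fast (or retry) if the broker hands back no usable schema,
            // which is what the bad log above shows (type NONE, empty schema).
            if (info == null || info.getType() == SchemaType.NONE
                    || info.getSchema() == null || info.getSchema().length == 0) {
                throw new IllegalStateException("Broker returned an empty schema for " + topic);
            }
        }
    }
}
```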
Can you guys please help me resolve this issue? We are already in production, and we find it difficult to restart our consumer applications every time this issue occurs.
@robshep @congbobo184 Did you find any root cause for this issue, or any way to fix it? If so, kindly share, as it may help us. Eagerly waiting for your inputs. :)
Hi. No, sadly I cannot recreate this issue in isolation.
But it occurs on ~50% of redeploys and requires repeated restarts of the application to clear it.
We use Testcontainers as part of the integration tests to spin up Pulsar in isolation, and it never happens in that scenario (empty schema registry when the consumer and producer connect, then the schema is defined on first use).
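For reference, the Testcontainers setup is roughly of this shape (a sketch with a placeholder image tag, not the actual test code):

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

public class PulsarIntegrationSketch {
    public static void main(String[] args) throws Exception {
        // Ephemeral standalone Pulsar, thrown away after the test run.
        try (PulsarContainer pulsar = new PulsarContainer(
                DockerImageName.parse("apachepulsar/pulsar:2.7.1"))) {
            pulsar.start();
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsar.getPulsarBrokerUrl())
                    .build()) {
                // The producer and consumer under test connect here; the schema
                // is registered on first use against an empty schema registry.
            }
        }
    }
}
```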
I'd be happy to spend time on this if anyone has any ideas on how I might recreate it.
Rob
Thanks for the information.
In my case, locally I am running a standalone Pulsar and the consumer applications are Spring Boot applications. The above issue does not occur locally.
But in staging and production, we run Pulsar in Kubernetes and our consumer Spring Boot applications run as containers. In those setups I see this issue more often, and I have to restart my containers to make it work.
On debugging, we could see that the schema is not available in the consumer application's Pulsar client local cache; because of that, on every message received, when the Pulsar client tries to convert the message into the corresponding class object by parsing the schema, it throws this 'Cannot parse <null> schema' error.
It seems to be something related to having/loading the SchemaInfo in memory. It would be great if someone from the team could help us fix it.
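For context, the client-side lookup is conceptually a per-schema-version reader cache along these lines (a simplified illustration, not the actual Pulsar source): the reader for a version is built from the SchemaInfo the broker returns, so an empty SchemaInfo makes the Avro parse fail for every message carrying that version.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.avro.Schema;

// Simplified illustration of a per-schema-version reader cache,
// loosely modelled on the client's AbstractMultiVersionReader.
public class ReaderCacheSketch {
    private final ConcurrentMap<Long, Schema> readersByVersion = new ConcurrentHashMap<>();

    // schemaJsonFromBroker stands in for the SchemaInfo fetched for that version.
    public Schema readerFor(long version, String schemaJsonFromBroker) {
        return readersByVersion.computeIfAbsent(version, v ->
                // If the broker hands back an empty schema, this parse is what
                // throws "Cannot parse <null> schema"; nothing gets cached, so
                // every message at that version fails the same way.
                new Schema.Parser().parse(schemaJsonFromBroker));
    }
}
```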
@dhinesherode91 Hi, has this issue been solved? If not, could you please provide the broker version and client version? Do your consumers consume multiple topics? Looking at the source code, it does not store NONE-type schemas. Maybe you should enable the debug log on the broker side to locate the problem.
@congbobo184 The issue is not yet resolved. We are using Pulsar 2.7.1; both the broker and the Pulsar client use the same version. Our consumer is subscribed to a single topic only, an AVRO-schema topic. Debugging the code, I got to one point: once the consumer receives the first message, it checks the schema version of the message and asks the broker for the schema details. If, at that time, the broker is serving heavy load and is unable to process the request, then the client cannot get the schema. Could it throw the above exception in that case?
Also, I would like to know the proper configuration for a production Pulsar load. Currently our Pulsar receives 10K messages per minute, which will grow to 100K in the near future. We have deployed Pulsar in AWS Kubernetes (an m5.xlarge node group) with 5 brokers (1 CPU, 3Gi memory each), 5 bookies (1 CPU, 4Gi memory each, with separate ledger and journal mounted as EBS), 5 ZooKeeper nodes (500m CPU, 2Gi memory each), and 5 proxy instances (500m CPU, 2Gi memory each). With this configuration we occasionally face connection exceptions. Kindly advise.
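If a slow schema lookup under load is indeed a factor, one thing that might be worth experimenting with (unverified; the values and URL are placeholders) is a more generous client operation timeout:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClient;

public class ClientTimeoutSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .operationTimeout(60, TimeUnit.SECONDS)  // default is 30 seconds
                .connectionTimeout(30, TimeUnit.SECONDS)
                .build();
        client.close();
    }
}
```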
Do you have a stable way to reproduce this issue?
I can't reliably reproduce this, but the issue persists.
Thanks @dhinesherode91 for adding your observations.
We do not have this load. We have a single standalone instance running on an ample VM (no CPU, memory, I/O or disk pressure) with a usual statistics output of 0.3 msg/sec.
We get this error when re-deploying the only consumer application (the producer remains connected).
Restarting one or more times resolves the issue, to the point where a valid schema definition is printed to the console rather than a null one (so we deploy manually for now).
I can't comment on whether this is timing related (maybe, possibly, there is nothing in the queue and then on a restart there is; next time we re-deploy I will poll the admin interface for queue length and feed back).
If we leave it after a null schema is found, it does not recover and no messages are processed on the topic.
However, we run the same application in integration tests daily, using an ephemeral pulsar-standalone in a Docker container to test against, and never see this issue.
Thanks @robshep for the feedback.
I did a test to reproduce this case:
```java
public class Main {
    public static void main(String[] args) throws Exception {
        var pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        var topicName = "test-topic-" + System.currentTimeMillis();

        var producer1 = pulsarClient
                .newProducer(AvroSchema.of(MyPojo.class))
                .topic(topicName).create();
        producer1.send(new MyPojo());

        var consumer = pulsarClient
                .newConsumer(Schema.AVRO(MyPojo.class))
                .topic(topicName).subscriptionName("test")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .subscribe();

        var producer2 = pulsarClient
                .newProducer(Schema.STRING)
                .topic(topicName).create();
        producer2.send("test2");

        var msg = consumer.receive();
        msg.getValue(); // org.apache.pulsar.shade.org.apache.avro.SchemaParseException: Cannot parse <null> schema
    }
}
```
The AVRO consumer cannot decode a message that was produced with a different schema (here, the STRING producer).
If you have such a scenario, you need to avoid it.
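If a topic legitimately needs to carry more than one schema, one option (a sketch, not necessarily applicable to the original report) is to consume with AUTO_CONSUME and work with GenericRecord values:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build()) {
            // AUTO_CONSUME lets the client fetch and apply whichever schema
            // version each individual message was written with.
            Consumer<GenericRecord> consumer = client
                    .newConsumer(Schema.AUTO_CONSUME())
                    .topic("test-topic")                 // placeholder topic
                    .subscriptionName("auto-consume-test")
                    .subscribe();
            Message<GenericRecord> msg = consumer.receive();
            System.out.println(msg.getValue().getFields()); // inspect decoded fields
            consumer.acknowledge(msg);
        }
    }
}
```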
@nodece is this issue a bug on the Pulsar side, or does it depend on how users use it?
@robshep @dhinesherode91 could you please check whether the topic has multiple schemas when the problem happens? Or upgrade the Pulsar version and test whether this problem has been fixed.
@tisonkun it seems to be a usage problem rather than a Pulsar bug.
Moved to the Q&A category. If we find some bugs later, we can create issues correspondingly.
Describe the bug
Reconnecting an updated consumer application that had NOT changed any of the following:
This system was restarted after an update of application code unrelated to Pulsar handling, and it has two consumers. At first both consumers were failing, but after a restart one is OK and the other continues to fail with the error:
I could see the schemas using pulsar-admin, and they look exactly as they should, and the producers are tied to the same schemas.
The two consumer connections in this application should have been binding to schema versions 1 and 2 respectively, but both had the error mentioning schema version 0.
My connection code:
The stacktrace:
To Reproduce: Unable to reproduce.
Expected behaviour: Consumers connect to the topics using the latest schema version from before the restart.
Additional context: None.