Original Issue: apache/pulsar#12596
What issue do you find in Pulsar docs?
Following the docs at https://pulsar.apache.org/docs/en/client-libraries-python/, I cannot read messages from a topic using Python.
spark:3.1.2
scala:2.12
python:3.7
pulsar:2.8.0
spark-pulsar:
python-pulsar: installed with
    python3 -m pip install pulsar-client=='2.8.1'
    python3 -m pip install pulsar-client[avro]
First, in Spark Scala, I define the class used as the topic schema:
    object SchemaUtils {
      class Users() {
        var name: String = null
        var age: Int = -1
      }
    }
Second, in Spark Scala, I send a message to the topic using the Users schema:
    import spatio.topics.SchemaUtils.Users
    import org.apache.pulsar.client.api.{PulsarClient, Schema}

    object clientProducer {
      def main(args: Array[String]): Unit = {
        val user = new Users("werewr", 35)
        println(user)
        val service_url_tn = "****"
        val topic = "persistent://st/forever/scala-Users-topic"
        val client = PulsarClient.builder()
          .serviceUrl(service_url_tn)
          .build()
        val producer = client.newProducer(Schema.AVRO(classOf[Users])).topic(topic).create()
        producer.send(user)
        println("send success" + user.toString())
      }
    }
Third, the message is sent successfully, and the topic now has this schema:
    ./bin/pulsar-admin schemas get persistent://st/forever/scala-Users-topic
    {
      "version": 0,
      "schemaInfo": {
        "name": "scala-Users-topic",
        "schema": {
          "type": "record",
          "name": "Users",
          "namespace": "spatio.topics.SchemaUtils$",
          "fields": [
            { "name": "name", "type": [ "null", "string" ] },
            { "name": "age", "type": "int" }
          ]
        },
        "type": "AVRO",
        "properties": { "__alwaysAllowNull": "true" }
      }
    }
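For reference, here is a minimal sketch of how a record with this schema is laid out in Avro binary form, using only the Python standard library. It assumes the standard Avro binary encoding: a union is written as its branch index followed by the value, a string as its length followed by UTF-8 bytes, and ints as zigzag varints. The helper names `zigzag_varint` and `encode_user` are made up for illustration and are not part of any Pulsar or Avro API.

```python
def zigzag_varint(n):
    """Encode an integer the way Avro encodes int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_user(name, age):
    """Hand-encode one Users record: name is ["null", "string"], age is a plain "int"."""
    out = bytearray()
    if name is None:
        out += zigzag_varint(0)          # union branch 0: "null", no payload
    else:
        data = name.encode("utf-8")
        out += zigzag_varint(1)          # union branch 1: "string"
        out += zigzag_varint(len(data))  # string length in bytes
        out += data
    out += zigzag_varint(age)            # "int" has no union prefix
    return bytes(out)


encoded = encode_user("werewr", 35)
print(encoded)  # b'\x02\x0cwerewrF'
```

The result matches the raw bytes that the Python consumer prints in step five, which suggests the payload itself is a well-formed encoding of this schema.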
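Fourth, in Spark Scala, I read the topic and get the message:
    object clientConsumer {
      implicit val formats = Serialization.formats(NoTypeHints)

      def main(args: Array[String]): Unit = {
        val service_url_tn = "****"
        val client = PulsarClient.builder()
          .serviceUrl(service_url_tn)
          .build()
        val topic = "persistent://st/forever/scala-Users-topic"
        val consumer = client.newConsumer(Schema.AVRO(classOf[Users]))
          .topic(topic)
          .subscriptionName("read-my-topic")
          .subscribeAsync().join()
        var cnt = 0
        while (true) {
          val user = consumer.receive()
          println(write(user.getValue)) // serialize the object to JSON for printing
          cnt += 1
          println(cnt)
        }
      }
    }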
and get:
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    {"name":"werewr","age":35}
    1
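Fifth, I want to read the same topic in Python, but I get an error. The record class, in the pulsar_topics module:
    from pulsar.schema import Record, String, Integer

    class Users(Record):
        name = String()
        age = Integer()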
and the code that reads the topic:
    from pulsar.schema import AvroSchema
    import pulsar
    from pulsar import MessageId
    from pulsar import InitialPosition
    import schemaProduce
    import pulsar_topics

    client = pulsar.Client('****')
    topic = "persistent://st/forever/scala-Users-topic"
    msg_id = MessageId.earliest
    topic_schema = AvroSchema(pulsar_topics.Users)
    print(topic_schema, type(topic_schema))
    consumer = client.subscribe(topic = topic, subscription_name='python-read', schema = topic_schema, initial_position=InitialPosition.Earliest)
    while True:
        msg = consumer.receive()
        print(msg.data())
        print(msg.value())

    consumer.close()
    client.close()
but I get this error:
    b'\x02\x0cwerewrF'
    Traceback (most recent call last):
      File "***/schemaConsume.py", line 38, in <module>
        print(msg.value())
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/__init__.py", line 180, in value
        return self._schema.decode(self._message.data())
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/schema/schema_avro.py", line 70, in decode
        d = fastavro.schemaless_reader(buffer, self._schema)
      File "fastavro/_read.pyx", line 976, in fastavro._read.schemaless_reader
      File "fastavro/_read.pyx", line 988, in fastavro._read.schemaless_reader
      File "fastavro/_read.pyx", line 659, in fastavro._read._read_data
      File "fastavro/_read.pyx", line 516, in fastavro._read.read_record
      File "fastavro/_read.pyx", line 651, in fastavro._read._read_data
      File "fastavro/_read.pyx", line 436, in fastavro._read.read_union
    IndexError: list index out of range
So, how can I read the topic messages from Python?
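The last frame in the traceback is `read_union`, so the reader hit a union whose decoded branch index does not exist. One possible way this can happen is sketched below with the standard library only: the printed bytes are decoded once with the schema reported by `pulsar-admin`, and once with a hypothetical reader schema in which `age` is a nullable union. What schema `AvroSchema(pulsar_topics.Users)` really generates in pulsar-client 2.8.1 is not shown in this report, so `reader_guess` is purely an assumption, and `read_long` and `read_datum` are illustrative helpers, not fastavro functions.

```python
def read_long(buf, pos):
    """Decode one Avro zigzag varint starting at pos; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_datum(schema, buf, pos):
    """Decode one value for a tiny subset of Avro: record, union, null, int, string."""
    if isinstance(schema, list):   # union: branch index first, then the value
        index, pos = read_long(buf, pos)
        return read_datum(schema[index], buf, pos)
    if isinstance(schema, dict):   # record: fields in declared order
        record = {}
        for field in schema["fields"]:
            record[field["name"]], pos = read_datum(field["type"], buf, pos)
        return record, pos
    if schema == "null":
        return None, pos
    if schema == "int":
        return read_long(buf, pos)
    if schema == "string":
        length, pos = read_long(buf, pos)
        return buf[pos:pos + length].decode("utf-8"), pos + length
    raise ValueError(f"unsupported schema: {schema!r}")


# Schema as registered on the topic (from the pulsar-admin output).
writer_schema = {"type": "record", "name": "Users", "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "age", "type": "int"},
]}
# ASSUMPTION: a reader schema where age is nullable; not taken from the report.
reader_guess = {"type": "record", "name": "Users", "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "age", "type": ["null", "int"]},
]}

payload = b'\x02\x0cwerewrF'
record, _ = read_datum(writer_schema, payload, 0)
print(record)

mismatch_error = None
try:
    read_datum(reader_guess, payload, 0)
except IndexError as exc:
    mismatch_error = exc       # the int 35 is misread as a union branch index
print(repr(mismatch_error))
```

With the registered schema the bytes decode to the same record the Scala consumer prints. With the guessed reader schema the byte holding `age` is interpreted as a branch index and the lookup fails with the same kind of `IndexError`. This does not confirm the actual cause, but comparing the schema generated on the Python side with the registered one would be a reasonable first check.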