apache / pulsar-helm-chart

Official Apache Pulsar Helm Chart
https://pulsar.apache.org/
Apache License 2.0

Add support for using Oxia as the metadata store for Pulsar and BookKeeper #544

Closed: yuweisung closed this 3 days ago

yuweisung commented 2 weeks ago

Feature: add Oxia templates

Motivation

Add Oxia templates to the Helm chart.

Modifications

Add the Oxia templates with the correct variables. The Oxia component is disabled in the default values.yaml.
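
For illustration, a minimal sketch of the kind of values.yaml entries involved; the key names below are assumptions for illustration rather than the exact ones used by this PR:

components:
  oxia: false   # the new Oxia component is off by default
oxia:
  component: oxia
  replicas: 3   # illustrative only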

Verifying this change

yuweisung commented 2 weeks ago

Next step: disable ZooKeeper and modify the init script to integrate Pulsar with Oxia.
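
For example, that step might amount to an override file along these lines (hypothetical keys, passed with -f or --set at install time):

# oxia-test-values.yaml (hypothetical override)
components:
  zookeeper: false   # turn off the ZooKeeper metadata store
  oxia: true         # use Oxia instead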

yuweisung commented 2 weeks ago

What's done:

1. Add Oxia templates from StreamNative.
2. Add Oxia components to values.yaml.
3. Modify the Oxia templates to fit values.yaml.
4. Verify the templates locally with "helm template test1 .".

lhotari commented 4 days ago

@yuweisung I added CI coverage for the Oxia changes and noticed quite a few gaps. I've addressed some of them, but there are still gaps in getting things integrated end to end.

lhotari commented 4 days ago

@merlimat what are the recommended default Oxia namespaces for brokers and bookies? Is it simply broker and bookkeeper? What command is needed for creating Oxia namespaces?

merlimat commented 4 days ago

> @merlimat what are the recommended default Oxia namespaces for brokers and bookies? Is it simply broker and bookkeeper?

Yes, it should be a good start, with 3 shards each.

> What command is needed for creating Oxia namespaces?

The namespaces are automatically created by updating the coordinator config-map.

e.g. https://github.com/streamnative/oxia/blob/main/deploy/charts/oxia-cluster/templates/coordinator-configmap.yaml#L23-L26
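
For reference, the namespaces in that config-map are declared roughly like this; the field names below are an approximation of the Oxia coordinator config, and the values just reflect the defaults suggested above:

namespaces:
  - name: broker
    initialShardCount: 3    # "3 shards each", per the answer above
    replicationFactor: 1    # illustrative; size this to the Oxia ensemble
  - name: bookkeeper
    initialShardCount: 3
    replicationFactor: 1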

lhotari commented 4 days ago

> @merlimat what are the recommended default Oxia namespaces for brokers and bookies? Is it simply broker and bookkeeper?
>
> Yes, it should be a good start, with 3 shards each.
>
> What command is needed for creating Oxia namespaces?
>
> The namespaces are automatically created by updating the coordinator config-map.
>
> e.g. https://github.com/streamnative/oxia/blob/main/deploy/charts/oxia-cluster/templates/coordinator-configmap.yaml#L23-L26

@merlimat I've made changes accordingly.

I can see that Oxia is working; however, there are problems.

Name: pulsar-ci-oxia-coordinator-status
Namespace: pulsar
Labels:
Annotations:

Data
====
status:
----
namespaces:
  bookkeeper:
    replicationFactor: 1
    shards:
      1:
        status: 1
        term: 13
        leader:
          public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
          internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        ensemble:
          - public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
            internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        removedNodes: []
        int32HashRange:
          min: 0
          max: 4294967295
  broker:
    replicationFactor: 1
    shards:
      0:
        status: 1
        term: 13
        leader:
          public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
          internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        ensemble:
          - public: pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648
            internal: pulsar-ci-oxia-svc.pulsar.svc:6649
        removedNodes: []
        int32HashRange:
          min: 0
          max: 4294967295
shardIdGenerator: 2
serverIdx: 0

The problem is that the bookkeeper init job fails. The last error message in the logs is:

2024-11-21T12:58:53,552+0000 [oxia-client-1-1] INFO  io.streamnative.oxia.client.shard.ShardManager - Retry creating stream for shard assignments namespace=bookkeeper
2024-11-21T12:58:53,555+0000 [grpc-default-worker-ELG-2-1] WARN  io.streamnative.oxia.client.shard.ShardManager - Failed receiving shard assignments.
io.grpc.StatusRuntimeException: UNIMPLEMENTED: unknown service io.streamnative.oxia.proto.OxiaClient
    at io.grpc.Status.asRuntimeException(Status.java:539) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) ~[io.grpc-grpc-stub-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:683) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:830) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126) ~[io.grpc-grpc-api-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:825) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1018) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:691) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:458) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:441) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.MessageDeframer.closeWhenComplete(MessageDeframer.java:192) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:201) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:444) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:400) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.AbstractClientStream$TransportState.inboundTrailersReceived(AbstractClientStream.java:383) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.internal.Http2ClientStreamTransportState.transportTrailersReceived(Http2ClientStreamTransportState.java:183) ~[io.grpc-grpc-core-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$TransportState.transportHeadersReceived(NettyClientStream.java:334) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:379) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.access$1200(NettyClientHandler.java:93) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$FrameListener.onHeadersRead(NettyClientHandler.java:936) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.56.1.jar:1.56.1]
    at java.base/java.lang.Thread.run(Unknown Source) [?:?]

In conf/bookkeeper.conf there's:

metadataServiceUri=metadata-store:oxia://pulsar-ci-oxia-svc:6649/bookkeeper

Connection works from the pod:

pulsar-ci-bookie-init-gz88q:/pulsar$ nc -v -z pulsar-ci-oxia-svc 6649
pulsar-ci-oxia-svc (10.244.1.20:6649) open

I wonder how the problem can be fixed? /cc @mattisonchao

lhotari commented 4 days ago

Last logs on coordinator:

│ {"level":"info","time":"2024-11-21T12:38:36.001675891Z","component":"shard-controller","namespace":"bookkeeper","shard":1,"term":13,"time":"2024-11-21T12:38:36.001711227Z","message":"Starting leader election"}                                                                                                                                                              │
│ {"level":"info","time":"2024-11-21T12:38:36.033715641Z","component":"shard-controller","entry-id":{"term":"-1", "offset":"-1"},"namespace":"bookkeeper","server-address":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":1,"time":"2024-11-21T12:38:36.033813915Z","message":"Processed newTerm response │
│ {"level":"info","time":"2024-11-21T12:38:36.033840474Z","component":"shard-controller","followers":[],"namespace":"bookkeeper","new-leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":1,"term":13,"time":"2024-11-21T12:38:36.033903832Z","message":"Successfully moved ensemble to a new term"}   │
│ {"level":"info","time":"2024-11-21T12:38:36.04050441Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"bookkeeper","shard":1,"term":13,"time":"2024-11-21T12:38:36.040516162Z","message":"Elected new leader"}                                              │
│ {"level":"info","time":"2024-11-21T12:38:36.040532943Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"bookkeeper","shard":1,"time":"2024-11-21T12:38:36.040539566Z","message":"Shard is ready"}                                                           │
│ {"level":"info","time":"2024-11-21T12:38:37.967300662Z","component":"shard-controller","namespace":"broker","shard":0,"term":13,"time":"2024-11-21T12:38:37.967319758Z","message":"Starting leader election"}                                                                                                                                                                  │
│ {"level":"info","time":"2024-11-21T12:38:37.99129862Z","component":"shard-controller","entry-id":{"term":"-1", "offset":"-1"},"namespace":"broker","server-address":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":0,"time":"2024-11-21T12:38:37.991327354Z","message":"Processed newTerm response"}    │
│ {"level":"info","time":"2024-11-21T12:38:37.991407553Z","component":"shard-controller","followers":[],"namespace":"broker","new-leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"shard":0,"term":13,"time":"2024-11-21T12:38:37.991421479Z","message":"Successfully moved ensemble to a new term"}       │
│ {"level":"info","time":"2024-11-21T12:38:38.002184498Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"broker","shard":0,"term":13,"time":"2024-11-21T12:38:38.002203163Z","message":"Elected new leader"}                                                 │
│ {"level":"info","time":"2024-11-21T12:38:38.002277151Z","component":"shard-controller","leader":{"public":"pulsar-ci-oxia-svc.pulsar.svc.cluster.local:6648","internal":"pulsar-ci-oxia-svc.pulsar.svc:6649"},"namespace":"broker","shard":0,"time":"2024-11-21T12:38:38.002287801Z","message":"Shard is ready"}
lhotari commented 4 days ago

@yuweisung If you'd like to debug issues in GitHub Actions, you can SSH into the running build job VM when you open a PR to your own fork. SSH authentication happens with your public keys from https://github.com/yuweisung.keys. This is how I've been working on this: https://github.com/lhotari/pulsar-helm-chart/pull/12. Each job prints its SSH connection details.

[screenshot: SSH connection details in the GitHub Actions job output]

You can run "k9s" after the k8s cluster is running.

lhotari commented 3 days ago

One gap in Pulsar is the lack of an org.apache.pulsar.packages.management.core.PackagesStorage implementation that works with Oxia. The org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorage implementation uses the DistributedLog library, which requires ZooKeeper.

Error message:

pulsar-ci-toolset-0:/pulsar$ bin/pulsar-admin functions create --tenant pulsar-ci --namespace test --name test-function --inputs "pulsar-ci/test/test_input" --output "pulsar-ci/test/test_output" --parallelism 1 --classname org.apache.pulsar.functions.api.examples.ExclamationFunction --jar /pulsar/examples/api-examples.jar
Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

I have disabled function tests for Oxia in CI to make the tests pass. We can merge this PR without functions support for Oxia.
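
Until such an implementation exists, an Oxia-based deployment would need functions kept disabled, e.g. with an override along these lines (assuming the chart's usual components toggles):

components:
  # Pulsar Functions rely on BookKeeperPackagesStorage / DistributedLog,
  # which still requires ZooKeeper, so keep them off when using Oxia.
  functions: false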