apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

PIP-236: Record schema in the request and carry to the broker when subscribing with AUTO_CONSUME schema. #19113

Open Denovo1998 opened 1 year ago

Denovo1998 commented 1 year ago

Motivation

Fix the failure to create a consumer with a schema after an AUTO_CONSUME consumer has subscribed to an empty topic. In that case the broker rejects the subscription with IncompatibleSchemaException("Topic does not have schema to check"). Relevant code:

https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1037
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1147-L1152
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3054-L3071
https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1177

Because the AUTO_CONSUME consumer is counted as an active consumer, the broker treats the topic as in use and runs a compatibility check, even though the topic has no schema to check against. The broker should therefore record each consumer's schema type, so it can tell whether the topic has one or more active consumers whose SchemaType is not AUTO_CONSUME.

Goal

  1. On the client side, add an AutoConsume value to the Schema.Type enum, and add a new protocol version to ProtocolVersion.
  2. On the broker side, record each consumer's SchemaType (including AUTO_CONSUME) in org.apache.pulsar.broker.service.Consumer.

API Changes

Protocol change: Schema.Type and ProtocolVersion

    message Schema {
        enum Type {
            // existing values unchanged
            AutoConsume = 21;
        }
    }

    enum ProtocolVersion {
        // existing versions unchanged
        v21 = 21; // Carry the AUTO_CONSUME schema to the broker from this version on
    }

Record SchemaType in Consumer

    @Getter
    private final SchemaType schemaType;
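For readers less familiar with Lombok: @Getter on the field generates a plain public accessor, getSchemaType(), which the broker-side check calls on each consumer. The sketch below spells that out on a hypothetical, stripped-down ConsumerSketch class with its own local SchemaType enum; both are illustrative stand-ins, not Pulsar types, and nothing else about the real Consumer is modeled.

```java
// Hypothetical, simplified stand-in for org.apache.pulsar.broker.service.Consumer.
// Only the schemaType field from the proposal is modeled.
final class ConsumerSketch {

    // Local stand-in for Pulsar's SchemaType (a subset of values, for illustration).
    enum SchemaType { STRING, AVRO, JSON, AUTO_CONSUME }

    // Set once when the consumer subscribes; never changes afterwards.
    private final SchemaType schemaType;

    ConsumerSketch(SchemaType schemaType) {
        this.schemaType = schemaType;
    }

    // Equivalent of the accessor that Lombok's @Getter generates for the field.
    SchemaType getSchemaType() {
        return schemaType;
    }
}
```

Keeping the field final matches the proposal: the schema type is fixed at subscribe time, so the broker can read it later without extra synchronization.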

Implementation

On the client side

  1. Set a default SchemaInfo with schemaType = SchemaType.AUTO_CONSUME in org.apache.pulsar.client.impl.schema.AutoConsumeSchema.
  2. In org.apache.pulsar.common.protocol.Commands#newSubscribe, include the AUTO_CONSUME SchemaInfo in the subscribe command when the schema type is AUTO_CONSUME and the protocol version is v21 or later.
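A minimal sketch of the version gate in step 2, assuming the rule is exactly as stated: AUTO_CONSUME schema info goes on the wire only for protocol v21 or later, while other schemas are assumed to be sent as before this change. SubscribeSchemaGate, shouldCarrySchema, and the local SchemaType enum are hypothetical names, not the actual code in Commands#newSubscribe.

```java
// Hypothetical sketch of the client-side rule: attach the AUTO_CONSUME
// SchemaInfo to the subscribe command only when the broker speaks v21+.
final class SubscribeSchemaGate {

    // Local stand-in for Pulsar's SchemaType (a subset of values, for illustration).
    enum SchemaType { STRING, AVRO, JSON, AUTO_CONSUME }

    // Protocol version introduced by the proposal (ProtocolVersion.v21 = 21).
    static final int AUTO_CONSUME_MIN_PROTOCOL_VERSION = 21;

    private SubscribeSchemaGate() {
    }

    /**
     * Returns true if the subscribe command should carry the consumer's schema.
     * Assumption: non-AUTO_CONSUME schemas are sent regardless of version, as before.
     */
    static boolean shouldCarrySchema(SchemaType schemaType, int protocolVersion) {
        if (schemaType == null) {
            return false; // no schema configured on the consumer
        }
        if (schemaType == SchemaType.AUTO_CONSUME) {
            // Older brokers do not know the AutoConsume enum value, so omit it for them.
            return protocolVersion >= AUTO_CONSUME_MIN_PROTOCOL_VERSION;
        }
        return true;
    }
}
```

Against a broker that negotiates v20 or lower, the AUTO_CONSUME schema is simply omitted, so the subscribe command looks the same as before the change and the older broker never has to parse the new enum value.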

On the broker side

  1. When the schema is not null and its type is not AUTO_CONSUME, call addSchemaIfIdleOrCheckCompatible.
  2. Read the schemaType from the subscribe request and record it in Consumer.
  3. If one or more active consumers on the topic have a schema type other than AUTO_CONSUME, call checkSchemaCompatibleForConsumer.

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema().thenCompose((hasSchema) -> {
            // Count active consumers that subscribed with a concrete schema;
            // AUTO_CONSUME consumers do not pin the topic's schema.
            int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
                    .mapToInt(subscription -> (int) subscription.getConsumers().stream()
                            .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
                            .count())
                    .sum();
            // The topic is in use if it already has a schema, producers,
            // consumers with a concrete schema, or stored data.
            if (hasSchema
                    || (!producers.isEmpty())
                    || (numActiveConsumersWithoutAutoSchema != 0)
                    || (ledger.getTotalSize() != 0)) {
                return checkSchemaCompatibleForConsumer(schema);
            } else {
                // The topic is idle: register the consumer's schema.
                return addSchema(schema).thenCompose(schemaVersion ->
                        CompletableFuture.completedFuture(null));
            }
        });
    }
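To make the broker-side rules concrete, here is a self-contained model of steps 1 and 3 that can run without a broker. SchemaDecisionSketch, TopicState, Action, and the local SchemaType enum are hypothetical; TopicState only mirrors the four inputs the method above consults (hasSchema, producers, active consumers' schema types, ledger size). It is a sketch under those assumptions, not PersistentTopic itself.

```java
import java.util.List;

// Hypothetical model of the broker-side decision; not a Pulsar type.
final class SchemaDecisionSketch {

    enum SchemaType { STRING, AVRO, JSON, AUTO_CONSUME }

    enum Action { SKIP, ADD_SCHEMA, CHECK_COMPATIBILITY }

    // Snapshot of the topic state consulted by addSchemaIfIdleOrCheckCompatible.
    record TopicState(boolean hasSchema,
                      int producerCount,
                      List<SchemaType> activeConsumerSchemaTypes,
                      long ledgerTotalSize) {
    }

    private SchemaDecisionSketch() {
    }

    static Action onSubscribe(TopicState topic, SchemaType incoming) {
        // Step 1: no schema, or AUTO_CONSUME, skips the schema path entirely.
        if (incoming == null || incoming == SchemaType.AUTO_CONSUME) {
            return Action.SKIP;
        }
        // Step 3: only consumers with a non-AUTO_CONSUME schema count as users of the schema.
        boolean hasConsumerWithConcreteSchema = topic.activeConsumerSchemaTypes().stream()
                .anyMatch(type -> type != SchemaType.AUTO_CONSUME);
        boolean topicInUse = topic.hasSchema()
                || topic.producerCount() > 0
                || hasConsumerWithConcreteSchema
                || topic.ledgerTotalSize() != 0;
        return topicInUse ? Action.CHECK_COMPATIBILITY : Action.ADD_SCHEMA;
    }
}
```

With only an AUTO_CONSUME consumer attached to an empty topic, an AVRO subscriber now yields ADD_SCHEMA instead of a compatibility check against a schema that does not exist, which is the failure described in the motivation.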

Alternatives

  1. On the client side, add an optional field to CommandSubscribe:

    message CommandSubscribe {
       optional bool check_schema_compatibility = 20 [default = true];
    }
  2. On the broker side, record checkSchemaCompatibility in org.apache.pulsar.broker.service.Consumer.

  3. If checkSchemaCompatibility is true, a schema compatibility check is required. For consumers whose schemaType is AUTO_CONSUME, checkSchemaCompatibility is false.
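A minimal sketch of how this alternative could be wired, assuming the flag is derived purely from the schema type on the client and that an unset field falls back to the proto default of true on the broker. CheckFlagSketch and its methods are hypothetical names; the Boolean parameter models "field present or absent" rather than the generated protobuf accessors.

```java
// Hypothetical sketch of the alternative design: a per-subscribe boolean that
// defaults to true and is cleared only for AUTO_CONSUME consumers.
final class CheckFlagSketch {

    // Local stand-in for Pulsar's SchemaType (a subset of values, for illustration).
    enum SchemaType { STRING, AVRO, JSON, AUTO_CONSUME }

    private CheckFlagSketch() {
    }

    // Client side: value to put in CommandSubscribe.check_schema_compatibility.
    static boolean checkSchemaCompatibility(SchemaType schemaType) {
        return schemaType != SchemaType.AUTO_CONSUME;
    }

    // Broker side: null models a client that never set the field,
    // which falls back to the declared default (true).
    static boolean effectiveFlag(Boolean fieldValue) {
        return fieldValue == null || fieldValue;
    }
}
```

The default of true is what keeps this variant backward compatible: subscribe commands from older clients never set the field, so the broker keeps checking their schemas exactly as before.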

Anything else?

No response

Links

Discussion: https://lists.apache.org/thread/v7p88h7grqnbzocw34g6jvxjfw962kfd
Vote: https://lists.apache.org/thread/pvcdlbflofoj41ryo1lrn0zlhj15bwpv
PR: #17449

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.