Apache Pulsar is a cloud-native, distributed messaging framework which natively provides geo-replication. Many organizations deploy pulsar instances on-prem and on multiple different cloud providers and at the same time they would like to enable replication between multiple clusters deployed in different cloud providers. Pulsar already provides various proxy options (Pulsar proxy/ enterprise proxy solutions on SNI) to fulfill security requirements when brokers are deployed on different security zones connected with each other.
However, sometimes it's not possible to share metadata-store (global zookeeper) between pulsar clusters deployed on separate cloud provider platforms, and synchronizing configuration metadata (policies) can be a critical path to share tenant/namespace/topic policies between clusters and administrate pulsar policies uniformly across all clusters. Therefore, we need a mechanism to sync configuration metadata between clusters deployed on the different cloud platforms.
Goal
Replicated metadata event topic
All regions in a cluster that share the same metadata-store (eg: global zookeeper which persists policies) are already in sync but they are not in sync with regions which are in different clusters and not sharing the same metadata-store. We want to sync clusters that are not sharing the same config metadata store and in order to synchronize metadata store, we can pick one region from each cluster and set up a replicated topic across those regions where they can exchange change of metadata event and try to sync cluster with the change of events occurred at different clusters.
This PIP will introduce metadata event-topic which will be replicated between one region in every cloud platform or clusters that are not in sync and don’t share the same metadata store. The one region in every cluster/cloud-platform which will be responsible to generate local change events is known as a source region.
Each source-region in a cluster will generate a local change event to this replicated topic and this event eventually will be replicated to all other clusters present into other cloud platforms and those clusters can update their metadata stores with the new change of event.
In some cases, the source cluster might miss generating change events, or a new cluster is added which needs to be synchronized with metadata stored into another existing cluster. This solution also allows triggering snapshot synchronization to sync metadata-store into the new cluster.
Below data structure shows the payload of change events published into the event topic. Each event contains metadata of individual namespace/tenant/topic along with source cluster name and updated time of the event. Source-cluster name and updateTime helps destination clusters to handle stale or duplicate events.
Event
public class MetadataChangeEvent {
private EventType type;
private ResourceType resource;
private String resourceName;
private byte[] data;
private String sourceCluster;
private long updateTime;
public enum EventType {
Created, Modified, Deleted;
}
public enum ResourceType {
Tenants, Namespaces;
}
}
Handling race condition
Users can update the same policy with different values concurrently in different regions. Every region will eventually receive the updates from other remote regions where policy has been modified and Pulsar has to handle this scenario by merging (or selecting distinct value) the concurrent updates in a consistent manner across all regions. Therefore, each update contains modified-time and the name of source-region which has updated the value. Pulsar region compares local update and remote update based on latest modified timestamp and lexicographical ordering of source-region name and determines a final selected value deterministically across all regions and eventually all regions will have one distinct consistent value for the concurrently modified policy in the metadata store.
For example in the below diagram, Region-A and Region-B received an update for policy P1 at the same time T1. Both regions exchange the local event update with each other and both the regions have to pick only one distinct event from both the updates so, both the regions will have a consistent same update in the metadata store. First, each region compares events based on event updated timestamp and then based on the lexicographic ordering of source-region name. In this example, modified timestamp T1 is the same for both the events so, the next Pulsar selects event with source region-name A over source-region B based on lexicographic sorting on source region name. Therefore, both regions will eventually update metadata with a distinct event that occurred at region-A.
Implementation
Event publisher and handler
Every isolated cluster deployed on a separate cloud platform will have a source region and part of replicated clusters for the event topic. The Source region will have a broker which will create a failover consumer on that topic and a broker with an active consumer will watch the metadata changes and publish the changes to the event topic. The active consumer of the topic consumes the event and updates the metadata to its metadata store cluster. It also allows snapshot synchronization to sync metadata-store into the new cluster by scheduling a job that publishes snapshot of each policy periodically.
Broker changes
Configuration
# topic name to share policy changes
private String metadataSyncEventTopic;
# frequency of generating snapshot for stored policy to the event topic
private long metadataSyncSnapshotDurationSecond;
Tenant/Namespace metadata
Tenant and Namespace metadata will have below optional fields if the sync metadata feature is enabled. These fields are used to handle duplicate and stale events.
String lastUpdatedBy; // source cluster name
long lastUpdatedTime;// when source cluster has updated the metadata.
Event topic consumer and publisher
User can enable this feature by configuring metadataSyncEventTopic into broker and broker initializes MetadataPolicySyncer component which creates failover consumer to listen and handle metadata’s change events. It also sets the watch on metadata changes and publishes those changes to event topic so, other cluster’s source region can consume those events and sync their local metadata store.
Rejected alternative
Use System-topic to synchronize metadata across the cluster. It might not be the correct choice to utilize system-topic to handle metadata-store transportation. Because system topic helps broker to persist topic policies in that local cluster whereas Metadata-event synchronizer helps broker to copy metadata-store across two independent clusters which don't share metadata-store/global-zookeeper. Users will also not be able to use system-topic for metadata sync due to the below reasons:
storage and reliability: Not every user prefers or uses the system topic for the metadata storage due to multiple reasons such as legacy-system, higher reliability on metadata-store compared to system-topic stored in bookies.
Schema compatibility; System topic right now supports only topic level policies with a specific schema whereas the metadata change event requires a different schema for the metadata-store update.
Merging and handling capabilities: Metadata change event not only requires different schema but also requires special handling for create/update and merging capabilities. It will require unnecessary enhancement on system-topic to support merging capabilities.
Compaction requirement: system topic also requires compaction which all systems don't enable because compaction comes with an extra server-side cost which is very expensive for large scale and multi-tenant systems,
However, system-topic can work with metadata-synchronizer. System topic persists topic policies. Broker reads this compacted system topic to retrieve topic policies and applies them to the loaded topic. The broker can replicate metadata-store data to another destination broker that is part of a separate cluster using a metadata-synchronizer, and the destination broker can later persist policies in the local cluster by publishing them to system-topic.
Original Issue: apache/pulsar#13728
Sync Pulsar policies across multiple clouds
Implementation : PR: https://github.com/apache/pulsar/pull/15223
Motivation
Apache Pulsar is a cloud-native, distributed messaging framework which natively provides geo-replication. Many organizations deploy pulsar instances on-prem and on multiple different cloud providers and at the same time they would like to enable replication between multiple clusters deployed in different cloud providers. Pulsar already provides various proxy options (Pulsar proxy/ enterprise proxy solutions on SNI) to fulfill security requirements when brokers are deployed on different security zones connected with each other.
However, sometimes it's not possible to share metadata-store (global zookeeper) between pulsar clusters deployed on separate cloud provider platforms, and synchronizing configuration metadata (policies) can be a critical path to share tenant/namespace/topic policies between clusters and administrate pulsar policies uniformly across all clusters. Therefore, we need a mechanism to sync configuration metadata between clusters deployed on the different cloud platforms.
Goal
Replicated metadata event topic
All regions in a cluster that share the same metadata-store (eg: global zookeeper which persists policies) are already in sync but they are not in sync with regions which are in different clusters and not sharing the same metadata-store. We want to sync clusters that are not sharing the same config metadata store and in order to synchronize metadata store, we can pick one region from each cluster and set up a replicated topic across those regions where they can exchange change of metadata event and try to sync cluster with the change of events occurred at different clusters.
This PIP will introduce metadata event-topic which will be replicated between one region in every cloud platform or clusters that are not in sync and don’t share the same metadata store. The one region in every cluster/cloud-platform which will be responsible to generate local change events is known as a source region. Each source-region in a cluster will generate a local change event to this replicated topic and this event eventually will be replicated to all other clusters present into other cloud platforms and those clusters can update their metadata stores with the new change of event. In some cases, the source cluster might miss generating change events, or a new cluster is added which needs to be synchronized with metadata stored into another existing cluster. This solution also allows triggering snapshot synchronization to sync metadata-store into the new cluster.
Below data structure shows the payload of change events published into the event topic. Each event contains metadata of individual namespace/tenant/topic along with source cluster name and updated time of the event. Source-cluster name and updateTime helps destination clusters to handle stale or duplicate events.
Event
Handling race condition
Users can update the same policy with different values concurrently in different regions. Every region will eventually receive the updates from other remote regions where policy has been modified and Pulsar has to handle this scenario by merging (or selecting distinct value) the concurrent updates in a consistent manner across all regions. Therefore, each update contains modified-time and the name of source-region which has updated the value. Pulsar region compares local update and remote update based on latest modified timestamp and lexicographical ordering of source-region name and determines a final selected value deterministically across all regions and eventually all regions will have one distinct consistent value for the concurrently modified policy in the metadata store.
For example in the below diagram, Region-A and Region-B received an update for policy P1 at the same time T1. Both regions exchange the local event update with each other and both the regions have to pick only one distinct event from both the updates so, both the regions will have a consistent same update in the metadata store. First, each region compares events based on event updated timestamp and then based on the lexicographic ordering of source-region name. In this example, modified timestamp T1 is the same for both the events so, the next Pulsar selects event with source region-name A over source-region B based on lexicographic sorting on source region name. Therefore, both regions will eventually update metadata with a distinct event that occurred at region-A.
Implementation
Event publisher and handler
Every isolated cluster deployed on a separate cloud platform will have a source region and part of replicated clusters for the event topic. The Source region will have a broker which will create a failover consumer on that topic and a broker with an active consumer will watch the metadata changes and publish the changes to the event topic. The active consumer of the topic consumes the event and updates the metadata to its metadata store cluster. It also allows snapshot synchronization to sync metadata-store into the new cluster by scheduling a job that publishes snapshot of each policy periodically.
Broker changes
Configuration
Tenant/Namespace metadata Tenant and Namespace metadata will have below optional fields if the sync metadata feature is enabled. These fields are used to handle duplicate and stale events.
Event topic consumer and publisher
User can enable this feature by configuring metadataSyncEventTopic into broker and broker initializes MetadataPolicySyncer component which creates failover consumer to listen and handle metadata’s change events. It also sets the watch on metadata changes and publishes those changes to event topic so, other cluster’s source region can consume those events and sync their local metadata store.
Rejected alternative
Use System-topic to synchronize metadata across the cluster. It might not be the correct choice to utilize system-topic to handle metadata-store transportation. Because system topic helps broker to persist topic policies in that local cluster whereas Metadata-event synchronizer helps broker to copy metadata-store across two independent clusters which don't share metadata-store/global-zookeeper. Users will also not be able to use system-topic for metadata sync due to the below reasons:
However, system-topic can work with metadata-synchronizer. System topic persists topic policies. Broker reads this compacted system topic to retrieve topic policies and applies them to the loaded topic. The broker can replicate metadata-store data to another destination broker that is part of a separate cluster using a metadata-synchronizer, and the destination broker can later persist policies in the local cluster by publishing them to system-topic.