Notes
The AoP plugin supports only the AMQP 0-9-1 protocol with basic produce and consume functionality, and does not include advanced features such as transactions. It is available as an open-source plugin and is offered only as an experimental private preview feature in the Private Cloud distribution. It is not available on StreamNative Cloud. Please use it with caution.
AoP stands for AMQP on Pulsar. The AoP broker supports the AMQP 0-9-1 protocol and is backed by Pulsar.
AoP is implemented as a Pulsar ProtocolHandler with the protocol name "amqp". The ProtocolHandler is built as a nar file and is loaded when the Pulsar broker starts.
AoP is implemented on top of Pulsar features. However, Pulsar and AMQP are used in different ways. The following are some limitations of AoP.
In this guide, you will learn how to use the Pulsar broker to serve requests from AMQP clients.
Download the Pulsar 2.6.1 binary package apache-pulsar-2.6.1-bin.tar.gz and unpack it.
You can download the AoP nar file from the AoP releases.
To build from code, complete the following steps:
git clone https://github.com/streamnative/aop.git
cd aop
mvn clean install -DskipTests
You can find the nar file in the following directory.
./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar
Name | Description | Default |
---|---|---|
amqpTenant | AMQP on Pulsar broker tenant | public |
amqpListeners | AMQP service port | amqp://127.0.0.1:5672 |
amqpMaxNoOfChannels | The maximum number of channels which can exist concurrently on a connection | 64 |
amqpMaxFrameSize | The maximum frame size on a connection | 4194304 (4MB) |
amqpHeartBeat | The default heartbeat timeout of AoP connection | 60 (s) |
amqpProxyPort | The AMQP proxy service port | 5682 |
amqpProxyEnable | Whether to start proxy service | false |
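As a rough illustration of how the defaults in the table above combine with explicit settings, the following sketch layers user-supplied values over the table's defaults with `java.util.Properties`. The class `AopDefaults` and its methods are hypothetical helpers written for this guide; this is not how the broker itself loads its configuration.

```java
import java.util.Properties;

public class AopDefaults {
    /** Default values copied from the configuration table above. */
    public static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("amqpTenant", "public");
        d.setProperty("amqpListeners", "amqp://127.0.0.1:5672");
        d.setProperty("amqpMaxNoOfChannels", "64");
        d.setProperty("amqpMaxFrameSize", "4194304");
        d.setProperty("amqpHeartBeat", "60");
        d.setProperty("amqpProxyPort", "5682");
        d.setProperty("amqpProxyEnable", "false");
        return d;
    }

    /** Returns settings in which explicit overrides shadow the defaults. */
    public static Properties effective(Properties overrides) {
        // A Properties object falls back to its defaults for missing keys.
        Properties merged = new Properties(defaults());
        for (String name : overrides.stringPropertyNames()) {
            merged.setProperty(name, overrides.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.setProperty("amqpProxyEnable", "true");
        Properties conf = effective(overrides);
        System.out.println("amqpProxyEnable=" + conf.getProperty("amqpProxyEnable"));
        System.out.println("amqpMaxFrameSize=" + conf.getProperty("amqpMaxFrameSize"));
    }
}
```

Any option you do not set keeps the default listed in the table.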
As mentioned above, the AoP module is loaded with the Pulsar broker. You need to add the configurations to Pulsar's configuration file, such as broker.conf or standalone.conf.
You need to add messagingProtocols (the default value is null) and protocolHandlerDirectory (the default value is "./protocols") to the Pulsar configuration file, such as broker.conf or standalone.conf. For AoP, the value of messagingProtocols is amqp, and the value of protocolHandlerDirectory is the directory that contains the AoP nar file.
The following is an example.
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
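Before starting the broker, it can help to confirm that the directory named by protocolHandlerDirectory really contains the AoP nar file. The sketch below is one possible check, not part of AoP. The class `NarCheck` and its methods are hypothetical, and the file-name pattern is taken from the build output path shown earlier in this guide. It also assumes that you run it from the directory against which the relative path `./protocols` should be resolved.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class NarCheck {
    /** True when the file name looks like pulsar-protocol-handler-amqp-${version}.nar. */
    public static boolean isAopNar(String fileName) {
        return fileName.startsWith("pulsar-protocol-handler-amqp-")
                && fileName.endsWith(".nar");
    }

    /** Returns the names of AoP nar files found directly inside the directory. */
    public static List<String> findAopNars(File protocolHandlerDirectory) {
        List<String> found = new ArrayList<>();
        // list() returns null when the path is missing or is not a directory.
        String[] names = protocolHandlerDirectory.list();
        if (names != null) {
            for (String name : names) {
                if (isAopNar(name)) {
                    found.add(name);
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        File dir = new File(args.length > 0 ? args[0] : "./protocols");
        List<String> nars = findAopNars(dir);
        if (nars.isEmpty()) {
            System.out.println("No AoP nar file found in " + dir);
        } else {
            System.out.println("Found " + nars + " in " + dir);
        }
    }
}
```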
Set the AMQP service listeners. Note that the hostname in the listeners must be the same as the Pulsar broker's advertisedAddress.
The following is an example.
amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
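The hostname rule above can be checked mechanically. The following is a minimal sketch, assuming that amqpListeners may hold one or more comma-separated `amqp://host:port` entries (the comma-separated form is an assumption; the example above shows a single entry). `ListenerCheck` is a hypothetical helper, not an AoP class.

```java
import java.net.URI;
import java.util.Properties;

public class ListenerCheck {
    /** True when every amqpListeners entry uses advertisedAddress as its host. */
    public static boolean listenersMatchAdvertisedAddress(Properties conf) {
        String listeners = conf.getProperty("amqpListeners", "");
        String advertised = conf.getProperty("advertisedAddress", "");
        if (listeners.isEmpty() || advertised.isEmpty()) {
            return false;
        }
        for (String entry : listeners.split(",")) {
            URI uri = URI.create(entry.trim());
            boolean schemeOk = "amqp".equals(uri.getScheme());
            boolean hostOk = advertised.equals(uri.getHost());
            if (!schemeOk || !hostOk) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("amqpListeners", "amqp://127.0.0.1:5672");
        conf.setProperty("advertisedAddress", "127.0.0.1");
        System.out.println("listeners match advertisedAddress: "
                + listenersMatchAdvertisedAddress(conf));
    }
}
```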
With the above configuration, you can start your Pulsar broker. For details, refer to Pulsar Get started guides.
cd apache-pulsar-2.6.1
bin/pulsar standalone
In the Pulsar log4j2.yaml configuration file, you can set the AoP log level.
The following is an example.
Logger:
  - name: io.streamnative.pulsar.handlers.amqp
    level: debug
    additivity: false
    AppenderRef:
      - ref: Console
Other configuration options can also be changed and placed in the Pulsar broker configuration file.
If you want to contribute to AMQP on Pulsar, complete the following steps.
Starting from version 2.11.0, AoP requires JDK 17.
Dependency | Installation guide |
---|---|
Java 17 | https://openjdk.java.net/install/ |
Maven | https://maven.apache.org/ |
Clone code to your machine.
git clone git@github.com:streamnative/aop.git
Build the project.
mvn install -DskipTests
Visit https://github.com/streamnative/aop and click the Fork button (top right) to establish a cloud-based fork.
Create your clone.
$ cd $working_dir
$ git clone https://github.com/$user/aop
Set your clone to track upstream repository.
$ cd $working_dir/aop
$ git remote add upstream https://github.com/streamnative/aop.git
Run the git remote -v command. The output looks as follows:
origin https://github.com/$user/aop.git (fetch)
origin https://github.com/$user/aop.git (push)
upstream https://github.com/streamnative/aop (fetch)
upstream https://github.com/streamnative/aop (push)
Get your local master up to date.
$ cd $working_dir/aop
$ git checkout master
$ git fetch upstream
$ git rebase upstream/master
$ git push origin master
Branch from master.
$ git checkout -b myfeature
You can now edit the code on the myfeature
branch.
Commit your changes.
$ git add <filename>
$ git commit -m "$add a comment"
Likely you'll go back and edit-build-test in a few cycles.
The following commands might be helpful for you.
$ git add <filename> (add one file)
$ git add -A (add all changes, including new, deleted, and modified files)
$ git commit -a -m "$add a comment" (add and commit modified and deleted files)
$ git add -u (add modified and deleted files, not including new files)
$ git add . (add new and modified files in the current directory; since Git 2.0, deleted files as well)
When your commit is ready for review (or just to establish an offsite backup of your work), push your branch to your fork on github.com:
$ git push origin myfeature
Visit your fork at https://github.com/$user/aop (replace $user obviously).
Click the Compare & pull request button next to your myfeature branch.
Once you open your pull request, at least two reviewers will participate in reviewing. Those reviewers will conduct a thorough code review, looking for correctness, bugs, opportunities for improvement, documentation and comments, and style.
Commit changes made in response to review comments to the same branch on your fork.
Very small PRs are easy to review. Very large PRs are very difficult to review.
Clone this project from GitHub to your local machine.
git clone https://github.com/streamnative/aop.git
cd aop
Build the project.
mvn clean install -DskipTests
Copy the nar package to Pulsar protocols directory.
cp ./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar $PULSAR_HOME/protocols/pulsar-protocol-handler-amqp-${version}.nar
Modify Pulsar standalone configuration
# conf file: $PULSAR_HOME/conf/standalone.conf
# add amqp configs
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
Add namespace for vhost.
# for example, the vhost name is `vhost1`
bin/pulsar-admin namespaces create -b 1 public/vhost1
# set retention for the namespace
bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
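The examples in this guide suggest a simple naming rule: the vhost `vhost1` is backed by the namespace `public/vhost1`, and the proxy example later uses the vhost `/` together with the namespace `public/default`. The sketch below only restates that inferred rule so that you can compute the namespace to create for a given vhost. `VhostNamespace` is a hypothetical helper, and the rule that AoP actually applies may differ.

```java
public class VhostNamespace {
    /**
     * Maps an AMQP vhost to a Pulsar namespace name of the form tenant/namespace.
     * Assumption inferred from this guide's examples: the vhost "/" maps to
     * "default", and any other vhost name is used as the namespace name as-is.
     */
    public static String namespaceFor(String tenant, String vhost) {
        String name = "/".equals(vhost) ? "default" : vhost;
        return tenant + "/" + name;
    }

    public static void main(String[] args) {
        // "public" is the default value of amqpTenant.
        String namespace = namespaceFor("public", "vhost1");
        // Print the pulsar-admin command used in this guide for that namespace.
        System.out.println("bin/pulsar-admin namespaces create -b 1 " + namespace);
    }
}
```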
Test with the RabbitMQ client.
# add RabbitMQ client dependency in your project
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.8.0</version>
</dependency>
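// Java code
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class AopQuickStart {
    public static void main(String[] args) throws Exception {
        // create connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchange = "ex";
        String queue = "qu";
        // declare a durable fanout exchange
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
        // declare a durable queue and bind it to the exchange
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, "");
        // publish some messages
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes(StandardCharsets.UTF_8));
        }
        // consume messages with auto-ack and wait until all 100 have arrived
        CountDownLatch countDownLatch = new CountDownLatch(100);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("receive msg: " + new String(body, StandardCharsets.UTF_8));
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        // release resources
        channel.close();
        connection.close();
    }
}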
To use the proxy, complete the following steps. For details on any step, refer to Deploy a cluster on bare metal.
Prepare the ZooKeeper cluster.
Initialize the cluster metadata.
Prepare the BookKeeper cluster.
Copy the pulsar-protocol-handler-amqp-${version}.nar file to the $PULSAR_HOME/protocols directory.
Start the broker.
Broker configuration:
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
brokerServicePort=6651
amqpListeners=amqp://127.0.0.1:5672
amqpProxyEnable=true
amqpProxyPort=5682
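In this guide, the client connects to port 5672 when talking to the broker listener directly and to port 5682 when going through the proxy. The sketch below derives the port a client would dial from the broker configuration shown above. `ClientPort` is a hypothetical helper, and the rule "use the proxy port whenever the proxy is enabled" is an assumption drawn from these examples rather than a requirement of AoP.

```java
import java.net.URI;
import java.util.Properties;

public class ClientPort {
    /** Proxy port when amqpProxyEnable is true, otherwise the first listener's port. */
    public static int clientPort(Properties conf) {
        boolean proxyEnabled =
                Boolean.parseBoolean(conf.getProperty("amqpProxyEnable", "false"));
        if (proxyEnabled) {
            return Integer.parseInt(conf.getProperty("amqpProxyPort", "5682"));
        }
        String listeners = conf.getProperty("amqpListeners", "amqp://127.0.0.1:5672");
        // Assumption: several listeners, if present, are comma-separated.
        return URI.create(listeners.split(",")[0].trim()).getPort();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("amqpListeners", "amqp://127.0.0.1:5672");
        conf.setProperty("amqpProxyEnable", "true");
        conf.setProperty("amqpProxyPort", "5682");
        System.out.println("client should connect to port " + clientPort(conf));
    }
}
```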
Reset the number of bundles of the namespace public/default to 1.
$PULSAR_HOME/bin/pulsar-admin namespaces delete public/default
$PULSAR_HOME/bin/pulsar-admin namespaces create -b 1 public/default
$PULSAR_HOME/bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
Prepare an exchange and a queue for the test.
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5682);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String ex = "ex-perf";
String qu = "qu-perf";
channel.exchangeDeclare(ex, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(qu, true, false, false, null);
channel.queueBind(qu, ex, qu);
channel.close();
connection.close();
Download the RabbitMQ perf tool and run the test.
$RABBITMQ_PERF_TOOL_HOME/bin/runjava com.rabbitmq.perf.PerfTest -e ex-perf -u qu-perf -r 1000 -h amqp://127.0.0.1:5682 -p
This library is licensed under the terms of the Apache License 2.0 and may include packages written by third parties which carry their own copyright notices and license terms.