zeromq / chumak

Pure Erlang implementation of ZeroMQ Message Transport Protocol.
Mozilla Public License 2.0

Subscriber topic filtering should match only the start not the entire string #43

Closed shishirpy closed 3 years ago

shishirpy commented 3 years ago

According to the ZeroMQ API guide: http://api.zeromq.org/master:zmq-setsockopt

ZMQ_SUBSCRIBE: Establish message filter

The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
Option value type | binary data
Option value unit | N/A
Default value | N/A
Applicable socket types | ZMQ_SUB

The subscription should filter out messages that do not start with the filter.
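The filtering rule quoted above can be sketched in a few lines of Erlang. This is only an illustration of the documented semantics, not chumak's code: the module name `prefix_filter_sketch` and the functions `has_prefix/2` and `accepts/2` are hypothetical names chosen for this example.

```erlang
-module(prefix_filter_sketch).
-export([accepts/2, has_prefix/2]).

%% True when Message begins with Prefix.
%% An empty prefix (<<>>) matches every message.
has_prefix(Message, Prefix) when is_binary(Message), is_binary(Prefix) ->
    Size = byte_size(Prefix),
    case Message of
        %% Prefix and Size are already bound, so this clause only matches
        %% when the first Size bytes of Message are equal to Prefix.
        <<Prefix:Size/binary, _Rest/binary>> -> true;
        _ -> false
    end.

%% A message is accepted if it matches at least one filter.
%% With no filters at all, every message is filtered out.
accepts(Message, Filters) when is_list(Filters) ->
    lists:any(fun(Filter) -> has_prefix(Message, Filter) end, Filters).
```

Under these assumptions, `accepts(<<"12345 80 20">>, [<<"12">>])` is true, while `accepts(<<"78811 12 15">>, [<<"12">>])` is false because the message only contains 12 further along.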

The current implementation searches for the given filter in the entire message and not just the beginning. The error can be reproduced using the following code:

-module(wuserver).
-export([main/0]).

main() ->
    application:start(chumak),
    {ok, Socket} = chumak:socket(pub),

    case chumak:connect(Socket, tcp, "localhost", 5555) of
        {ok, _BindPid} ->
            io:format("Binding OK with Pid: ~p\n", [Socket]);
        {error, Reason} ->
            io:format("Connection Failed for this reason: ~p\n", [Reason]);
        X ->
            io:format("Unhandled reply for bind ~p \n", [X])
    end,
    loop(Socket).

loop(Socket) ->
    Zipcode = rand:uniform(100000),
    Temperature = rand:uniform(135),
    Relhumidity = rand:uniform(50) + 10,

    BinZipCode = erlang:integer_to_binary(Zipcode),
    BinTemperature = erlang:integer_to_binary(Temperature),
    BinRelhumidity = erlang:integer_to_binary(Relhumidity),
    io:format(" SENDING: ~p\n", [[BinZipCode, " ", BinTemperature, " ", BinRelhumidity]]),
    ok = chumak:send(Socket, [BinZipCode, " ", BinTemperature, " ", BinRelhumidity]),
    timer:sleep(100),
    loop(Socket).

-module(wuclient).
-export([main/0]).

main() ->
    application:start(chumak),
    {ok, Socket} = chumak:socket(sub),
    Topic = ["12"],
    chumak:subscribe(Socket, Topic),
    case chumak:bind(Socket, tcp, "localhost", 5555) of
        {ok, _BindPid} ->
            io:format("Binding OK with Pid: ~p\n", [Socket]);
        {error, Reason} ->
            io:format("Connection failed for this reason: ~p\n", [Reason]);
        X ->
            io:format("Unhandled reply for bind ~p \n", [X])
    end,
    loop(Socket).

loop(Socket) ->
    {ok, Data1} = chumak:recv(Socket),
    io:format("Received ~p\n", [Data1]),
    loop(Socket).

Once the code is compiled, do the following:

  1. Open two Erlang shells.
  2. In the first shell, start the client with wuclient:main().
  3. In the second shell, start the server with wuserver:main().
  4. The result in the client will be something like this:
    Binding OK with Pid: <0.191.0>
    Received <<"78811 12 15">>
    Received <<"27336 120 47">>
    Received <<"45253 120 42">>
    Received <<"76712 80 28">>
    Received <<"7334 135 12">>
    Received <<"92121 105 44">>
    Received <<"49187 127 48">>
    Received <<"58301 135 12">>
  5. We expect only messages starting with 12 to be shown, but the messages received merely contain 12 somewhere in the text.

shishirpy commented 3 years ago

The error is caused by the way binary:match(FirstPart, PeerSubscriptions) is used in the chumak_subscription.erl file: binary:match reports a match found anywhere in the subject, not only at its start. There should be a third case that makes sure the match is at the beginning of the message.
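To illustrate the difference, here is a minimal sketch of the two behaviours using the standard `binary:match/2`, which returns `{Start, Length}` for the first occurrence or `nomatch`. It is not the code from chumak_subscription.erl; the module name `match_position_sketch` and both function names are hypothetical, and it handles a single topic rather than the full list of peer subscriptions.

```erlang
-module(match_position_sketch).
-export([matches_anywhere/2, matches_at_start/2]).

%% Substring semantics: true when Topic occurs anywhere in Message.
%% Topic must be a non-empty binary, since binary:match/2 rejects
%% empty patterns.
matches_anywhere(Message, Topic) ->
    binary:match(Message, Topic) =/= nomatch.

%% Prefix semantics: only a match at byte offset 0 counts.
matches_at_start(_Message, <<>>) ->
    %% An empty topic subscribes to every message.
    true;
matches_at_start(Message, Topic) ->
    case binary:match(Message, Topic) of
        {0, _Length} -> true;   % first occurrence is at the very start
        _ -> false              % nomatch, or a match further along
    end.
```

For the message `<<"78811 12 15">>` and topic `<<"12">>` from the report, `matches_anywhere/2` returns true (the match is at offset 6), while `matches_at_start/2` returns false.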

drozzy commented 3 years ago

Thanks for the report. Can you contribute the PR for this? I'd be happy to merge it in.

shishirpy commented 3 years ago

I'll do that. My concern is that some unit tests related to encryption are not passing on my local machine. I'll raise new issues for them.