wandenberg / nginx-push-stream-module

A pure stream http push technology for your Nginx setup. Comet made easy and really scalable.
Other
2.22k stars 295 forks source link

Events channel, tag field #260

Closed materkov closed 7 years ago

materkov commented 7 years ago

It seems that tag field is always equal to 1 for messages in events channel:

1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_subscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_unsubscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_subscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:05 GMT|{"type": "client_unsubscribed", "channel": "user_conn_track.user-1"}

So, there is a small possibility to receive duplicates messages, when retrieving old message from history channel.

Is this a bug? Any workaround for this problem?

wandenberg commented 7 years ago

Not sure if I understood your question @materkov . Which tag are you talking about? The events channel should kind behave like any other channel from the perspective of publishing messages. So it increases a sequence humber when the messages are published on the same second. Where are you seeing equal tags ?

materkov commented 7 years ago

@wandenberg I'll try to explain more. Here is config that I am using:

push_stream_shared_memory_size 128M;
push_stream_events_channel_id connection_events_channel;
push_stream_ping_message_text "ping";
push_stream_message_template  "{\"id\": ~id~, \"eventid\": ~event-id~, \"channel\": \"~channel~\", \"text\": ~text~, \"tag\": ~tag~, \"time\": \"~time~\"}";
push_stream_store_messages on;

server {
    listen 80 default_server;

    location /pub {
        # activate publisher (admin) mode for this location
        push_stream_publisher admin;

        # query string based channel id
        push_stream_channels_path $arg_id;
    }

    location ~ /sub_ws/(.*) {
        push_stream_subscriber websocket;

        push_stream_last_received_message_time      "$arg_time";
        push_stream_last_received_message_tag       "$arg_tag";

        # positional channel path
        push_stream_channels_path $1;
    }

    location ~ /sub_events_channel {
        push_stream_allow_connections_to_events_channel on;
        push_stream_subscriber websocket;

        push_stream_ping_message_interval 2s;

        push_stream_last_received_message_time      $arg_time;
        push_stream_last_received_message_tag       $arg_tag;

        # positional channel path
        push_stream_channels_path connection_events_channel;
    }
}

Then, I am sending 4 messages very quickly:

curl http://10.1.2.6:8200/pub?id=12 -d "dsad"

In normal channel, everything seems to be OK:

ws://10.1.2.6:8200/sub/11/12 

{"id": 11, "eventid": , "channel": "12", "text": dsad, "tag": 1, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 12, "eventid": , "channel": "12", "text": dsad, "tag": 2, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 13, "eventid": , "channel": "12", "text": dsad, "tag": 3, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 14, "eventid": , "channel": "12", "text": dsad, "tag": 4, "time": "Tue, 23 May 2017 12:41:44 GMT"}

Tags are different here. Then, I've tried to quickly connect and disconnect websocket connection. Here is what I see in events channel:


{"id": 47, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 48, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 49, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 50, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 51, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 52, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 53, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 54, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 55, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 56, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}

As you can see, tag is 1 for every message (but it should be different, because time is Tue, 23 May 2017 12:44:41 GMT for every message)

wandenberg commented 7 years ago

I will check this. Should not work like that. Besides that this channel was not created to be used with history.

materkov commented 7 years ago

Thank you. I am looking forward to your reply.

By the way, what I am trying to do is some kind of daemon. This tiny daemon will listen connect/disconnect events and re-send them to message broker. Then, our backend will consume this messages and perform some processing (e.g. presence tracking, or some other stuff). I want to make sure that this daemon works reliable and don't loose events in periods of code deployment/restarting/or some troubles. To do this, I am tracking tag&time of the last message, and then retrieve history events when daemon connected again. If tag is 1 for every message, I am recieving all messages for this second (except first message) in next reconnect.

wandenberg commented 7 years ago

@materkov sorry for the delay. Can you give it a try on this branch fix_tag_in_concurrent_messages ?

materkov commented 7 years ago

Thank you. I can confirm that problem is solved in this branch.

Just one note, there was an error:

In file included from ../nginx-push-stream-module/src/ngx_http_push_stream_module.c:29:0:
../nginx-push-stream-module/src/ngx_http_push_stream_module_utils.c: In function 'ngx_http_push_stream_convert_char_to_msg_on_shared':
../nginx-push-stream-module/src/ngx_http_push_stream_module_utils.c:273:48: error: unused variable 'shm_data' [-Werror=unused-variable]
     ngx_http_push_stream_shm_data_t           *shm_data = mcf->shm_data;
                                                ^
cc1: all warnings being treated as errors
make[1]: *** [objs/addon/src/ngx_http_push_stream_module.o] Error 1

After removing this line, build was successful.