wandenberg / nginx-push-stream-module

A pure stream http push technology for your Nginx setup. Comet made easy and really scalable.

Duplication message #97

Closed Terry-Mao closed 10 years ago

Terry-Mao commented 11 years ago

Hi, I found an issue when storing messages in nginx-push-stream.

This is the log from our Android developer:

1. Request url: http://10.33.30.72/sub/5284047F4FFB4E04824A2FD1D1F0CD62; LastModify: Wed, 21 Aug 2013 10:25:34 GMT; ETAG: 0
2. Push test0~9 to push-stream.
3. Get messages test1-9 (note: test0 is not included), plus the time and etag for the next GET.
4. Request url: http://10.33.30.72/sub/5284047F4FFB4E04824A2FD1D1F0CD62; LastModify: Wed, 21 Aug 2013 10:25:34 GMT; ETAG: 9
5. Get message test0, plus the time and etag for the next GET.

This seems OK, but then I pushed messages test10-19 (our configuration is store_message 15).

6. Request url: http://10.33.30.72/sub/5284047F4FFB4E04824A2FD1D1F0CD62; LastModify: Wed, 21 Aug 2013 10:28:56 GMT; ETAG: 0
7. ERROR! I get messages test5-19.

In my opinion, I should only get test10-19, not test5-9.

So the notification messages are duplicated.
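The report above can be illustrated with a small simulation. This is only a simplified model, not the module's implementation: it assumes each stored message carries a publish time in seconds plus a tag that orders messages within the same second, that the channel keeps only the newest 15 messages, and that a subscriber receives everything newer than the (time, tag) cursor it sends. The class name, the times, and the tag values are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class StoredChannelModel {

    /** One stored message: publish time (seconds) plus a tag ordering messages within that second. */
    private static final class Message {
        final long time;
        final int tag;
        final String text;

        Message(long time, int tag, String text) {
            this.time = time;
            this.tag = tag;
            this.text = text;
        }
    }

    private final int capacity;
    private final Deque<Message> stored = new ArrayDeque<>();

    public StoredChannelModel(int capacity) {
        this.capacity = capacity;
    }

    /** Stores a message and evicts the oldest one once the capacity is exceeded. */
    public void publish(long time, int tag, String text) {
        stored.addLast(new Message(time, tag, text));
        if (stored.size() > capacity) {
            stored.removeFirst();
        }
    }

    /** Returns every stored message strictly newer than the (time, tag) cursor. */
    public List<String> fetchAfter(long cursorTime, int cursorTag) {
        List<String> result = new ArrayList<>();
        for (Message m : stored) {
            boolean newer = m.time > cursorTime
                    || (m.time == cursorTime && m.tag > cursorTag);
            if (newer) {
                result.add(m.text);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        StoredChannelModel channel = new StoredChannelModel(15);
        // First batch: test0..test9, all in the same (hypothetical) second.
        for (int i = 0; i < 10; i++) {
            channel.publish(100L, i + 1, "test" + i);
        }
        // Second batch: test10..test19, published later.
        for (int i = 10; i < 20; i++) {
            channel.publish(300L, i - 9, "test" + i);
        }
        // A cursor older than everything stored returns all 15 stored messages.
        System.out.println(channel.fetchAfter(0L, 0));
        // A cursor pointing at the last message of the first batch returns only the second batch.
        System.out.println(channel.fetchAfter(100L, 10));
    }
}
```

In this model, a subscriber whose cursor does not reach the server gets test5 through test19, which matches step 7 of the log, while a subscriber that sends the cursor of the last message it received gets only test10 through test19.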

wandenberg commented 11 years ago

Try using the right request headers, If-Modified-Since and If-None-Match, and see whether it works as you expect. Last-Modified and Etag are response headers only.
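The distinction between response headers and request headers can be sketched as a small helper. This is a minimal sketch, assuming the client keeps the Last-Modified and Etag values of the previous response as plain strings; the class and method names are hypothetical and not part of the module or of anyone's code in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConditionalHeaders {

    /**
     * Builds the request headers for the next long-polling GET from the
     * Last-Modified and Etag values of the previous response.
     * A null value means there is no previous response yet, so that header is omitted.
     */
    public static Map<String, String> nextRequestHeaders(String lastModified, String etag) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (lastModified != null) {
            // Response header Last-Modified goes back as request header If-Modified-Since.
            headers.put("If-Modified-Since", lastModified);
        }
        if (etag != null) {
            // Response header Etag goes back as request header If-None-Match.
            headers.put("If-None-Match", etag);
        }
        return headers;
    }

    public static void main(String[] args) {
        // First request: nothing received yet, so no conditional headers are sent.
        System.out.println(nextRequestHeaders(null, null));
        // Follow-up request: echo back what the previous response carried.
        System.out.println(nextRequestHeaders("Wed, 21 Aug 2013 10:25:34 GMT", "9"));
    }
}
```

Each entry of the returned map would then be passed to `HttpURLConnection.setRequestProperty` before the request is sent.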

Terry-Mao commented 11 years ago

We read these from the response:

    String last_modified = (String)connection.getHeaderField("last-modified");
    String etag = (String)connection.getHeaderField("etag");

Then we send the next request with last_modified and etag set in the headers:

    connection.addRequestProperty("If-Modified-Since", mLastModify);
    connection.addRequestProperty("If-None-Match", mETag);
    connection.setRequestProperty("Connection", "Keep-Alive");
    connection.setRequestProperty("Content-Type", "application/json");
    connection.setRequestProperty("Charset", "UTF-8");
    connection.setRequestProperty("Content-Transfer-Encoding", "binary");

It still does not work as expected.

wandenberg commented 11 years ago

I can't see your entire code, but it seems you save the response value in one variable (last_modified) and send another variable (mLastModify) in the following request. Is this correct?

Terry-Mao commented 11 years ago

Yes:

    String last_modified = (String)connection.getHeaderField("last-modified");
    String etag = (String)connection.getHeaderField("etag");
    mLastModify = last_modified;
    mETag = etag;

But we still get duplicated messages (at a low rate).

wandenberg commented 11 years ago

Can you reproduce this scenario using the curl command, or a simple Java program? I want to be sure the problem isn't in your code, because I have some tests written in Ruby that cover exactly this use case: send some messages in the same second and get only some of them using the right headers.
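When trying to reproduce a low-rate duplication like this, it can help to record the messages of every response and check them mechanically. The following is a minimal sketch, assuming every published message body is unique (as with test0 through test19); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DuplicateCheck {

    /**
     * Returns the messages that were delivered more than once across all
     * responses, in the order in which the first repeat was seen.
     */
    public static List<String> findDuplicates(List<List<String>> responses) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (List<String> response : responses) {
            for (String message : response) {
                // Set.add returns false when the message was already recorded.
                if (!seen.add(message)) {
                    duplicates.add(message);
                }
            }
        }
        return new ArrayList<>(duplicates);
    }

    public static void main(String[] args) {
        // Two responses shaped like the ones in the report: the second overlaps the first.
        List<String> firstResponse = Arrays.asList("test7", "test8", "test9");
        List<String> secondResponse = Arrays.asList("test8", "test9", "test10");
        System.out.println(findDuplicates(Arrays.asList(firstResponse, secondResponse)));
    }
}
```

An empty result over a long run would suggest the client is sending the right headers; a non-empty one shows exactly which messages came back twice.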

Terry-Mao commented 11 years ago

My program works like this: send 10 messages, subscribe to the channel, then send 10 more messages.

It's hard to reproduce. The code looks like this:

        if (mLastModify == null || mLastModify.trim().length() == 0) return false;

        String url = combineURL(channels);
        url = "http://10.33.30.72/sub/5284047F4FFB4E04824A2FD1D1F0CD62";
        KLog.e(TAG, "---"+url);
        HttpURLConnection connection = null;
        int iUnknownHostEx = 0;
        int iInvalidRes= 0;
        Exception exception = null;
        while (!isCancelled()) {
            try {
                KLog.i(TAG, "run run run");
                connection = getConnection(url.toString());

                KLog.i(TAG, String.format("Request url: %s; LastModify: %s; ETAG: %s in thread: %d", url, mLastModify, mETag, mThreadId));

                long begin_time = System.currentTimeMillis();
                int resCode = connection.getResponseCode();
                long keep_alive_time = (System.currentTimeMillis() - begin_time) / 1000;
                KLog.i(TAG, String.format("Response Code: %d; Keep Alive: %d(s) in thread: %d", resCode, keep_alive_time, mThreadId));

                String last_modified = (String)connection.getHeaderField("last-modified");
                String etag = (String)connection.getHeaderField("etag");
                if (last_modified != null && last_modified.trim().length() > 0) {
                    PushPref.storeLastModifyAndETag(last_modified, etag);

                    mLastModify = last_modified;
                    mETag = etag;
                    KLog.i(TAG, String.format("Store last-modify: %s; etag: %s in thread: %d", mLastModify, mETag, mThreadId));
                }else {
                    KLog.e(TAG, "---last_modified null, mETag:" + mETag);
                }

                KLog.i(TAG, "run run run1" + resCode);
                if (resCode != 200) {
                    if (keep_alive_time < 120) {    // Keep-alive interval under 2 minutes: throttle the request rate manually (this usually happens on GPRS)
                        throw new WaitException();
                    } else {
                        throw new ContinueException();
                    }
                }

                KLog.i(TAG, "run run run2");
                InputStream ins = connection.getInputStream();
                boolean contain_valid_msg = parseInputstream(ins);
                ins.close();

                KLog.i(TAG, "run run run3");
                if (!contain_valid_msg) {   // Invalid content returned (possibly hijacked by a GPRS gateway): force a 15-minute wait before retrying
                    KLog.e(TAG, "----msg error");
                    throw new InvalidResponse();
                }

//          } catch (MalformedURLException e) {
//              KLog.w(TAG, "MalformedURLException in thread: " + mThreadId, e);
//              break; // exit the loop
//          } catch (WaitException e) {
//              KLog.w(TAG, "WaitException, resend request after 900(s) in thread: " + mThreadId);
//              sleep(15 * 60); // resend the request after 15 minutes
//          } catch (InvalidResponse e) {
//              KLog.w(TAG, "InvalidResponse, resend request exponential in thread: " + mThreadId);
//              sleep((int)Math.pow(2, iInvalidRes>16 ? iInvalidRes=0: iInvalidRes++));
//              /*delete by fqc
//               * sleep(15 * 60); // resend the request after 15 minutes
//               */
//          } catch (ContinueException e) {
//              KLog.w(TAG, "ContinueException, resend request after 5s in thread: " + mThreadId);
//              sleep(5);
            } catch (IOException e) {
                exception = e;
                if(e instanceof MalformedURLException) {
                    KLog.w(TAG, "MalformedURLException in thread: " + mThreadId, e);
                    break; // exit the loop

                }else if(e instanceof WaitException) {              
                    KLog.w(TAG, "WaitException, resend request after 900(s) in thread: " + mThreadId);
                    sleep(15 * 60); // resend the request after 15 minutes

                }else if(e instanceof InvalidResponse) {        
                    KLog.w(TAG, "InvalidResponse, resend request exponential  in thread: " + mThreadId);
                    KLog.w(TAG, "InvalidResponse, sleep time: " + (int)Math.pow(2, iInvalidRes>16 ? iInvalidRes=0: iInvalidRes));

//                  sleep((int)Math.pow(2, iInvalidRes>16 ? iInvalidRes=0: iInvalidRes++));
                    sleep(6);
                }else if(e instanceof ContinueException) {
                    KLog.w(TAG, "ContinueException, resend request after 5s in thread: " + mThreadId);
                    sleep(5);

                }else if(e instanceof SocketException) {                        /**when the connect type changed*/

                    KLog.e(TAG, "---JUDGE is SocketException");

                } else if(e instanceof SocketTimeoutException) {        /**get connection time out, is set by setReadTimeout or setConnectTimeout*/

                    KLog.e(TAG, "---JUDGE is SocketTimeoutException");

                } else if(e instanceof UnknownHostException){

                    KLog.e(TAG, "---JUDGE is UnknownHostException");

/*                  int curConnectType = PhoneUtil.getConnectType();
                    if(iUnknownHostEx > 0 && curConnectType != isUnknownHost) {
                        iUnknownHostEx = 0;
                        KLog.e(TAG, "--iUnknownHostEx set 0");
                    }
                    isUnknownHost = curConnectType;*/
                    KLog.w(TAG, "IOException in thread: " + mThreadId, e);
                    KLog.e(TAG, "IOException in thread, time= " + (int)Math.pow(2, iUnknownHostEx>16 ? iUnknownHostEx=0: iUnknownHostEx)+" ioExceptionCount:"+iUnknownHostEx);
                    sleep((int)Math.pow(2, iUnknownHostEx>16 ? iUnknownHostEx=0: iUnknownHostEx++));

                } else {

                    KLog.e(TAG, "---JUDGE is others");
                    sleep(5);

                }
                continue;
            } finally {
                if(!(exception instanceof UnknownHostException) ) {
                    iUnknownHostEx = 0;
                    KLog.e(TAG, "---set iUnknownHostEx 0");
                }
                if(!(exception instanceof InvalidResponse) ) {
                    iInvalidRes = 0;
                    KLog.e(TAG, "---set iInvalidRes 0");
                }
                if (connection != null) {
                    KLog.e(TAG, "--disconnect the connection");
                    connection.disconnect();
                    connection = null;
                }
            }
        }

        return true;

wandenberg commented 10 years ago

Hi @Terry-Mao, are you still having this problem? If so, please send me your server configuration.

Terry-Mao commented 10 years ago

push_stream_shared_memory_size 2048M;
push_stream_shared_memory_cleanup_objects_ttl 30s;
push_stream_message_ttl 240m;
push_stream_max_messages_stored_per_channel 15;
chunked_transfer_encoding on;
chunkin on;

server {
    listen xxx:80;
    server_name xxx;
    charset utf-8;
    keepalive_timeout 600;

    location /date {
            timesyn;
            access_log off;
    }

    location ~ /sub/(.*) {
            set $push_stream_channels_path  $1;
            push_stream_subscriber  long-polling;
            push_stream_authorized_channels_only    off;
            push_stream_content_type        text/plain;
            push_stream_subscriber_connection_ttl   600s;
            push_stream_longpolling_connection_ttl  600s;
            access_log  /data/logs/nginx/notify_sub.log notify;
    }

    location ~ /sub2/(.*) {
            set $push_stream_channels_path  $1;
            push_stream_subscriber  long-polling;
            push_stream_authorized_channels_only    off;
            push_stream_content_type        text/plain;
            push_stream_subscriber_connection_ttl   600s;
            push_stream_longpolling_connection_ttl  600s;
            access_log  /data/logs/nginx/notify_sub2.log notify;
    }

    error_page 411 = @my_411_error;

    location @my_411_error {
            chunkin_resume;
    }

}

I still have this problem.

wandenberg commented 10 years ago

Hi @Terry-Mao

I notice that you are using an old module version. I will assume it is version 0.3.5.

As far as I can see, your code isn't setting the proper headers. Here is the code and the configuration I used to test, and it is working. Pay attention to these lines.

if (lastModify != null) {
    connection.setRequestProperty("If-Modified-Since", lastModify);
}

if (etag != null) {
    connection.setRequestProperty("If-None-Match", etag);
}

These were the steps I took:

I received the messages as expected. If you are still having problems, send me the complete code by email, if you want to.

Complete code:

package com.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;

public class LongPolling {

    public static void main(String[] args) throws Exception {
        HttpURLConnection connection = null;
        BufferedReader rd = null;
        StringBuilder sb = null;
        String line = null;

        URL serverAddress = null;

        String lastModify = null;
        String etag = null;

        while (true) {
            try {
                serverAddress = new URL("http://localhost:9080/sub/ch1");
                // set up out communications stuff
                connection = null;

                // Set up the initial connection
                connection = (HttpURLConnection) serverAddress.openConnection();
                connection.setRequestMethod("GET");
                connection.setDoOutput(true);
                connection.setReadTimeout(600000);

                if (lastModify != null) {
                    connection.setRequestProperty("If-Modified-Since", lastModify);
                }

                if (etag != null) {
                    connection.setRequestProperty("If-None-Match", etag);
                }

                connection.connect();

                // read the result from the server
                rd = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                sb = new StringBuilder();

                while ((line = rd.readLine()) != null)
                {
                    sb.append(line + '\n');
                }

                lastModify = connection.getHeaderField("Last-Modified");
                etag = connection.getHeaderField("Etag");

                System.out.println(sb.toString());
                System.out.println(String.format("Last-Modified: %s Etag: %s", lastModify, etag));

            } catch (MalformedURLException e) {
                e.printStackTrace();
            } catch (ProtocolException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally
            {
                // close the connection (it is still null if the URL could not be opened), set all objects to null
                if (connection != null) {
                    connection.disconnect();
                }
                rd = null;
                sb = null;
                connection = null;
            }
        }
    }

}

nginx.conf:

pid         logs/nginx.pid;
error_log   logs/nginx-main_error.log debug;

events {
  worker_connections  1024;
  use                 epoll;
}

http {
  access_log      logs/nginx-http_access.log;
  push_stream_shared_memory_size  2048M;
  push_stream_shared_memory_cleanup_objects_ttl   30s;
  push_stream_message_ttl 240m;
  push_stream_max_messages_stored_per_channel     15;

  server {
    listen 9080;
    charset utf-8;
    keepalive_timeout 600;

    location ~ /sub/(.*) {
      set $push_stream_channels_path  $1;
      push_stream_subscriber  long-polling;
      push_stream_authorized_channels_only    off;
      push_stream_content_type        text/plain;
      push_stream_subscriber_connection_ttl   600s;
      push_stream_longpolling_connection_ttl  600s;
      access_log logs/notify_sub.log;
    }

    location /pub {
      push_stream_publisher admin;
      set $push_stream_channel_id  $arg_id;
      push_stream_store_messages              on;
    }
  }
}
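To drive the subscriber above, messages have to be published to the `/pub` location of this configuration, which takes the channel from the `id` query argument. The following is a minimal publisher sketch, assuming the server from the configuration above is listening on localhost:9080 and that the POST body is the message text; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class Publisher {

    /** Builds the publish URL for a channel, e.g. http://localhost:9080/pub?id=ch1. */
    public static String publishUrl(String base, String channelId) {
        try {
            return base + "/pub?id=" + URLEncoder.encode(channelId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    /** POSTs one message to the publisher location and returns the HTTP status code. */
    public static int publish(String url, String message) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(message.getBytes(StandardCharsets.UTF_8));
            }
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = publishUrl("http://localhost:9080", "ch1");
        try {
            // Publish test0..test9, mirroring the first batch in the original report.
            for (int i = 0; i < 10; i++) {
                int status = publish(url, "test" + i);
                System.out.println("test" + i + " -> HTTP " + status);
            }
        } catch (IOException e) {
            System.out.println("publish failed: " + e.getMessage());
        }
    }
}
```

Running this once, starting the subscriber, and then running it again with a second batch reproduces the send, subscribe, resend sequence described earlier in the thread.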
Terry-Mao commented 10 years ago

I used your code above and I no longer get the same message twice. Thanks a lot!

akunz commented 10 years ago

Thanks for the detailed example code :+1: