wandenberg / nginx-push-stream-module

A pure stream http push technology for your Nginx setup. Comet made easy and really scalable.
Other
2.21k stars 295 forks source link

Publish from other nginx modules #283

Closed RekGRpth closed 4 years ago

RekGRpth commented 5 years ago

Is it possible to publish events into channels from other nginx modules?

wandenberg commented 5 years ago

Not yet. Only if the module can execute a POST to 'publish' location.

RekGRpth commented 5 years ago

I use curl_easy in module to execute a POST to publish location, but it is very bad

RekGRpth commented 5 years ago

I want call some internal method for publish without any location etc.

wandenberg commented 5 years ago

Feel free to expose the function that adds a message to a channel and call it from the other module. Pull requests are more than welcome. ;)

RekGRpth commented 5 years ago

Unfortunatelly, I don't understood, how I can

expose the function that adds a message to a channel

wandenberg commented 5 years ago

After all the validations and parsers done when a message is posted the module uses the function ngx_http_push_stream_add_msg_to_channel to really publish the message. One way would be your other module prepare all the parameters this function requires and call it. Then would be possible to 'publish' a message from another module without having to post to Nginx

RekGRpth commented 5 years ago

I seen this function ngx_http_push_stream_add_msg_to_channel already. Is it really possible to prepare all its parameters from other module?

RekGRpth commented 5 years ago

I did it!!! Great thank You!

diff --git a/include/ngx_http_push_stream_module_utils.h b/include/ngx_http_push_stream_module_utils.h
index 8c99cf1..8d29fd8 100644
--- a/include/ngx_http_push_stream_module_utils.h
+++ b/include/ngx_http_push_stream_module_utils.h
@@ -263,6 +263,7 @@ static void                 ngx_http_push_stream_complex_value(ngx_http_request_

 ngx_int_t                   ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
+void                        ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
 ngx_int_t                   ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool, ngx_http_request_t *r);

 static void                 ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c
index 6f35a59..e47ac50 100644
--- a/src/ngx_http_push_stream_module_utils.c
+++ b/src/ngx_http_push_stream_module_utils.c
@@ -371,6 +371,27 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
     return msg;
 }

+void
+ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)
+{
+    ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data;
+    for (ngx_queue_t *data_q = ngx_queue_head(&global_data->shm_datas_queue); data_q != ngx_queue_sentinel(&global_data->shm_datas_queue); data_q = ngx_queue_next(data_q)) {
+        ngx_http_push_stream_shm_data_t *data = ngx_queue_data(data_q, ngx_http_push_stream_shm_data_t, shm_data_queue);
+        ngx_http_push_stream_main_conf_t *mcf = data->mcf;
+        ngx_http_push_stream_channel_t *channel = ngx_http_push_stream_find_channel(id, log, mcf);
+        if (channel != NULL) {
+            if (ngx_http_push_stream_add_msg_to_channel(mcf, log, channel, text->data, text->len, event_id, event_type, store_messages, temp_pool) != NGX_OK) {
+                ngx_log_error(NGX_LOG_ERR, log, 0, "ngx_http_push_stream_add_msg_to_channel != NGX_OK");
+            }
+        }
+    }
+}

 ngx_int_t
 ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)