jlouis / turtle

A wrapper on the RabbitMQ Erlang Client (Erlang)

How to send info to subscriber? #21

Closed dcy closed 8 years ago

dcy commented 8 years ago

Hi! I want to send a message to each subscriber to tell it to refresh its state. My code:

gen_subscriber(Type, State, PoolSize) ->
    Name = erlang:list_to_atom(lists:concat([Type, "_subscriber"])),
    Mod = erlang:list_to_atom(lists:concat(["epush_", Type])),
    Queue = erlang:atom_to_binary(Type, utf8),
    Config = #{
      name => Name,
      connection => amqp_server,
      function => fun Mod:loop/4,
      handle_info => fun Mod:handle_info/2,
      init_state => State,
      declarations => [],
      subscriber_count => PoolSize,
      prefetch_count => PoolSize,
      consume_queue => Queue,
      passive => false
     },
    ServiceSpec = turtle_service:child_spec(Config),
    supervisor:start_child(turtle_sup, ServiceSpec).

start_worker(#{type:=huawei, app_id:=AppId, app_secret:=AppSecret,
               pool_size:=PoolSize} = Conf) ->
    AccessToken = epush_huawei:get_access_token(AppId, AppSecret),
    State = #{type=>huawei, app_id=>AppId, app_secret=>AppSecret, access_token=>AccessToken},
    {ok, Pid} = gen_subscriber(huawei, State, PoolSize),
    Pid = gproc:where({n,l,{turtle,service,huawei_subscriber}}),
    ChildSpecs = supervisor:which_children(Pid),
    PoolSpec = lists:keyfind(pool, 1, ChildSpecs),
    {pool, PoolPid, supervisor, [turtle_subscriber_pool]} = PoolSpec,
    SubscriberSpecs = supervisor:which_children(PoolPid),
    Fun = fun({_, SubscriberPid, worker, [turtle_subscriber]}) ->
                  erlang:send_after(6000, SubscriberPid, refresh_access_token)
          end,
    lists:foreach(Fun, SubscriberSpecs),
    ok;

But startup is asynchronous: supervisor:which_children(Pid) does not reliably return all children (sometimes [], sometimes only one child), so I cannot send the message to all of them. Could you give me some advice on this?

jlouis commented 8 years ago

This isn't a problem in Turtle itself. Rather, as you say, it is a problem of linearizing initialization. You are generally not guaranteed that the subscriber processes are available yet, and asking the supervisor for its children gives you no such guarantee either.

The easiest solution is to move the refreshing inside the subscriber process itself and handle it in Mod:loop/4. Turn the problem around by using the State: if there is no token, fetch one, then set a refresh timer, which will fire in handle_info/2. If you need the same token in every child, create a separate process and have each subscriber register itself with that process, so it can periodically push out new tokens to every subscriber (this can be done rather easily with gproc).
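
A minimal sketch of the first suggestion, with the refresh living inside the subscriber's own callbacks. It assumes the callback shapes described in Turtle's README, where loop/4 returns {ack, State} and handle_info/2 returns {ok, State}; check these against the version you run. The module name epush_huawei_sketch and the helpers fetch_token/2 and push/2 are hypothetical stand-ins (fetch_token/2 for epush_huawei:get_access_token/2), and init_state is assumed to be passed without an access_token key, so the first message triggers the fetch.

```erlang
-module(epush_huawei_sketch).
-export([loop/4, handle_info/2]).

%% Refresh interval in milliseconds (same value as in the question).
-define(REFRESH_MS, 6000).

%% Runs inside the subscriber process for every delivered message.
%% If the state has no token yet, fetch one and start the refresh timer.
loop(_RoutingKey, _ContentType, Payload, State) ->
    State1 = ensure_token(State),
    #{access_token := Token} = State1,
    ok = push(Token, Payload),
    {ack, State1}.

%% The timer message arrives here, in the same subscriber process.
handle_info(refresh_access_token, State) ->
    {ok, refresh(State)};
handle_info(_Other, State) ->
    {ok, State}.

%% Keep an existing token, otherwise fetch one.
ensure_token(#{access_token := _} = State) ->
    State;
ensure_token(State) ->
    refresh(State).

%% Fetch a new token and re-arm the timer for this process.
refresh(#{app_id := AppId, app_secret := AppSecret} = State) ->
    Token = fetch_token(AppId, AppSecret),
    _ = erlang:send_after(?REFRESH_MS, self(), refresh_access_token),
    State#{access_token => Token}.

%% Hypothetical stand-in for epush_huawei:get_access_token/2.
fetch_token(AppId, _AppSecret) ->
    {token, AppId, erlang:unique_integer([positive])}.

%% Hypothetical stand-in for the actual push call.
push(_Token, _Payload) ->
    ok.
```

With this arrangement start_worker/1 no longer needs to look up subscriber pids at all: each subscriber in the pool fetches and refreshes its own token.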

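If every subscriber should share one token, the second suggestion could look roughly like the following. This sketch uses a plain gen_server rather than gproc so that it is self-contained; the module name token_server_sketch, the FetchFun argument and the {new_token, Token} message are all assumptions made for illustration. With gproc, the manual subscriber map could be replaced by a property registration (gproc:reg/1 in the subscriber, gproc:send/2 in the refresher).

```erlang
-module(token_server_sketch).
-behaviour(gen_server).

-export([start_link/2, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% FetchFun is a zero-arity fun returning a fresh token (hypothetical).
start_link(FetchFun, IntervalMs) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, {FetchFun, IntervalMs}, []).

%% Call this from inside a subscriber process. It returns the current
%% token; later tokens arrive as {new_token, Token} messages.
subscribe() ->
    gen_server:call(?MODULE, {subscribe, self()}).

init({FetchFun, IntervalMs}) ->
    State = #{fetch => FetchFun, interval => IntervalMs,
              token => FetchFun(), subs => #{}},
    _ = schedule(IntervalMs),
    {ok, State}.

%% Register the caller (once) and monitor it so dead subscribers are dropped.
handle_call({subscribe, Pid}, _From, #{subs := Subs, token := Token} = State) ->
    Subs1 = case maps:is_key(Pid, Subs) of
                true  -> Subs;
                false -> Subs#{Pid => erlang:monitor(process, Pid)}
            end,
    {reply, Token, State#{subs := Subs1}}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Periodic refresh: fetch once, push the new token to every subscriber.
handle_info(refresh, #{fetch := FetchFun, interval := IntervalMs,
                       subs := Subs} = State) ->
    Token = FetchFun(),
    lists:foreach(fun(Pid) -> Pid ! {new_token, Token} end, maps:keys(Subs)),
    _ = schedule(IntervalMs),
    {noreply, State#{token := Token}};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, #{subs := Subs} = State) ->
    {noreply, State#{subs := maps:remove(Pid, Subs)}};
handle_info(_Other, State) ->
    {noreply, State}.

schedule(IntervalMs) ->
    erlang:send_after(IntervalMs, self(), refresh).
```

A subscriber would call token_server_sketch:subscribe() from its own process, for example on its first loop/4 call, and add a handle_info/2 clause for {new_token, Token} that stores the token in its state.
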
One patch that might be useful is to allow State to be an initialization function; that would make a good PR for Turtle. It would let you run the initialization as if you were the target process in question.
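
To illustrate the idea behind such a patch (this is not Turtle's API, only a standalone sketch with hypothetical names): the process accepts either a plain term or a zero-arity fun as its initial state, and evaluates the fun only after it has been spawned, so self() inside the fun is the target process.

```erlang
-module(init_state_sketch).
-export([start_link/1, get_state/1]).

%% Init is either a plain term or a fun/0 that builds the state.
start_link(Init) ->
    Parent = self(),
    Pid = spawn_link(fun() ->
        %% Runs in the new process, so timers or registrations set up
        %% by the fun belong to this process, not to the caller.
        State = resolve(Init),
        Parent ! {ready, self()},
        serve(State)
    end),
    receive {ready, Pid} -> {ok, Pid} end.

%% Ask the process for its state (only used to inspect the result).
get_state(Pid) ->
    Pid ! {get_state, self()},
    receive
        {state, Pid, State} -> State
    after 1000 ->
        timeout
    end.

%% A fun/0 is evaluated; anything else is taken as the state itself.
resolve(Init) when is_function(Init, 0) -> Init();
resolve(State) -> State.

serve(State) ->
    receive
        {get_state, From} ->
            From ! {state, self(), State},
            serve(State);
        _Other ->
            serve(State)
    end.
```

An initialization fun passed this way could, for instance, call erlang:send_after(6000, self(), refresh_access_token) and return the state map, which removes the need to discover the subscriber pids from outside.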