hdima / erlport

ErlPort - connect Erlang to other languages
http://erlport.org
BSD 3-Clause "New" or "Revised" License
627 stars 131 forks source link

Enhancement request for monitoring erlport callers #35

Open dynamicbutter opened 8 years ago

dynamicbutter commented 8 years ago

We are using erlport to communicate with pools of Python processes in a production system. Each Python OS process has a 1-1 gen_server that uses erlport for bi-directional communication between the Erlang VM and Python. The problem we have is that the pooler library we are using does a brutal_kill on the gen_server when a pool is shut down, leading to an untrappable exit of the gen_server. As a result, we do not have an opportunity in our gen_server to terminate the port, and we end up leaking Python processes. One way to resolve this problem is to enhance erlport to monitor its callers and call port_close whenever the last caller associated with the port has been shut down. Here is a solution that I have tested in our environment.

$ git diff
diff --git a/src/erlport.erl b/src/erlport.erl
index ffa0670..afdf85c 100644
--- a/src/erlport.erl
+++ b/src/erlport.erl
@@ -122,9 +122,10 @@ init(Fun) when is_function(Fun, 0) ->
 %% @doc Synchronous event handler
 %% @hidden
 %%
-handle_call(Call={call, _M, _F, _A, Options}, From, State=#state{})
+handle_call(Call={call, _M, _F, _A, Options}, {Pid, _Ref} = From, State)
         when is_list(Options) ->
-    send_request(Call, From, Options, State);
+    NewState = monitor_caller(Pid, State),
+    send_request(Call, From, Options, NewState);
 handle_call(Request, From, State) ->
     Error = {unknown_call, ?MODULE, Request, From},
     {reply, Error, State}.
@@ -160,6 +161,21 @@ handle_info({'EXIT', Pid, {Id, Result}}, State=#state{calls=Calls}) ->
         error ->
             {noreply, State}
     end;
+handle_info({'DOWN', Ref, process, Pid, _Reason}, State=#state{port=Port, callers=Callers}) ->
+    case orddict:find(Pid, Callers) of
+        error ->
+            {noreply, State};
+        _ ->
+            erlang:demonitor(Ref),
+            Callers2 = orddict:erase(Pid, Callers),
+            case orddict:size(Callers2) of
+                0 ->
+                    port_close(Port),
+                    {stop, shutdown, State#state{callers=Callers2}};
+                _ ->
+                    {noreply, State#state{callers=Callers2}}
+            end
+    end;
 handle_info({erlport_timeout, {in, Id}}, State=#state{calls=Calls}) ->
     case orddict:find(Id, Calls) of
         {ok, {Pid, _Timer}} ->
@@ -410,3 +426,15 @@ incoming_call(Id, Module, Function, Args, _Context, State=#state{
             Calls2 = orddict:store(Id, Info, Calls),
             {noreply, State#state{calls=Calls2}}
     end.
+
+%%
+%% @doc Add monitor for caller
+%%
+monitor_caller(Pid, State=#state{callers = Callers}) ->
+    case orddict:find(Pid, Callers) of
+        error ->
+            Ref = erlang:monitor(process, Pid),
+            State#state{callers=orddict:store(Pid, Ref, Callers)};
+        _ ->
+            State
+    end.
diff --git a/src/erlport.hrl b/src/erlport.hrl
index d3eb01b..0d4bad7 100644
--- a/src/erlport.hrl
+++ b/src/erlport.hrl
@@ -40,7 +40,9 @@
     % orddict(): CallId -> {From::term(), Timer::reference() | undefined}
     sent = orddict:new() :: list(),
     % orddict(): CallId -> {Pid::pid(), Timer::reference()}
-    calls = orddict:new() :: list()
+    calls = orddict:new() :: list(),
+    % orddict(): CallerPid::pid() -> Monitor::reference()
+    callers = orddict:new() :: list()
     }).

 -endif. % ERLPORT_HRL
matehat commented 8 years ago

We solved that same problem by having a global "watchdog" process in our node that monitors each such gen_server and the Python worker it's using, so that when the gen_server dies, the watchdog calls python:stop/1.

dynamicbutter commented 8 years ago

Yeah, that's certainly another way to do it. I think a lot of users are likely to run into this issue so enhancing erlport seems like the right thing to do.

matehat commented 8 years ago

Agreed.

I would suggest that you open a pull request with the proposed changes.