zeromq / chumak

Pure Erlang implementation of ZeroMQ Message Transport Protocol.
Mozilla Public License 2.0
197 stars 47 forks source link

Check if killing the socket process actually closes the socket #18

Closed drozzy closed 6 years ago

drozzy commented 6 years ago

It is not clear to me whether this trap exit: https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L34

shuts down the socket (and not only the peer): https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L109

vegabook commented 6 years ago

Hi so I have done some test.

Here is a Python router socket that just pings pack whatever it gets. Anaconda python distribution contains all the dependencies.

# test elixir/erlang :chumak sockets
from __future__ import print_function
from datetime import datetime, timedelta
import socket
import zmq
import pdb
import argparse
import sys
import msgpack

parser = argparse.ArgumentParser()
parser.add_argument("--reqport", default = "5560")  
parser.add_argument("--myip", default = "192.168.1.117")  

args = parser.parse_args()

if __name__ == "__main__":
    context = zmq.Context()
    req = context.socket(zmq.ROUTER)  
    req.bind("tcp://" + str(args.myip) + ":" + str(args.reqport))

    # sort out the poller
    poller = zmq.Poller() 
    poller.register(req, zmq.POLLIN)

    while True: 
        polls = dict(poller.poll(1000))

        if req in polls:
            msg = req.recv_multipart()
            sender = msg[0]
            body = msg[1]
            print(msgpack.unpackb(body))
            req.send_multipart([sender, body])
            print(msg)

Here is an Elixir module with function quote_handler that starts up a new Chumak socket, sends stuff over, and prints back the response.


defmodule Dbase.BBworker do

  def quote_handler(ticker, sender, req_id) do
    case :chumak.socket(:dealer, req_id) do
      {:ok, sock} -> 
        {:ok, _} = :chumak.connect(sock, :tcp, '192.168.1.117', 5560)
        :chumak.send_multipart(sock, [Msgpax.pack!(["tick", "123", ticker], iodata: false)])
        {:ok, msg} = :chumak.recv_multipart(sock)
        IO.inspect(Msgpax.unpack!(msg))
      {:error, err_name} -> IO.inspect(err_name)
    end
  end

  def print_something(what) do
    IO.puts(what)
  end

end

You'll need this under deps in your mix.exs file at the root directory of the project:

  defp deps do
    [
      {:msgpax, "~> 2.0"},
      {:chumak, "~> 1.2.0"},
      {:poison, "~> 3.1"}
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end

Now I will demonstrate that chumak does not clean up its processes giving an "already started" error.

tbrowne@calculon:~/Dropbox/code/elixir/dbase$ iex -S mix
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Interactive Elixir (1.5.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :observer.start()
:ok
iex(2)> pp = spawn(Dbase.BBworker, :quote_handler, ["USD", self(), '1']
...(2)> )
#PID<0.178.0>
["tick", "123", "USD"]
iex(3)> Process.alive? pp
false
iex(4)> pp = spawn(Dbase.BBworker, :quote_handler, ["USD", self(), '2'])
#PID<0.186.0>
["tick", "123", "USD"]
iex(5)> Process.alive? pp                                               
false
iex(6)> pp = spawn(Dbase.BBworker, :quote_handler, ["USD", self(), '2'])
#PID<0.193.0>
{:already_started, #PID<0.187.0>}
iex(7)> pp = spawn(Dbase.BBworker, :quote_handler, ["USD", self(), '3'])
#PID<0.195.0>
["tick", "123", "USD"]
iex(8)> 

Here are the screenshots from :observer to show that chumak processes are not dying. They're sitting there and don't get killed even though the spawned process that launched them is no longer alive.

screenshot from 2017-10-12 11-14-33 screenshot from 2017-10-12 11-15-13 screenshot from 2017-10-12 11-15-39 screenshot from 2017-10-12 11-16-10

vegabook commented 6 years ago

Oh I saw that the sock variable is actually a process ID. So I tried to kill those explicitly using Process.exit(sock, :kill) but because these are supervised by chumak_sup, it seems they're always restarted. So I don't know, basically, how to close the socket.

drozzy commented 6 years ago

Woah! Thanks for the great report.

drozzy commented 6 years ago

If you've got time @vegabook, please look this over.

I'm going to close it for now, feel free to re-open!

shishirpy commented 3 years ago

Oh I saw that the sock variable is actually a process ID. So I tried to kill those explicitly using Process.exit(sock, :kill) but because these are supervised by chumak_sup, it seems they're always restarted. So I don't know, basically, how to close the socket.

I also noticed this, a simple exit(Socket, kill) will not kill the socket because it is supervised. Although the socket traps exits, it is only used to stop the peers and not the socket itself.

Since, the Socket is a gen_server, the best way to stop the Socket is to use gen_server:stop(Socket), this sends the shutdown exit signal so the supervisor knows not to restart the process.