staskobzar / goami2

Simple Asterisk Manager Interface (AMI) library for golang
MIT License

New client is skipping small events and errors on big events #7

Open halsbox opened 9 months ago

halsbox commented 9 months ago
func readPack(conn net.Conn, buf []byte) (string, error) {
    var packet string
    for {
        n, err := conn.Read(buf)
        if err != nil {
            return "", fmt.Errorf("%w: failed read: %s", ErrConn, err)
        }
        packet += string(buf[:n])
        if len(packet) > len(buf) {
            return "", fmt.Errorf("%w: too long input: %d", ErrAMI, len(packet))
        }
        if strings.HasSuffix(packet, "\r\n\r\n") {
            return packet, nil
        }
    }
}

In this fragment, if an AMI event is larger than buf, you get a "too long input" error. In my case AMI messages arrive continuously, and conn.Read(buf) often reads a whole AMI message together with the beginning of the next one. The packet value then does not end with "\r\n\r\n", so the loop keeps reading and the function eventually fails with "too long input" on a later iteration.
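
The failure mode can be reproduced without a socket. This is a minimal sketch: the chunk contents and the helper name endsWithTerminator are made up for illustration, and the string stands in for whatever a single conn.Read happens to return.

```go
package main

import (
	"fmt"
	"strings"
)

// terminator marks the end of every AMI message.
const terminator = "\r\n\r\n"

// endsWithTerminator mirrors the suffix check readPack performs after
// each Read. Hypothetical helper, not part of goami2.
func endsWithTerminator(packet string) bool {
	return strings.HasSuffix(packet, terminator)
}

func main() {
	// Hypothetical result of one conn.Read: a complete message
	// followed by the first bytes of the next message.
	chunk := "Event: Hangup\r\nChannel: SIP/100\r\n\r\nEvent: Newch"

	// The terminator is present, but not at the end of the data.
	fmt.Println(strings.Contains(chunk, terminator)) // true
	fmt.Println(endsWithTerminator(chunk))           // false
}
```

Because the suffix check fails even though a complete message has arrived, the loop keeps appending data until the length limit is hit.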

staskobzar commented 7 months ago

@halsbox thanks for reporting this, and sorry for the delay. You are right, this is wrong behaviour. I will try to fix it soon.

halsbox commented 7 months ago

@staskobzar Hi! I've reviewed your fix and I'm not sure it actually solves the problem. The point is that a packet assembled with packet += string(buf[:n]) is not guaranteed to end with \r\n\r\n. As a workaround, I do all the work in the main loop: I keep the packet variable outside the conn.Read loop, split the data on the \r\n\r\n separator, and parse each part as it arrives. Like this:

func (c *Client) Run() {
  go func() {
    buf := make([]byte, bufSize)
    for {
      if c.conn != nil {
        break
      }
      time.Sleep(netTimeout)
    }
    c.ReadForever()
    var packet string
    for {
      n, err := c.conn.Read(buf)
      if err != nil {
        switch {
        case errors.Is(err, syscall.ECONNABORTED):
          fallthrough
        case errors.Is(err, syscall.ECONNRESET):
          fallthrough
        case errors.Is(err, syscall.ECONNREFUSED):
          fallthrough
        case err == io.EOF:
          c.emitNetErr(ErrEOF)
          <-c.waitNewConnection
        //Pause reading when connection is not available while
        //reconnecting and reconnection is triggered from network error
        //in separate AMI Action Ping goroutine.
        case errors.Is(err, os.ErrDeadlineExceeded):
          c.emitNetErr(ErrDead)
          <-c.waitNewConnection
          _ = c.conn.SetReadDeadline(time.Time{})
        default:
          c.emitErr(fmt.Errorf("%w: conn read error: %s", Error, err))
        }
        continue
      }
      // We are not guaranteed to have `\r\n\r\n` right at the end of the data that was read.
      // The buf may even hold the end of one message, one small but whole message,
      // and the beginning of a third message.
      parts := bytes.SplitAfter(buf[:n], []byte("\r\n\r\n"))
      // Append either one whole message to an empty packet, or the tail of a big
      // message to a packet that holds the previous parts of that message.
      packet += string(parts[0])
      if len(parts) > 1 {
        // whole message is in the packet so we parse it
        msg, err := Parse(packet)
        if err != nil {
          c.emitErr(fmt.Errorf("%w: failed to parse AMI message: %s", ErrAMI, err))
        } else {
          c.emitMsg(msg)
        }
        // parse other parts except the last one
        for i := 1; i < len(parts)-1; i++ {
          msg, err := Parse(string(parts[i]))
          if err != nil {
            c.emitErr(fmt.Errorf("%w: failed to parse AMI message: %s", ErrAMI, err))
          } else {
            c.emitMsg(msg)
          }
        }
        // Assign last part to packet.
        // Last part will always be either empty string or first fragment
        // of upcoming message because of SplitAfter behavior.
        packet = string(parts[len(parts)-1])
      }
    }
  }()
}
staskobzar commented 7 months ago

@halsbox I think it is AMI that should guarantee the terminating "\r\n\r\n" on the packet. We just need to read, and return an error if the packet is invalid or there is a connection problem.

The fix simply reads until "\r\n\r\n" and then returns the packet.

halsbox commented 7 months ago

@staskobzar
We run Asterisk 18.x under heavy load, and it generates a lot of AMI messages that we need to consume. It is common for the data read into buf not to end with \r\n\r\n: it takes less than a second from the start of the connection to hit the error. I even had to use my parsing code in the login function, because 8 times out of 10 the login reply arrived in buf together with the first part of the following Event message.

halsbox commented 7 months ago

I experimented a lot with this, trying buf sizes from very small to huge and Asterisk versions from 14.x to 20.x, and came to the conclusion that you either have to read from the connection in portions and then split on \r\n\r\n, or read byte by byte until \r\n\r\n, as was done in goami2 1.6, where textproto.Reader does this under the hood in ReadMIMEHeader().
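
For reference, the textproto approach mentioned above can be sketched with the standard library alone. This is not the goami2 1.6 code: readHeaders and the sample stream are hypothetical, and a strings.Reader stands in for the connection. Note that ReadMIMEHeader expects "Key: Value" lines, so the initial "Asterisk Call Manager/..." prompt would have to be consumed separately first.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net/textproto"
	"strings"
)

// readHeaders parses every blank-line-terminated block in stream with
// textproto.Reader.ReadMIMEHeader. Hypothetical helper for illustration.
func readHeaders(stream string) ([]textproto.MIMEHeader, error) {
	tp := textproto.NewReader(bufio.NewReader(strings.NewReader(stream)))
	var out []textproto.MIMEHeader
	for {
		h, err := tp.ReadMIMEHeader()
		if errors.Is(err, io.EOF) {
			// End of input; a trailing partial block is dropped in this sketch.
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, h)
	}
}

func main() {
	// Two messages back to back, as they may arrive under load.
	stream := "Response: Success\r\nMessage: ok\r\n\r\n" +
		"Event: Hangup\r\nChannel: SIP/100\r\n\r\n"

	headers, err := readHeaders(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, h := range headers {
		fmt.Println(h.Get("Response"), h.Get("Event"))
	}
}
```

The buffered reader keeps any bytes read past one message for the next call, so message boundaries no longer depend on how the network delivers the data.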

staskobzar commented 7 months ago

@halsbox I see what you mean. I remember now that I had this problem when a network packet contained two AMI packets with \r\n\r\n in the middle. You are right, I will have to rework it. Thank you.

halsbox commented 7 months ago

> @halsbox I think it is AMI that should guarantee the terminating "\r\n\r\n" on the packet.

To be clear, I mean that every AMI message is terminated with "\r\n\r\n", but when reading from the connection with Read([]byte) we can get any portion of the stream of AMI messages. It can be a whole message terminated by "\r\n\r\n" if there is enough delay between messages. Or it can be the ending part (with "\r\n\r\n") of a big message that did not fit in buf in the previous iteration, followed by a very small message with its "\r\n\r\n", followed by the beginning of a third message that is cut in the middle and whose next part arrives in the next Read([]byte) call.
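
The scenario above can be sketched as a small accumulator that is fed whatever each Read returns. The splitter type and the sample chunks are hypothetical, not goami2 code. Because it searches the accumulated bytes rather than each chunk on its own, it also covers a terminator that is itself cut in half between two reads.

```go
package main

import (
	"bytes"
	"fmt"
)

var terminator = []byte("\r\n\r\n")

// splitter accumulates raw reads and hands back complete messages.
// Hypothetical helper, not part of goami2.
type splitter struct {
	pending []byte // bytes of a message whose terminator has not arrived yet
}

// feed appends chunk to the pending bytes and returns every complete
// message (terminator included) found so far. A trailing partial
// message stays in pending until a later call completes it.
func (s *splitter) feed(chunk []byte) []string {
	s.pending = append(s.pending, chunk...)
	var msgs []string
	for {
		i := bytes.Index(s.pending, terminator)
		if i < 0 {
			return msgs
		}
		end := i + len(terminator)
		msgs = append(msgs, string(s.pending[:end]))
		s.pending = s.pending[end:]
	}
}

func main() {
	var s splitter
	// Hypothetical sequence of Read results.
	chunks := []string{
		"Event: A\r\nBig: val",            // start of a big message
		"ue\r\n\r\nEvent: B\r\n\r\nEvent", // its tail, a small whole message, start of a third
		": C\r\n\r",                       // terminator cut in half
		"\n",                              // rest of the terminator
	}
	for _, c := range chunks {
		for _, m := range s.feed([]byte(c)) {
			fmt.Printf("%q\n", m)
		}
	}
}
```

A production version would also cap the size of pending so that a peer that never sends a terminator cannot grow the buffer without bound.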

halsbox commented 7 months ago

One more thing I'd like to share: in my tests under heavy load, reading in portions and then splitting on the terminator strangely consumed less CPU than reading byte by byte until the terminator, whether with a custom function or with textproto.Reader. Unfortunately, I had no time to investigate why. Currently I read byte by byte until '\n' only before login, to verify the AMI prompt prefix (like: bytes.Equal(label[:22], []byte("Asterisk Call Manager/"))).
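
A minimal sketch of such a prompt check follows. It swaps the byte-by-byte read for bufio.Reader.ReadString, which is an assumption rather than the code described above; checkPrompt, readPrompt, and the version string in the sample are hypothetical.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// promptPrefix is the 22-byte prefix of the AMI greeting line.
const promptPrefix = "Asterisk Call Manager/"

// checkPrompt validates the greeting line and returns the text that
// follows the prefix. Hypothetical helper, not part of goami2.
func checkPrompt(line string) (string, error) {
	if !strings.HasPrefix(line, promptPrefix) {
		return "", fmt.Errorf("unexpected prompt: %q", line)
	}
	return strings.TrimSpace(strings.TrimPrefix(line, promptPrefix)), nil
}

// readPrompt reads up to the first '\n' and validates the line.
func readPrompt(r *bufio.Reader) (string, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return "", fmt.Errorf("read prompt: %w", err)
	}
	return checkPrompt(line)
}

func main() {
	// Hypothetical start of a session: the prompt, then the first message.
	stream := "Asterisk Call Manager/7.0.3\r\nResponse: Success\r\n\r\n"
	r := bufio.NewReader(strings.NewReader(stream))

	version, err := readPrompt(r)
	fmt.Println(version, err)
}
```

If a buffered reader is used for the prompt, the same reader has to be used for everything that follows, since it may already hold bytes of the first message.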

staskobzar commented 7 months ago

@halsbox thanks, I will keep it in mind and will share here when I have some results.

staskobzar commented 7 months ago

There is a new v1.7.4 release where I have tried to fix everything we discussed here. I used your idea of reading from the connection as a stream and scanning for the terminating \r\n\r\n, even if it is in the middle of a network packet. Also, considering your last comment, I now read line by line, and it seems to perform well. Take a look when you have time and let me know if you have any comments. Thanks for your help.
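
Reading line by line until the empty line can be sketched as follows. This is an illustration under assumptions, not the actual v1.7.4 implementation: readMessage, readAll, and the sample stream are hypothetical, and a strings.Reader stands in for the connection.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// readMessage reads lines until the empty "\r\n" line that ends an AMI
// message and returns the raw message, terminator included.
// Hypothetical helper for illustration.
func readMessage(r *bufio.Reader) (string, error) {
	var b strings.Builder
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return "", fmt.Errorf("read line: %w", err)
		}
		b.WriteString(line)
		if line == "\r\n" {
			return b.String(), nil
		}
	}
}

// readAll collects every complete message in stream. It stops at the
// first read error, which in this demo is the end of the input.
func readAll(stream string) []string {
	r := bufio.NewReader(strings.NewReader(stream))
	var msgs []string
	for {
		m, err := readMessage(r)
		if err != nil {
			return msgs
		}
		msgs = append(msgs, m)
	}
}

func main() {
	stream := "Response: Success\r\n\r\nEvent: Hangup\r\nChannel: SIP/100\r\n\r\nEvent: Par"
	for _, m := range readAll(stream) {
		fmt.Printf("%q\n", m)
	}
}
```

The buffered reader still pulls data from the connection in large portions; only the scan for line endings happens per line, in memory.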