doujiang24 / lua-resty-kafka

Lua kafka client driver for the Openresty based on the cosocket API
BSD 3-Clause "New" or "Revised" License
801 stars 274 forks source link

failed to do SCRAM-SHA-512 auth #153

Open zhanghangorg opened 1 year ago

zhanghangorg commented 1 year ago

环境: openresty -V nginx version: openresty/1.21.4.1 built by gcc 12.2.1 20220924 (Alpine 12.2.1_git20220924-r4) built with OpenSSL 1.1.1t 7 Feb 2023

lua-resty-kafka 0.22.2 依赖 lua-resty-openssl 0.8.22 lua-resty-jit-uuid-0.0.7

问题描述: 连接 SCRAM-SHA-512 认证的kafka集群,无法通过认证

错误信息: [info] 63#63: 5040 [lua] client.lua:185: _fetch_metadata(): broker fetch metadata failed, err:failed to do SCRAM-SHA-512 auth with 1...:9092: closed, host: 1.*.*, port: 9092, context: ngx.timer, client: 1..., server: 0.0.0.0:80

返回的err信息,仅仅是一个 closed。

通过代码跟踪,发现在 scramsha.lua - _M.sock_send_receive() 中 local len, err = sock:receive(4) len为空,进入错误处理 https://github.com/doujiang24/lua-resty-kafka/blob/3fbed91d81d4fb32d4dda4316f5f2cba04622633/lib/resty/kafka/scramsha.lua#L198

通过telent测试,网络是通的。

doujiang24 commented 1 year ago

@Jon-Gaara 可以帮忙看下不?多谢

zhanghangorg commented 1 year ago

有没有可能是 配置链接方式出错? 我已经做过很多种不同配置的尝试,当前配置如下。

  local sasl_config = { mechanism = "SCRAM-SHA-512", user = "my_name", password = "my_passowd" }
  local broker_list = {
        {
            host = "1.x.x.x", port = 9092, sasl_config = sasl_config
        },
        {
            host = "1.x.x.x", port = 9092, sasl_config = sasl_config
        },
        {
            host = "1.x.x.x", port = 9092, sasl_config = sasl_config
        }
    }
    local producer_config = {
        api_version = "auto",
        producer_type = "async",
        request_timeout = 3000,
        ssl_verify = false,
        sasl_config = sasl_config
    }
    local cluster_name = "my_cluster_name"
    local pro = producer:new(broker_list, producer_config, cluster_name)

因为不确定 sasl_config 是放在broker_list 还是client_config,所以都配置了sasl_config。

python 3.10下测试可链接的配置如下

broker_ips = "1.x.x.x:9092, 1.2.2.2:9092, 1.3.3.3:9092"
consumer = KafkaConsumer(
       bootstrap_servers=broker_ips,
       group_id=aclCluster,
       sasl_mechanism="SCRAM-SHA-512",
       security_protocol='SASL_PLAINTEXT',
       sasl_plain_username=my_name,
       sasl_plain_password=my_passowd
    )

配置上少一个 security_protocol

Jon-Gaara commented 1 year ago

1686291048755 1686291545838 1686291567865 1686291584462 我测试是可以过的,你那边能给下kafka的集群配置么?

zhanghangorg commented 1 year ago

的,你那边能给下kafka的集群配置么

@Jon-Gaara 感谢你的回复,请问这个认证,在log_by_lua 阶段会不会有问题? 我怀疑认证过程部分API 在log_by_lua 阶段无法正常运行。

因为是别人的kafka集群,我无法拿到配置,但是我通过python的api测试是可以正常访问的。 如下配置 broker_ips = "1.x.x.x:9092, 1.2.2.2:9092, 1.3.3.3:9092" consumer = KafkaConsumer( bootstrap_servers=broker_ips, group_id=aclCluster, sasl_mechanism="SCRAM-SHA-512", security_protocol='SASL_PLAINTEXT', sasl_plain_username=my_name, sasl_plain_password=my_passowd )

Jon-Gaara commented 1 year ago

你在log_by_lua* 阶段是通过ngx.timer.at去调用kafka发送消息的么?如果不是的话,可以试试,看下能不能正常运行

zhanghangorg commented 1 year ago

如果不是的话,可以试试,看下能不能

测试了通过ngx.timer.at 调用,还是无法连接kafka

kafka集群认证的配置是 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 sasl.enabled.mechanisms=SCRAM-SHA-512

Jon-Gaara commented 1 year ago

你如果方便的也可以debug看下,因为我这个就是参考的kafka-python写的,如果是scramsha.lua - _M.sock_send_receive() 中 local len, err = sock:receive(4) len为空的话,那就是调用send_first_message方法发第一条信息就失败了,这种情况说明发送的消息有问题,kafka主动关闭连接,你可以和kafka-python对比下发的消息有什么不一样的地方?

zhanghangorg commented 1 year ago

afka-python对比 lua-resty-kafka 打印的 client_first_message 是 n,,n=user_name,r=34b5a7**c33ee4 (32位随机字符) server_first_message 是 closed

抓包比较吗?请问有没有什么比较快捷的对比方法?

Jon-Gaara commented 1 year ago

我本地模拟看看,能不能出现你说的情况. 你的scala和kafka的版本是多少?

zhanghangorg commented 1 year ago

大佬,看下是不是这个原因。 kafka-python库有个参数 : security_protocol (str): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.

kafka-python此参数的精简后 逻辑如下 (conn.py 400 行左右)

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
      self.state = ConnectionStates.HANDSHAKE
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
    self.state = ConnectionStates.AUTHENTICATING
else:
    self.state = ConnectionStates.CONNECTED
......
if self.state is ConnectionStates.HANDSHAKE:
            if self._try_handshake():
                log.debug('%s: completed SSL handshake.', self)
                if self.config['security_protocol'] == 'SASL_SSL':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    log.info('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                self.config['state_change_callback'](self.node_id, self._sock, self)

        if self.state is ConnectionStates.AUTHENTICATING:
            assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
            if self._try_authenticate():

security_protocol 参数 为 SASL_SSL 时,有_try_handshake调用 , 为 SASL_PLAINTEXT 直接 _try_authenticate 在pythonn中调用,是否配置这个参数,均可正常连接。

security_protocol 不配置时,通过抓包和 pdb调试,都没有发送 first_message() security_protocol 配置为 SASL_PLAINTEXT 时, client_first 为 n,,n=user_name,r=72effa31d3cc4e68bf**e server_first 为 r=72effa31d****f,s=MTk1bGpsb**=,i=4096

在lua-resty-kafka 这个库中没有此参数,默认逻辑就 ssl 握手 client_first 为 n,,n=user_name,r=34b5a7**c33ee4

远程返回closed,连接kafka失败。

zhanghangorg commented 1 year ago

我本地模拟看看,能不能出现你说的情况. 你的scala和kafka的版本是多少?

kafka版本 2.1.2-1.0.2 麻烦帮忙看看

Jon-Gaara commented 1 year ago
  1. security_protocol我们没有这个参数,但是我们有实现这个逻辑,如果security_protocol=PLAINTEXT,相当于你是明文连接,直接连接就好了,不需要配置{ ssl = true }和sasl_config,如果security_protocol=SSL,需要配置{ ssl = true },如果security_protocol=SASL_PLAINTEXT,则只需要配置sasl_config,如果security_protocol=SASL_SSL,需要配置{ssl=true}和sasl_config
  2. 如果python security_protocol 配置和不配置都能连成功,说明你kafka支持SASL_PLAINTEXT 和 PLAINTEXT,那你用lua连接的时候,可以去掉sasl_config,看看能连接成功么?
  3. python的client_first 为 n,,n=user_name,r=72effa31d3cc4e68bfe,lua的为n,,n=user_name,r=34b5a7**c33ee4,我想问下lua的n=user_name应该是你传的用户名,这里是否是正确的?为什么python看起来像正确的用户名,lua的却不像
zhanghangorg commented 1 year ago
  1. security_protocol我们没有这个参数,但是我们有实现这个逻辑,如果security_protocol=PLAINTEXT,相当于你是明文连接,直接连接就好了,不需要配置{ ssl = true }和sasl_config,如果security_protocol=SSL,需要配置{ ssl = true },如果security_protocol=SASL_PLAINTEXT,则只需要配置sasl_config,如果security_protocol=SASL_SSL,需要配置{ssl=true}和sasl_config
  2. 如果python security_protocol 配置和不配置都能连成功,说明你kafka支持SASL_PLAINTEXT 和 PLAINTEXT,那你用lua连接的时候,可以去掉sasl_config,看看能连接成功么?
  3. python的client_first 为 n,,n=user_name,r=72effa31d3cc4e68bfe,lua的为n,,n=user_name,r=34b5a7**c33ee4,我想问下lua的n=user_name应该是你传的用户名,这里是否是正确的?为什么python看起来像正确的用户名,lua的却不像
  1. 收到,我再试试;
  2. 去掉sasl_config,是指 完全不配置用户名和密码吗?
  3. 是的,都是我的用户名,被我替换 模糊化了, 格式是一样的。麻烦你帮忙也把 n=xxx模糊一下。
Jon-Gaara commented 1 year ago

是的,整个去掉sasl_config,不配置用户名和密码

zhanghangorg commented 1 year ago

local broker_list = { {host="集群ip1", port=9092}, {host="集群ip2", port=9092}, {host="集群ip3", port=9092}, } local producer_config = {} local cluster_name = "集群名称"

local p = producer:new(broker_list, producer_config, cluster_name)

还是链接失败,报错 all brokers failed in fetch topic metadata, context: ngx.timer not found topic, context: ngx.timer

Jon-Gaara commented 1 year ago

我用kafka版本 2.12-1.0.2,跑了下test,发现很多都过不了,版本不兼容,你是否能升级kafka版本? 1688109534969 1688109662847

Jon-Gaara commented 1 year ago

@doujiang24 是否需要对kafka的低版本进行支持?

zhanghangorg commented 1 year ago

过不了,版本不兼容,你是否能升级kafk

生产环境使用的统一集群,我很被动😭。

请教: 这个测试环境 如何搭建,可以通过哪些关键词找到资料?

Jon-Gaara commented 1 year ago

fork项目,然后改test.yaml,提交之后,github就会跑Action,执行测试 https://docs.github.com/zh/actions/learn-github-actions/understanding-github-actions

Jon-Gaara commented 1 year ago

过不了,版本不兼容,你是否能升级kafk

生产环境使用的统一集群,我很被动😭。

请教: 这个测试环境 如何搭建,可以通过哪些关键词找到资料?

我这边测试用kafka 2.12-1.0.2,跑test,认证也是可以过的,不知道你本地为什么不行?lua-resty-kafka只有0.22.0版本,0.22.2是哪个版本?

zhanghangorg commented 1 year ago

lua-resty-kafka-0.22 我是0.22

doujiang24 commented 1 year ago

@doujiang24 是否需要对kafka的低版本进行支持?

我的意见是:能支持最好,如果有想支持的,欢迎 PR,我还是乐意来 review 代码的

sincereye commented 9 months ago

@zhanghangorg 你这个问题解决了吗?我也遇到了同样的问题