NuclearAPK / Simple-Kafka_Adapter

Простой адаптер для 1С (пишем, читаем)
Apache License 2.0
70 stars 16 forks source link

Информация об ошибке #5

Closed guest7623 closed 1 year ago

guest7623 commented 1 year ago

https://github.com/NuclearAPK/Simple-Kafka_Adapter/blob/a2230fa97d1cc5f0b983e8400207942c836da8cb/src/SimpleKafka1C.cpp#LL374C14-L374C14

Думаю здесь надо вывести в лог или интерактивно ошибку, а так по тихому возвращается пустая строка и не ясно почему

NuclearAPK commented 1 year ago

Доработка в работе

guest7623 commented 1 year ago

Можете сделать тестовый релиз с выводом ошибки в этом месте (очень нужен)) )?

NuclearAPK commented 1 year ago

В соседней ветке я выложил на yandex disk тестовую сборку с выводом в лог именно в этом месте. Код репозитория я тоже обновил. Но если проблема в ssl - и в лог не выводится информация, то нужно смотреть именно настройки. В настоящий момент я собрал тестовый стенд с ssl, sasl на последней актуальной версии kafka, но не успел еще проверить работу компоненты. До конца недели в планах проверить.

guest7623 commented 1 year ago

Настройки одинаковые в компоненте 1с и в python клиенте , но python клиент читает топик, а компонента 1с нет Python клиент:

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192',
    'ssl.endpoint.identification.algorithm': 'https',
    'group.id': 'mygroup1',
    'auto.offset.reset': 'beginning',
    'enable.ssl.certificate.verification': 'false',
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': 'c:\kafka\eaist-ca.cer',    
    'ssl.key.location': 'c:\kafka\EISNSI_eaist-private.pem',
    'ssl.key.password': 'pass1',
    'ssl.certificate.location': 'c:\kafka\cert-signed',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'EISNSI',
    'sasl.password': 'pass2'
})

c.subscribe(['eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print('Consumer error: {}'.format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))
    break

c.close()

1c:

&НаСервере
Процедура ПолучитьСообщенияНаСервере()

    Попытка
        Компонента = Новый("AddIn.Integration.simpleKafka1C");
    Исключение
        Подключено = ПодключитьВнешнююКомпоненту("ОбщийМакет.БП_SimpleKafkaAdapter", "Integration", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.Изолированно);
        Если Подключено Тогда
            Компонента = Новый("AddIn.Integration.simpleKafka1C");
        КонецЕсли;
    КонецПопытки;

    НастройкиПодключения = Константы.БП_ЕАИСТ_KafkaНастройкиПодключения.Получить().Получить();

    Компонента.УстановитьПараметр("ssl.endpoint.identification.algorithm", "https");
    Компонента.УстановитьПараметр("group.id", "mygroup1");
    Компонента.УстановитьПараметр("auto.offset.reset", "beginning");
    Компонента.УстановитьПараметр("enable.ssl.certificate.verification", "false");  
    Компонента.УстановитьПараметр("security.protocol", "sasl_ssl");
    Компонента.УстановитьПараметр("ssl.ca.location", "c:\kafka\eaist-ca.cer");
    Компонента.УстановитьПараметр("ssl.key.location", "c:\kafka\EISNSI_eaist-private.pem");
    Компонента.УстановитьПараметр("ssl.key.password", "pass1");
    Компонента.УстановитьПараметр("ssl.certificate.location", "c:\kafka\cert-signed");
    Компонента.УстановитьПараметр("sasl.mechanism", "PLAIN");
    Компонента.УстановитьПараметр("sasl.username", "EISNSI");
    Компонента.УстановитьПараметр("sasl.password", "pass2");

    КаталогЛогов = "c:\Users\USR1CV8\kafka_logs\";
    Если ЗначениеЗаполнено(КаталогЛогов) Тогда
        Компонента.УстановитьФайлЛогирования(СтрШаблон("%1%2%3.log", КаталогЛогов , ПолучитьРазделительПути(), Новый УникальныйИдентификатор));
    КонецЕсли;

    Результат = Компонента.ИнициализироватьКонсьюмера("kafka-1.eaist2-f.mos.ru:9192,kafka-2.eaist2-f.mos.ru:9192,kafka-3.eaist2-f.mos.ru:9192", "eaistf2.eaist.uas2.kafka-output.EISNSI.okei.GetDictionaryItem.v-1");

    // установка таймаута для ожидания сообщений
    Компонента.УстановитьТаймаутОжидания(1000); 

    Если Не Результат Тогда  
        ТекстОшибки = "Не удалось инициализировать консьюмера для топика";  
        ЗаписьЖурналаРегистрации("Интеграция Kafka. Consumer", УровеньЖурналаРегистрации.Ошибка,,, ТекстОшибки);
        Сообщить(ТекстОшибки);
        Возврат;
    КонецЕсли;

    ЧтениеJSON = Новый ЧтениеJSON;

    пока истина цикл
        Попытка
            Сообщение = Компонента.Слушать(); 
            Сообщить(Сообщение);
        Исключение                  
            Компонента = Неопределено;                                                         
            сообщить(ОписаниеОшибки());
            Прервать;
        КонецПопытки;
        Прервать;
    КонецЦикла;

    Компонента.ОстановитьКонсьюмера();  
    Компонента = Неопределено;

КонецПроцедуры
guest7623 commented 1 year ago

Удалось прочитать топик с помощью компоненты 1с - проблема оказалась в таймауте, стоял 1000 мс, поставил 15000 мс и тогда прочитал. Логирование таймаута тоже стоит добавить в код, сейчас насколько понял его нет

NuclearAPK commented 1 year ago

Если логирование таймаута добавить - лог будет содержать однотипные сообщения, от которых нет толку, уже так делали.

вт, 30 мая 2023 г., 17:31 guest7623 @.***>:

Удалось прочитать топик с помощью компоненты 1с - проблема оказалась в таймауте, стоял 1000 мс, поставил 15000 мс и тогда прочитал. Логирование таймаута тоже стоит добавить в код, сейчас насколько понял его нет

— Reply to this email directly, view it on GitHub https://github.com/NuclearAPK/Simple-Kafka_Adapter/issues/5#issuecomment-1568193168, or unsubscribe https://github.com/notifications/unsubscribe-auth/AKTVTZGIU7FPVCHLGL2DMT3XIXD7ZANCNFSM6AAAAAAYKLFI2A . You are receiving this because you commented.Message ID: @.***>

NuclearAPK commented 1 year ago

Добавлено логирование в релизе 1.1.0 Коллбеки при доставке и статистика так же доступны statistics.interval.ms - значение настройки, отличное от 0 включает вывод статистики работы клиента и брокера. Расшифровка статистики https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md debug - настройка вывода в лог информации: broker,topic,msg - детальная информация о продюссере consumer,cgrp,topic,fetch - консьюмер. Полное описание доступно здесь https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md