telefonicaid / fiware-cygnus

A connector in charge of persisting context data sources into other third-party databases and storage systems, creating a historical view of the context
https://fiware-cygnus.rtfd.io/
GNU Affero General Public License v3.0
64 stars 104 forks source link

Unable to deliver event to Kafka #2014

Open filgiuff opened 3 years ago

filgiuff commented 3 years ago

I've installed Fiware Orion (v 3.0.0), Fiware Cygnus (2.8.0) and Kafka (2.7.0) in order to send data from Orion to Kafka via Cygnus.

When the notification arrives from Orion to Cygnus, this error appears:

cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-6 7ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows. cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.google.common.base.Preconditions.checkState(Preconditions.java:172) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at java.lang.Thread.run(Thread.java:748) cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67 ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception (begin() calle d when transaction is OPEN!) ...

Here other details.

damianhorna commented 3 years ago

Hi, I've got the same issue. Apparently the error appears when cygnus is checking whether a specific kafka topic exists.

cygnus_1  | time=2021-04-23T09:04:54.013Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=topicExists | msg=com.telefonica.iot.cygnus.backends.kafka.KafkaBackendImpl[60] : Checking if topic 'libraryxffffx002fcatalogxffffBook1xffffBook' already exists.
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=sendResponse | msg=org.eclipse.jetty.server.HttpChannel[693] : sendResponse info=null content=HeapByteBuffer@587c2b08[p=0,l=0,c=0,r=0]={<<<>>>} complete=true committing=false callback=Blocker@4fec5d4c{null}
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: CONTINUE (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: NEED_CHUNK (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: FLUSH (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=write | msg=org.eclipse.jetty.io.WriteFlusher[315] : write: WriteFlusher@76adc9a1{IDLE}->null [HeapByteBuffer@2ee8a74[p=0,l=5,c=1024,r=5]={<<<0\r\n\r\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}]
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=updateState | msg=org.eclipse.jetty.io.WriteFlusher[119] : update WriteFlusher@76adc9a1{WRITING}->null:IDLE-->WRITING
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=flush | msg=org.eclipse.jetty.io.ChannelEndPoint[288] : flushed 5 SocketChannelEndPoint@376222c4{/172.25.0.3:58186<->/172.25.0.4:5050,OPEN,fill=-,flush=W,to=4/30000}{io=0/0,kio=0,kro=1}->HttpConnection@210f4b41[p=HttpParser{s=END,383 of 383},g=HttpGenerator@59427309{s=COMPLETING}]=>HttpChannelOverHttp@f59a675{r=1,c=true,a=COMPLETING,uri=//cygnus:5050/notify}
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=ERROR | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_1  | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_1  |     at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_1  |     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_1  |     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_1  |     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_1  |     at java.lang.Thread.run(Thread.java:748)
damianhorna commented 3 years ago

Possible steps to reproduce:

docker-compose.yml:

version: "3.9"
services:
  mongo:
    image: mongo:4.4
    command: --nojournal
  orion:
    image: fiware/orion
    links:
      - mongo
    ports:
      - "1026:1026"
    command: -dbhost mongo
  cygnus:
    image: fiware/cygnus-ngsi
    volumes:
      - ./agent.conf:/opt/apache-flume/conf/agent.conf
    depends_on:
      - orion
    expose:
      - "5050"
      - "5080"
    ports:
      - "5050:5050"
      - "5080:5080"
    environment:
      - CYGNUS_LOG_LEVEL=DEBUG
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false

agent.conf:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = kafka-sink
cygnus-ngsi.channels = kafka-channel

cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf

cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.trasactionCapacity = 100

cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 172.17.0.1:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 172.17.0.1:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10

And then requests as in the tutorial (https://fiware-cygnus.readthedocs.io/en/latest/cygnus-ngsi/integration/orion_cygnus_kafka/index.html):

(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "entities": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1"
        }
    ],
    "attributes": [
    ],
    "reference": "http://cygnus:5050/notify",
    "duration": "P1M",
    "notifyConditions": [
        {
            "type": "ONCHANGE",
            "condValues": [
                "title",
                "pages",
                "price"
            ]
        }
    ],
    "throttling": "PT5S"
}
EOF
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "contextElements": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1",
            "attributes": [
                {
                    "name": "title",
                    "type": "text",
                    "value": "Game of Thrones: Book 1"
                },
                {
                    "name": "pages",
                    "type": "integer",
                    "value": "927"
                },
                {
                    "name": "price",
                    "type": "float",
                    "value": "18.50"
                }
            ]
        }
    ],
    "updateAction": "APPEND"
}
EOF

After that the error appears in the console.

mapedraza commented 3 years ago

Thanks for adding the information Damian. The problem seems to be a specific problem with the Kafka sink, which it does not have a high priority on the maintenance list at the present moment by Cygnus development team, so we cannot offer a quick answer to your problem. However, we are open to contributions to get Kafka sink working properly.

AlvaroVega commented 3 years ago

Could you also provide how you starts kafka broker?

filgiuff commented 3 years ago

I've tried with different versions of Kafka and docker-compose Following one that I used:

version: "3"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  

  kafka:
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.142:9092
    depends_on:
      - zookeeper
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Started using docker stack deploy -c docker-compose.yml kafka

The result was always the same error.

jaimeventura commented 2 years ago

Hey, im having the same issue. Is there any update on this? Thanks