lf-edge / ekuiper

Lightweight data stream processing engine for IoT edge
https://ekuiper.org
Apache License 2.0

concurrent map read and map write error #849

Closed salmanazmat666 closed 3 years ago

salmanazmat666 commented 3 years ago

Hi,

I'm trying to test the performance of Kuiper 1.2.0. I'm using a modified version of the edgex source which better suits my needs. I'm testing 100 rules, with each rule running 5 times every second. I get the following error.

fatal error: concurrent map read and map write

goroutine 1722 [running]:
runtime.throw(0x130b210, 0x21)
        runtime/panic.go:1117 +0x72 fp=0xc002373768 sp=0xc002373738 pc=0x7c9d12
runtime.mapaccess2_faststr(0x12255a0, 0xc000ccc3c0, 0xc000d0d138, 0x4, 0x0, 0x0)
        runtime/map_faststr.go:116 +0x4a5 fp=0xc0023737d8 sp=0xc002373768 pc=0x7a6ec5
github.com/emqx/kuiper/xsql.Message.Value(0xc000ccc3c0, 0xc000d0d138, 0x4, 0x7f24fcd13330, 0x10, 0x18)
        github.com/emqx/kuiper/xsql/ast.go:676 +0xc5 fp=0xc002373860 sp=0xc0023737d8 pc=0xed03a5
github.com/emqx/kuiper/xsql.(*Tuple).Value(0xc002e245a0, 0xc000d0d138, 0x4, 0x0, 0x203000, 0x1)
        github.com/emqx/kuiper/xsql/ast.go:732 +0x45 fp=0xc0023738a0 sp=0xc002373860 pc=0xed0965
github.com/emqx/kuiper/xsql.multiValuer.Value(0xc00179d340, 0x4, 0x4, 0xc000d0d138, 0x4, 0xc0023739a0, 0x7a027b, 0xc003a$
        github.com/emqx/kuiper/xsql/ast.go:1119 +0x72 fp=0xc0023738f0 sp=0xc0023738a0 pc=0xed4912
github.com/emqx/kuiper/xsql.(*multiAggregateValuer).Value(0xc00179d380, 0xc000d0d138, 0x4, 0x7f24fcc74fd8, 0x110, 0x110)
        <autogenerated>:1 +0x5e fp=0xc002373940 sp=0xc0023738f0 pc=0xeff7de
github.com/emqx/kuiper/xsql.(*ValuerEval).Eval(0xc002373dd8, 0x141cb98, 0xc002373ae8, 0x6, 0xc003a5a628)
        github.com/emqx/kuiper/xsql/ast.go:1243 +0x889 fp=0xc002373a18 sp=0xc002373940 pc=0xed5929
github.com/emqx/kuiper/xstream/operators.project(0xc000d10f00, 0x2, 0x2, 0xc002373dd8, 0xc002e24500, 0x0, 0x12f2dca, 0x8)
        github.com/emqx/kuiper/xstream/operators/project_operator.go:107 +0x205 fp=0xc002373ba8 sp=0xc002373a18 pc=0x102$
github.com/emqx/kuiper/xstream/operators.(*ProjectOp).Apply(0xc00028b060, 0x1434fc8, 0xc000bfeaf0, 0x1258400, 0xc002fb9a$
        github.com/emqx/kuiper/xstream/operators/project_operator.go:48 +0x1dd9 fp=0xc002373e30 sp=0xc002373ba8 pc=0x102$
github.com/emqx/kuiper/xstream/nodes.(*UnaryOperator).doOp(0xc000c620a0, 0x1434fc8, 0xc000bfe4d0, 0xc000bfc060)
        github.com/emqx/kuiper/xstream/nodes/operations.go:109 +0x50c fp=0xc002373fc0 sp=0xc002373e30 pc=0xff7fac
runtime.goexit()
        runtime/asm_amd64.s:1371 +0x1 fp=0xc002373fc8 sp=0xc002373fc0 pc=0x801701
created by github.com/emqx/kuiper/xstream/nodes.(*UnaryOperator).Exec
        github.com/emqx/kuiper/xstream/nodes/operations.go:72 +0x1d3

It seems that the maps which cause the panic are the data and meta maps in the source code. To resolve this, before sending these maps over the api.SourceTuple channel, I create a fresh copy of them, as follows.

consumer <- api.NewDefaultSourceTuple(mapCopy(es.data), mapCopy(es.meta))

And my mapCopy function is defined as follows.

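// mapCopy returns a deep copy of m for nested map[string]interface{} values.
// Slices and other reference types are still shared with the original.
func mapCopy(m map[string]interface{}) map[string]interface{} {
    mn := make(map[string]interface{}, len(m))
    for k, v := range m {
        // A checked type assertion avoids the panic that a reflect.Map check plus an
        // unchecked assertion causes for maps of another type, e.g. map[string]string.
        if sub, ok := v.(map[string]interface{}); ok {
            mn[k] = mapCopy(sub)
        } else {
            mn[k] = v
        }
    }
    return mn
}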

But the error still occurs. Is it a bug in the Kuiper source or in my source plugin?
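
One possible reason a copy made in the source does not help: the copy is still a single map that every downstream goroutine shares, so if any of them writes to it while another reads, the Go runtime aborts in the same way. The sketch below is a minimal illustration of the safe pattern, where each goroutine writes extra fields into its own copy. The names `Message` and `withAlias` are hypothetical and are not taken from the Kuiper code base.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors the shape of a decoded event; the name is hypothetical.
type Message map[string]interface{}

// withAlias returns a new map holding the original fields plus the alias,
// leaving the shared input untouched so concurrent readers stay safe.
func withAlias(m Message, alias string, v interface{}) Message {
	out := make(Message, len(m)+1)
	for k, val := range m {
		out[k] = val
	}
	out[alias] = v
	return out
}

func main() {
	shared := Message{"t1": 5}
	results := make([]Message, 4)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Writing shared["st1"] directly here is the pattern that can abort with
			// "concurrent map read and map write"; each goroutine fills its own copy instead.
			results[i] = withAlias(shared, "st1", shared["t1"])
		}(i)
	}
	wg.Wait()
	fmt.Println(len(shared), len(results[0])) // prints "1 2"
}
```

Concurrent reads of a Go map are safe as long as nobody writes to it, which is why only the write has to move to a private copy.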

ngjaying commented 3 years ago

@salmanazmat666 What do the rules look like? Could you please share an example? Thanks

salmanazmat666 commented 3 years ago

SELECT meta(t1->name) as message, collect(meta(t1->origin))[-1]/1000.0 as origin, sum(t1) as st1 from stream1 GROUP BY SlidingWindow(ss, 30)

There are 100 such rules for values t1, t2, t3 etc. And there are 100 streams, 1 stream for each rule.

Example streams are:

create stream stream1 (t1 bigint) WITH (FORMAT="JSON", TYPE="edgex_custom")
create stream stream2 (t2 bigint) WITH (FORMAT="JSON", TYPE="edgex_custom")
create stream stream3 (t3 bigint) WITH (FORMAT="JSON", TYPE="edgex_custom")
... and so on

ngjaying commented 3 years ago

The message is supposed to be immutable. It is likely a problem with the aliasing mechanism, which tries to modify the messages. Could you try rules which do not use aliases?

SELECT meta(t1->name), collect(meta(t1->origin))[-1]/1000.0, sum(t1) from stream1 GROUP BY SlidingWindow(ss, 30)

salmanazmat666 commented 3 years ago

The issue disappeared after removing the aliases. But aliases are very important for formatting the message; without them the message is not very readable. For example, I get the following result from the rule above.

[{"kuiper_field_0":1624338035800401,"meta":"t1","sum":596}]

This is not readable at all and not even deterministic. Could you please treat this issue as a bug?

ngjaying commented 3 years ago

@salmanazmat666 Thanks for verifying. I agree this is a bug. The alias mechanism already has several known issues; we need to look into them together as a whole, and we are working on them.

Currently, you can use DataTemplate to format the output, including renaming those fields.
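
As a rough illustration of the renaming idea, the sketch below applies a Go `text/template` to the result shown earlier in this thread. It drives the standard library directly rather than the Kuiper sink, so the template string, the helper `render`, and the choice of output names (`message`, `origin`, `st1`, taken from the aliases in the original rule) are assumptions for illustration only.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"text/template"
)

// renameTemplate is a hypothetical template that maps auto-generated names to readable ones.
const renameTemplate = `{{range .}}{"message":"{{.meta}}","origin":{{.kuiper_field_0}},"st1":{{.sum}}}{{end}}`

// render decodes a JSON array of rule results and applies the template to it.
func render(tmplText, resultJSON string) (string, error) {
	dec := json.NewDecoder(strings.NewReader(resultJSON))
	dec.UseNumber() // keep large integers from being printed as floats
	var rows []map[string]interface{}
	if err := dec.Decode(&rows); err != nil {
		return "", err
	}
	tmpl, err := template.New("sink").Parse(tmplText)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, rows); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := render(renameTemplate, `[{"kuiper_field_0":1624338035800401,"meta":"t1","sum":596}]`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints {"message":"t1","origin":1624338035800401,"st1":596}
}
```

Note that this only renames fields after the fact; it still depends on knowing which auto-generated name each expression received.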

raycola commented 3 years ago

Hello @ngjaying, I get the same exception with 53 rules on the same stream. The exception occurs at seemingly random times. This is a serious problem in production because Kuiper crashes all the time :-( but if I delete the EdgeX core data events, Kuiper restarts successfully.

Your solution doesn't work for me because I have this SQL:

"sql": "SELECT (max(AI3) - min(AI3)), collect(*)[1]->AI3, meta(id), meta(Origin) FROM edgex_events GROUP BY COUNTWINDOW(2, 1) FILTER( WHERE meta(device) = \"TAW_XP_4\" AND AI3 != nil ) HAVING (max(AI3) - min(AI3)) > 1000"

and I have this result:

 {
      "kuiper_field_0": 2000,
      "kuiper_field_1": 25000,
      "meta": "85c93b82-1996-4958-a0bf-7684a8458df9",
      "id": 5
    },

I cannot distinguish between meta(id) and meta(Origin), because both are written to the same "meta" key and one overwrites the other.

If I use aliases I do not have this problem.

So I need to understand which EdgeX events or data cause this exception. I tried setting debug: "true" in the Kuiper configuration, but after 36 hours the server drive ran out of space because there were two log files, one of 4GB (backup) and one of 2GB (current stream.log); the free space is 6GB at startup.

Then I tried to change the configuration following the guide here

This is the docker-compose entry in the EdgeX Foundry environment:

rulesengine:
    container_name: edgex-kuiper
    depends_on:
    - app-service-rules
    environment:
      EDGEX__DEFAULT__PORT: 5566
      EDGEX__DEFAULT__PROTOCOL: tcp
      EDGEX__DEFAULT__SERVER: edgex-app-service-configurable-rules
      EDGEX__DEFAULT__SERVICESERVER: http://edgex-core-data:48080
      EDGEX__DEFAULT__TOPIC: events
      KUIPER__BASIC__CONSOLELOG: "true"
      KUIPER__BASIC__RESTPORT: 48075
      KUIPER__BASIC__DEBUG: "true"
      KUIPER__BASIC__ROTATETIME: "1"
      KUIPER__BASIC__MAXAGE: "1"
    hostname: edgex-kuiper
    image: emqx/kuiper:1.2.0-alpine
    volumes:
      - kuiper-data:/kuiper/data:z
    networks:
      edgex-network: {}
    ports:
    - 127.0.0.1:20498:20498/tcp
    - 127.0.0.1:48075:48075/tcp

This is kuiper.yaml

/kuiper/etc $ more kuiper.yaml
basic:
  consoleLog: true
  debug: true
  fileLog: true
  ip: 0.0.0.0
  maxAge: 72
  maxage: 1
  pluginHosts: https://www.emqx.io/downloads
  port: 20498
  prometheus: false
  prometheusPort: 20499
  restIp: 0.0.0.0
  restPort: 48075
  rotateTime: 24
  rotatetime: 1
rule:
  checkpointInterval: 300000
  qos: 0
  sendError: true
sink:
  cacheThreshold: 10
  cacheTriggerCount: 15
  disableCache: true

The maxAge and rotateTime values were not overwritten!

What did I do wrong?

Any suggestion is appreciated

Best regards Ray

ngjaying commented 3 years ago

Hi Ray,

Regarding the config setting, this looks like a bug. As a workaround, you can edit the kuiper.yaml directly and restart the container instead of setting it by env.
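
The kuiper.yaml listing above contains both `maxAge: 72` and `maxage: 1`, which would be consistent with the environment variable name being lowercased and then merged under a new key instead of the existing camel-case one. The sketch below shows that mapping and a case-insensitive merge that would avoid the duplicate. It is a hypothetical illustration, not the actual Kuiper configuration loader; `envToPath` and `setFold` are invented names.

```go
package main

import (
	"fmt"
	"strings"
)

// envToPath turns KUIPER__BASIC__MAXAGE into ["basic", "maxage"].
func envToPath(name, prefix string) []string {
	trimmed := strings.TrimPrefix(name, prefix+"__")
	return strings.Split(strings.ToLower(trimmed), "__")
}

// setFold writes value under the existing key that matches ignoring case,
// and only creates a new key when no existing key matches.
func setFold(section map[string]interface{}, key string, value interface{}) {
	for k := range section {
		if strings.EqualFold(k, key) {
			section[k] = value
			return
		}
	}
	section[key] = value
}

func main() {
	basic := map[string]interface{}{"maxAge": 72, "rotateTime": 24}
	path := envToPath("KUIPER__BASIC__MAXAGE", "KUIPER")
	setFold(basic, path[1], 1)
	fmt.Println(path, basic["maxAge"], len(basic)) // prints "[basic maxage] 1 2"
}
```

With a plain `section[key] = value` in place of `setFold`, the map would end up with both `maxAge` and `maxage`, matching what the listing shows.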

The alias problem should be fixed in the next release; we have a beta build, lfedge/ekuiper:1.3.0-beta.0-alpine. However, that version only supports EdgeX v2, so I am wondering if you would like to upgrade.

raycola commented 3 years ago

Thanks @ngjaying

I resolved it by mounting kuiper.yaml from an external volume:

    volumes:
      - kuiper-data:/kuiper/data:z
      - ./res/edgex-kuiper/kuiper.yaml:/kuiper/etc/kuiper.yaml:z