openzipkin / zipkin

Zipkin is a distributed tracing system
https://zipkin.io/
Apache License 2.0

Why is the data lost in elasticsearch ? #1141

Closed liangman closed 8 years ago

liangman commented 8 years ago

I used zipkin, kafka, and elasticsearch for testing. Elasticsearch was a single node. There was no problem transferring the data from kafka to zipkin, but data is lost between zipkin and elasticsearch. I wrote 500000 log messages to kafka, but only 212162 ended up in elasticsearch. The data:

[{"traceId":"ffffffffffffffff","name":"fermentum","id":"ffffffffffffffff","annotations":[{"timestamp":1466386794757000,"value":"sr","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794757000,"value":"sagittis","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794758000,"value":"montes","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794758000,"value":"augue","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794759000,"value":"malesuada","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794760000,"value":"ss","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}}],"binaryAnnotations":[{"key":"mollis","value":"hendrerit","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}}]}]

"ffffffffffffffff" was replaced with values from "1-5000000"; there was no pattern to which spans were lost!

codefromthecrypt commented 8 years ago

There's a "/metrics" endpoint. It shows how many spans were accepted from kafka on the zipkin-server side. Can you send relevant output from that?
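Something like this (a sketch, assuming the server is on the default port and jq is available) pulls out just the kafka collector counters:

# keep only the kafka-related collector entries from the metrics JSON
$ curl -s localhost:9411/metrics | jq 'with_entries(select(.key | contains("collector") and contains("kafka")))'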

also, I don't understand what you mean by

"ffffffffffffffff" was replaced in "1-5000000";

do you mean that when you post a span where the id is all bits set, it comes back not in hex?

liangman commented 8 years ago

I set the id from 1-500000 in hex. I think the index has a conflict.

liangman commented 8 years ago

The data that is lost in elasticsearch is different every time I write the data to kafka.

codefromthecrypt commented 8 years ago

Wondering if this has to do with the data being older? For example, if I post this, I have to set lookback to a relatively high value to see it.

$ curl -s localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[
  {
    "traceId": "ffffffffffffffff",
    "name": "fermentum",
    "id": "ffffffffffffffff",
    "annotations": [
      {
        "timestamp": 1466386794757000,
        "value": "sr",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794757000,
        "value": "sagittis",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794758000,
        "value": "montes",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794758000,
        "value": "augue",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794759000,
        "value": "malesuada",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794760000,
        "value": "ss",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ],
    "binaryAnnotations": [
      {
        "key": "mollis",
        "value": "hendrerit",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ]
  }
]'
$ curl -s 'localhost:9411/api/v1/traces?serviceName=semper&lookback=5000000000'|jq .
codefromthecrypt commented 8 years ago

I started a local server against elasticsearch and got the same output, e.g. the trace id is returned the same as when not using elasticsearch:

$ SELF_TRACING_ENABLED=false STORAGE_TYPE=elasticsearch java -jar zipkin.jar 
codefromthecrypt commented 8 years ago

@liangman we need some way of reproducing the problem, so maybe verify versions and see if you can reproduce something using POST like above. Our tests run the latest zipkin against elasticsearch 2.2.1.

liangman commented 8 years ago

OK, it works correctly when using "POST", but why is data lost when using "kafka + ES"?

codefromthecrypt commented 8 years ago

Great... glad to see progress. The first step is to make sure that when you say POST you mean POST + ES. That way, only one variable changes: the transport. After you run your scenario, save the collector metrics, which should show how many messages were received, whether any were dropped, etc.

ex. http://localhost:9411/metrics

https://github.com/openzipkin/zipkin/tree/master/zipkin-server#collector

Once you verify this, change only the transport variable and run the same scenario (i.e. report using Kafka, not HTTP).

Look at the /metrics endpoint and compare the kafka stats with the http stats from the prior pass. You can also run the server with the --logging.level.zipkin=DEBUG command-line argument.

https://github.com/openzipkin/zipkin/tree/master/zipkin-server#logging

You might see dropped spans or dropped messages in the metrics output, and you might see exceptions in the log output. This is how we can start progressing from here.

liangman commented 8 years ago

Of course, it was "POST + ES". Following what you said, I find the same result.

kafka:

"counter.zipkin_collector.messages.kafka":100,"gauge.zipkin_collector.message_spans.kafka":1.0,"gauge.zipkin_collector.message_bytes.kafka":4068.0,"counter.zipkin_collector.spans.kafka":100,"counter.zipkin_collector.bytes.kafka":406770,"httpsessions.max":-1,"httpsessions.active":0}

http:

"gauge.zipkin_collector.message_bytes":1569.0,"counter.zipkin_collector.spans":100,"gauge.zipkin_collector.message_spans":1.0,"counter.zipkin_collector.messages.http":100,"counter.zipkin_collector.bytes":156870,"gauge.response.api.v1.spans":2.0,"counter.status.202.api.v1.spans":100,"httpsessions.max":-1,"httpsessions.active":0}

And I ran zipkin with the argument --logging.level.zipkin=DEBUG, but the log looks normal.

liangman commented 8 years ago

I sent the same 100 spans via kafka and via http.

kafka + es:

paas@PaasAPMBootstrap:/var/paas$ curl -XGET '129.188.37.108:9200/_cat/indices?v'
health status index             pri rep docs.count docs.deleted store.size pri.store.size
yellow open   zipkin-2016-06-20   5   1        114            0      8.8kb          8.8kb

http + es:

paas@PaasAPMBootstrap:/var/paas$ curl -XGET '129.188.37.108:9200/_cat/indices?v'
health status index             pri rep docs.count docs.deleted store.size pri.store.size
yellow open   zipkin-2016-06-20   5   1        200            0     20.4kb         20.4kb

codefromthecrypt commented 8 years ago

looking at your metrics output, it seems you aren't running the latest version of zipkin (counter.zipkin_collector.bytes should have read counter.zipkin_collector.bytes.http). I don't think this impacts your issue, but it would be less distracting to use the same version of code (latest is 1.1.5).

One thing that seems odd is that the cumulative bytes collected from http (157k) are less than the cumulative bytes collected from kafka (407k). Are you using the same encoding for both? The byte count is after any decompression, so I'd expect the figures to be similar...

Regardless, if a scenario of only 100 spans can reproduce the concern, it seems small enough for me or someone else to run with ease.

do you mind posting your script somewhere so that I can try it?

liangman commented 8 years ago

senddata.sh:

#!/bin/bash
TIMESTAMP=$(node -e 'console.log(new Date().getTime())')000
curl -s localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[{
    "traceId": "'${1}'",
    "name": "fermentum",
    "id": "'${1}'",
    "annotations": [
      {
        "timestamp": '${TIMESTAMP}',
        "value": "sr",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "sagittis",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "montes",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "augue",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "malesuada",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "ss",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ],
    "binaryAnnotations": [
      {
        "key": "mollis",
        "value": "hendrerit",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ]
  }]'
liangman commented 8 years ago

start_send.sh:

#!/bin/bash

i=1

while [ $i -lt $1 ]
        do
        echo $i
        ./senddata.sh `printf "%x" $i`
        let "i=${i}+1"
done
liangman commented 8 years ago

./start_send.sh 101

This is the script for the "POST" case.

liangman commented 8 years ago

I use Java to write the data to kafka, so I don't have an easy way to post it. But I can write a script in Python. Please wait a moment.

codefromthecrypt commented 8 years ago

maybe you can send the same json you send via http using something like this?

$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin
[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}]

https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka#json

liangman commented 8 years ago

senddatatokafka.py:

#!/bin/python
import time
import os

data = """[{
"traceId": "%x",
"name": "fermentum",
"id": "%x",
"annotations": [
{
"timestamp": %i,
"value": "sr",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "sagittis",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "montes",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "augue",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "malesuada",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "ss",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
}
],
"binaryAnnotations": [
{
"key": "mollis",
"value": "hendrerit",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
}
]
}]"""
def main():
  count = 100  
  data1 = data.replace(' ', '').replace('\n', '')  # collapse to one line per message for the console producer
  cmdp = r'./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zipkin'
  pipp = os.popen(cmdp, 'w')
  i = 0
  while i < count:
    i += 1
    timestamp = time.time() * 10 ** 6  # epoch microseconds as a float; '%i' truncates it below
    pipp.write(data1%(i, i, timestamp, timestamp, timestamp, timestamp, timestamp, timestamp) + "\r\n")
    #print data1%(i, i, timestamp, timestamp, timestamp, timestamp, timestamp, timestamp)
  print 'finsh!'
  pipp.close()

if __name__ == '__main__':
  main()
python senddatatokafka.py
liangman commented 8 years ago

I removed the spaces and "\n" before sending to kafka, because the console producer treats each line as one message. I think you can try it with the script.
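For reference, a rough shell equivalent (span.json is a hypothetical file holding one pretty-printed span array like the one above): jq -c compacts the JSON onto a single line, which is what the console producer needs.

# compact the pretty-printed array onto one line, then publish it as one kafka message
$ jq -c . span.json | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zipkin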

codefromthecrypt commented 8 years ago

OK, I've verified that with the following setup, I get readbacks of between 58 and 100 spans when using the kafka script, vs the http one, which routinely reads back 100.

What I do is run the scenarios below while kafka is left up, but elasticsearch is cleaned between runs.
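For the cleanup step, something like this works (a sketch; it assumes wildcard index deletion is allowed, which it is by default on ES 2.x):

# drop the zipkin daily indices between runs so each readback starts empty
$ curl -s -XDELETE 'localhost:9200/zipkin-*'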


The instructions below start kafka and ES:

# start kafka
$ curl -SL http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz | tar xz
$ nohup bash -c "cd kafka_* && bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &"
$ nohup bash -c "cd kafka_* && bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &"

# start ES
$ curl -SL https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz | tar xz
$ elasticsearch-*/bin/elasticsearch -d > /dev/null

And.. I start zipkin-server like so..

SELF_TRACING_ENABLED=false KAFKA_ZOOKEEPER=localhost:2181 STORAGE_TYPE=elasticsearch java -jar zipkin.jar --logging.level.zipkin=DEBUG

HTTP

When I run the HTTP test like this:

for i in {1..100}; do ./senddata.sh `printf "%x" $i`; done

I get these collector metrics:

  "gauge.zipkin_collector.message_spans.http": 1,
  "counter.zipkin_collector.spans.http": 100,
  "gauge.zipkin_collector.message_bytes.http": 1569,
  "counter.zipkin_collector.messages.http": 100,
  "counter.zipkin_collector.bytes.http": 156870,

And the api count looks correct:

$ curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100

Kafka

When I run the Kafka test like this:

$ python senddatatokafka.py 
Java HotSpot(TM) 64-Bit Server VM warning: Option UseParNewGC was deprecated in version 9.0 and will likely be removed in a future release.
[2016-06-25 14:11:06,504] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
finsh!
[2016-06-25 14:11:06,676] WARN Error while fetching metadata [{TopicMetadata for topic zipkin -> 
No partition metadata for topic zipkin due to kafka.common.LeaderNotAvailableException}] for topic [zipkin]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-06-25 14:11:06,684] WARN Error while fetching metadata [{TopicMetadata for topic zipkin -> 
No partition metadata for topic zipkin due to kafka.common.LeaderNotAvailableException}] for topic [zipkin]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-06-25 14:11:06,684] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: zipkin (kafka.producer.async.DefaultEventHandler)

I get these collector metrics:

  "gauge.zipkin_collector.message_bytes.kafka": 903,
  "counter.zipkin_collector.bytes.kafka": 90270,
  "counter.zipkin_collector.spans.kafka": 100,
  "gauge.zipkin_collector.message_spans.kafka": 1,
  "counter.zipkin_collector.messages.kafka": 100,

And the api count is correct sometimes and not others (the collector stats always look the same):

$ curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100
codefromthecrypt commented 8 years ago

NEXT STEP:

One difference between in-memory storage and ES storage is that the former doesn't do anything asynchronously. We should validate this scenario against Cassandra, too (as it also uses guava futures).

liangman commented 8 years ago

OK, I will try it again following your steps.

codefromthecrypt commented 8 years ago

Might be an issue in cassandra, too, but looks like #1142 is blocking my ability to use the normal readback (only returns 10-16)

liangman commented 8 years ago

No. When I send 100 spans to kafka, all of them are written to cassandra, but I only see 10 traces on the zipkin page. Maybe there is a bug in the code...

codefromthecrypt commented 8 years ago

@liangman I edited the comment for the http script. can you edit the one for kafka and make sure that timestamps are reset each time (using epoch micros)?

codefromthecrypt commented 8 years ago

NEXT STEP:

See the behavior when the kafka script reports spans with unique timestamps. For example TIMESTAMP=$(node -e 'console.log(new Date().getTime())')000. I don't really expect this to make a difference, but we ought to be consistent.
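If node isn't handy, a sketch of the same thing using GNU date (assumes %N nanosecond support, so it won't work with BSD/macOS date):

# epoch microseconds: nanoseconds from GNU date, divided down to micros
TIMESTAMP=$(( $(date +%s%N) / 1000 ))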

A step after that would be to instrument ElasticsearchSpanConsumer in some way that we can track the futures (possibly ensuring the result has the correct numberOfActions() etc). This might be hard to track down, but at least the scenario is repeatable.

ps I'm offline likely the rest of the day, but might look into this tomorrow.

codefromthecrypt commented 8 years ago

sure.. I'd like you to edit your comment here https://github.com/openzipkin/zipkin/issues/1141#issuecomment-228511204

In the span you are generating in python, please make the timestamps reflect the current system time.

That reduces the work needed when querying and also avoids TTL issues. For example, you can look at how I edited the http script.

https://github.com/openzipkin/zipkin/issues/1141#issuecomment-228507867

after that you can try to troubleshoot ElasticsearchSpanConsumer by customizing the class, building and running locally. https://github.com/openzipkin/zipkin/tree/master/zipkin-server#running-locally For example, you could add print statements etc.

If you aren't familiar enough to do that, you'd likely need to wait until I have more time to help (or someone else does).

liangman commented 8 years ago

I have edited the script again. It may meet your requirements.

codefromthecrypt commented 8 years ago

thanks. will take a look today

codefromthecrypt commented 8 years ago

Update:

Adding a 100ms sleep between kafka messages seems to avoid this issue. There are no errors, but something seems to get dropped when processing concurrently.
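For anyone following along, the throttled variant is roughly this (a sketch; spans.jsonl is a hypothetical file with one single-span JSON array per line, as the python script produces):

# send one message every ~100ms instead of all at once
while IFS= read -r line; do
  printf '%s\n' "$line"
  sleep 0.1
done < spans.jsonl | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zipkin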

codefromthecrypt commented 8 years ago

Note: synchronizing ElasticsearchSpanConsumer doesn't make the problem go away; there's something else going on that sleeping 100ms between creating each future avoids.

codefromthecrypt commented 8 years ago

I've found a workaround. This happens when a bulk request is used for a single index operation (only 1 span). When I special-case single-span messages to not use bulk requests, the problem disappears.

I have two remedies:

We need to create a soak test at some point, too, as the special-casing may not be fixing the root cause, even if it helps. cc @anuraaga

prat0318 commented 8 years ago

@adriancole I am a bit confused. How come the way messages are read from kafka can affect something specific to ElasticSearch? If I am not wrong, the flow is kafka -> collector -> ES/C. From what I read, message loss is only seen for ES and not C. But the transport (kafka/http) work is done once the data is fetched via the collector. So how is the issue related to kafka batching plus ES storage?

codefromthecrypt commented 8 years ago

From what i read, message loss is only seen for ES and not C. But the transport (kafka/http) work is done once the data is fetched via collector. So how is the issue related to kafka batching plus ES storage?

I think http only worked because the test is slower. For example, sleeping 100ms in the kafka loop also succeeded.

TL;DR: I would recommend the bundling feature to anyone who seems to be writing tests or instrumentation. It isn't about this issue specifically; it's more about encouraging supportable practice.

Buffering and bundling spans ends up using AsyncSpanConsumer.accept(List...) as it was written to be used. At the moment, the only way to control the count of spans stored at a time by zipkin is to change the instrumentation/reporters to send more than one span per message. I recall a discussion of adding a buffering layer internally to the kafka code, but that didn't go anywhere.

We've regularly encountered issues with not bundling, across different transports and storage backends, to the point where we changed the standard format to be a list (the storage api was always a list, even in scala). You spent a lot of time learning that bundling helps a few months back, but this isn't a c*-only concern. The ElasticSearch storage was written with the assumption that storing lists is the common case; otherwise it wouldn't have used bulk operations in the first place. The same is true of MySQL and will likely end up being the case for C* at some point. We really ought to encourage bundling as a standard feature for reporting spans, even if the bundle-size policy is transport-specific.

The other option is to see a practice, like dumping many messages at the same time, and say nothing. That person might go to production assuming span-per-message is fine, to the point where it is hard to change their instrumentation. Maybe they never considered bundling at all. Would they have the experience and time you did to troubleshoot and refactor late in deployment? Would someone be available for free OSS support for several days? Would folks want to hack zipkin to buffer on their behalf? Would that debate finish quickly enough, and with the desired outcome, to resolve the issue? Maybe yes to all, but I'd rather raise a flag about a known stick than get smacked with it later.

Truth is, we can't count on volunteers to do free support, design, and dev work on demand, so we have to work in ways that use the limited time we have to help the largest number of users. When folks have bundling in mind from the beginning, it can be adjusted when they get into a support problem, or in their test scenario. They can solve or work around more problems without us.

codefromthecrypt commented 8 years ago

Here's the workaround. When merged, it should be testable from a snapshot (of course, if you are in a position to build the branch, please do):

https://github.com/openzipkin/zipkin/pull/1146

liangman commented 8 years ago

OK, I have updated the file ElasticsearchSpanConsumer.java.

count = 250

But when I set the count of log messages sent to kafka to 250, there is a warning:

2016-06-27 11:33:08.106  WARN 59971 --- [[listener][T#3]] zipkin.collector.kafka.KafkaCollector    : Cannot store spans [00000000000000e3.00000000000000e3<:00000000000000e3] due to EsRejectedExecutionException(rejected execution of org.elasticsearch.transport.TransportService$4@53793b3c on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@33aec4eb[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 671]])

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.transport.TransportService$4@53793b3c on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@33aec4eb[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 671]]
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:50) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_73]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:85) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:346) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:310) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performAction(TransportReplicationAction.java:463) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.doRun(TransportReplicationAction.java:444) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:125) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.innerExecute(TransportIndexAction.java:134) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.doExecute(TransportIndexAction.java:118) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.doExecute(TransportIndexAction.java:65) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:70) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:238) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:235) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:244) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.5.Final.jar!/:na]
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:75) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.5.Final.jar!/:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100
liangman commented 8 years ago

The result in the ES:

health status index             pri rep docs.count docs.deleted store.size pri.store.size 
yellow open   zipkin-2016-06-27   5   1        414            0    147.1kb        147.1kb 
codefromthecrypt commented 8 years ago

@liangman so let's first make sure we know what does work and what doesn't. Are you saying 100 works, but if you send 250 messages at the same time, you get that rejected exception?

Since the error includes a queue capacity of 200, I'm again wondering what would happen if, instead of sending 250 spans as 250 messages, you batched them as 10 or more per message (i.e. the json array includes 10 items, not 1).
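Something like this would regroup them (just a sketch; spans.jsonl is a hypothetical file with one single-span array per line):

# strip the surrounding [ ] from each single-span line, join every 10 lines with
# commas, and re-wrap, so each kafka message carries a 10-span array. assumes the
# line count is a multiple of 10; a short final group would have trailing commas
# and be rejected as malformed JSON.
sed 's/^\[//;s/\]$//' spans.jsonl \
  | paste -d, - - - - - - - - - - \
  | sed 's/^/[/;s/$/]/' \
  | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zipkin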

ps here's the latest snapshot, if it helps http://oss.jfrog.org/oss-snapshot-local/io/zipkin/java/zipkin-server/1.1.6-SNAPSHOT/zipkin-server-1.1.6-20160627.030409-3-exec.jar

codefromthecrypt commented 8 years ago

Back to the error.. Right now the index queue depth is 200 and you've overrun it. That means requests are being accepted faster than they can be processed.

I think it will be useful to adjust the spans/message count to see if you can make storage more efficient with the same topology.

There are a lot of reasons it could be backed up, including the usual cpu, memory, and network bottlenecks. It could even be backed up because of a slow cluster. We won't be able to troubleshoot what the bottleneck is in your environment; suffice to say you've hit a limit.

From an ES point of view, you can look at the tuning options there. Maybe start with this https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html and run experiments until you've found the key parameters that help.
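For example, on ES 2.x something like this raises the index queue (the pool named in your rejection) via a transient cluster setting; treat it as a sketch and double-check the exact key against the threadpool docs for your version:

# raise the index thread pool queue from the default 200; transient, so it resets on cluster restart
$ curl -XPUT localhost:9200/_cluster/settings -d '{"transient": {"threadpool.index.queue_size": 500}}'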

anuraaga commented 8 years ago

FWIW, it is possible to adjust the default queue size through elasticsearch.yml or cluster settings if you need https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

...though Adrian beat me to it ;)

But as Adrian said, it seems like you are having issues with a highly-synthetic workload, not a real-world one. If this synthetic workload is important to you, then I'd recommend trying to modify the cluster settings, but otherwise would recommend trying to send spans in batch instead (as the Java brave would). And keep in mind, Elasticsearch is IMO a medium-scale datastore. It's not going to get reasonable performance on a single node (this config is only for testing / development) and it's not unlikely you'd run into perf issues with it.

liangman commented 8 years ago

Maybe I have to consider changing the storage from ES to cassandra, because we forecast at least 100 thousand log messages per second being sent to kafka by our microservices, so I will need a distributed Zipkin to consume the data.

liangman commented 8 years ago

When I use cassandra for storage, zipkin takes about 100s to consume 500000 messages (each about 1kb).

codefromthecrypt commented 8 years ago

Thanks for the update.. just curious, was that with a single-node zipkin against a single-node cassandra cluster?

codefromthecrypt commented 8 years ago

I'm closing this issue for now as we've progressed from no-errors and dropped data, to errors that explain why data was dropped (overran queue length), and an ES storage config suggestion to improve that.

liangman commented 8 years ago

ok.

codefromthecrypt commented 8 years ago

@liangman by the way, thanks: your script allowed us to reproduce the problem and convert it from a mystery to an explanation. The next users will be better off because of your efforts.

xqliang commented 7 years ago

I use the latest Zipkin (1.26) with a single-node elasticsearch (2.3.4) for storage (the default docker-zipkin configuration), and I still encounter data loss.

Here is the test script (sendtozipkin.sh):

#!/bin/bash

function send_to_zipkin() {
    id=$1
    id2=$2
    millis=$(python -c 'import time; print "%d" % (time.time() * 1000 * 1000)')  # epoch microseconds, despite the variable name
    curl localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[{
        "traceId": "'${id}'",
        "name": "fermentum",
        "id": "'${id}'",
        "annotations": [
          {
            "timestamp": '${millis}',
            "value": "sr",
            "endpoint": {
              "serviceName": "semper",
              "ipv4": "113.29.89.129",
              "port": 2131
            }
          }
        ]
    }, {
        "traceId": "'${id}'",
        "name": "fermentum1",
        "id": "'${id2}'",
        "annotations": [
         {
            "timestamp": '${millis}',
            "value": "sr",
            "endpoint": {
              "serviceName": "semper",
              "ipv4": "113.29.89.129",
              "port": 2131
            }
          }
        ]
    }]'
}
i=0

while [ $i -lt $1 ]; do
    let "i=${i}+1"
    let "j=${i}+16777216"
    echo $i
    send_to_zipkin `printf "%016x" $i` `printf "%016x" $j`
done

Then send 100 (50*2) spans to Zipkin:

$ ./sendtozipkin.sh 50

Some random spans are lost. I added the ES_HTTP_LOGGING=BODY environment variable (introduced in Zipkin 1.25) to docker-compose-elasticsearch.yaml, and saw these errors:

# grep 'errors\\":true' $(docker inspect --format='{{.LogPath}}' zipkin) | head -1
{"log":"2017-06-12 03:04:29.733  INFO 6 --- [41.133:9200/...] z.s.e.http.ElasticsearchHttpStorage      : 
{\"took\":1,\"errors\":true,\"items\":[{\"create\":{\"_index\":\"zipkin-2017-06-03\",\"_type\":\"span\",
\"_id\":\"AVyaQoUkPGYSeXOvPsRT\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",
\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@5742b77e on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@2fcaec02[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 0]]\"}}},
{\"index\":{\"_index\":\"zipkin-2017-06-03\",\"_type\":\"servicespan\",
\"_id\":\"gamerebategift|timeraddpoolamounttask\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",
\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@70fc3bfe on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@2fcaec02[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 0]]\"}}}]}\n",
"stream":"stdout","time":"2017-06-12T03:04:29.733933728Z"}

Check the default configuration:

$ curl -XGET localhost:9200/_cluster/settings
{"persistent":{},"transient":{}}

Change threadpool.bulk.queue_size to 500:

$ curl -XPUT localhost:9200/_cluster/settings -d '{"transient": {"threadpool.bulk.queue_size": 500}}'
{"acknowledged":true,"persistent":{},"transient":{"threadpool":{"bulk":{"queue_size":"500"}}}}

Then rerun the sendtozipkin.sh script: no data was lost any more.

codefromthecrypt commented 7 years ago

What you are saying is that when there's an overload on the elasticsearch cluster, zipkin drops data. The only recourse besides tuning ES would be to buffer (or push-back if using kafka), right?

xqliang commented 7 years ago

Yes. At the least, Zipkin should log a WARNING/ERROR when writes to ES return errors even when ES_HTTP_LOGGING=BODY isn't set, so we can monitor the log for alerting.

codefromthecrypt commented 7 years ago

Yes, and I think Zipkin should log a WARNING/ERROR when writes to ES return errors even when ES_HTTP_LOGGING=BODY isn't set.

Logging during collection, especially at warning or error level, can fill logs and cause even more problems, so I'm not sure this would be a good solution. Usually you'd monitor for dropped messages and the like via the collector metrics. Logging failures at debug or trace level may be fine either way.

Notes on collector metrics: https://github.com/openzipkin/zipkin/blob/master/zipkin-server/README.md#collector

Example dashboard: https://github.com/openzipkin/docker-zipkin/pull/135#issuecomment-307609874
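For alerting without extra logging, the collector metrics can be polled directly; a sketch (metric names per the zipkin-server README above):

# keep only the *_dropped collector counters; alert when they increase between polls
$ curl -s localhost:9411/metrics | jq 'with_entries(select(.key | contains("dropped")))'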