mesos / kafka

Apache Kafka on Apache Mesos
Apache License 2.0

Basic broker metrics #175

Closed olegkovalenko closed 8 years ago

olegkovalenko commented 8 years ago

Motivation: Metrics are useful and important information for monitoring a broker. Important metrics:

1. kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions: number of under-replicated partitions (|ISR| < |all replicas|); dangerous when greater than 0.
2. kafka.controller:type=KafkaController,name=OfflinePartitionsCount: number of partitions that don't have an active leader and are hence not writable or readable; dangerous when greater than 0.
3. kafka.controller:type=KafkaController,name=ActiveControllerCount: number of active controllers in the cluster; dangerous when the value is anything other than 1.

for more information on metrics see http://docs.confluent.io/1.0/kafka/monitoring.html
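
As a rough illustration of how these three values could be turned into a health check, here is a minimal sketch. It assumes the usual alerting rule that under-replicated and offline partition counts should stay at 0 and that exactly one controller is active across the whole cluster. `BrokerMetricsCheck` and its method are hypothetical names, not part of mesos/kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of mesos/kafka) that flags risky metric values. */
final class BrokerMetricsCheck {
    /**
     * @param underReplicated   UnderReplicatedPartitions value
     * @param offlinePartitions OfflinePartitionsCount value
     * @param activeControllers ActiveControllerCount summed over all brokers in the cluster
     * @return one message per violated rule; an empty list means nothing was flagged
     */
    static List<String> problems(long underReplicated, long offlinePartitions, long activeControllers) {
        List<String> problems = new ArrayList<>();
        if (underReplicated > 0) {
            problems.add("under-replicated partitions: " + underReplicated);
        }
        if (offlinePartitions > 0) {
            problems.add("offline partitions: " + offlinePartitions);
        }
        if (activeControllers != 1) {
            problems.add("active controllers in cluster: " + activeControllers + " (expected 1)");
        }
        return problems;
    }
}
```

Note that the controller count is checked as a cluster-wide sum, since each individual broker is expected to report either 0 or 1.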

Changes:

./kafka-mesos.sh broker list
brokers:
  id: 0
  active: true
  state: running
  ...
  metrics:
    collected: 2016-01-18 11:53:36Z
    under-replicated-partitions: 0
    offline-partitions-count: 0
    active-controller-count: 1

  id: 1
  active: true
  state: running
  ...
  metrics:
    collected: 2016-01-18 11:53:36Z
    under-replicated-partitions: 0
    offline-partitions-count: 0
    active-controller-count: 0
joestein commented 8 years ago

@olegkovalenko thanks for the PR, overall it looks like a good first stab at a change here. One thing you need to consider: the metrics matter for (1) knowing whether the system is healthy overall and (2) giving humans and other machines the information to do their job, so we also need to do things like metadata requests to ascertain health. If the brokers' metadata isn't all the same (and a leader election isn't going on, which you can figure out via JMX metrics too), then we want to have that in the status too. We also want to be able to proxy requests to specific brokers via the scheduler with API calls, as we look at how something like https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations can get used with the scheduler once it is on Kafka trunk (some future work TBD, but the reasoning is to make sure the work we put in eases bringing upstream work back down eventually when it is there).
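
To make the metadata comparison concrete, a minimal sketch follows. It assumes each broker's metadata response has already been reduced to a map from partition name to leader id; how those views are fetched, and how an in-progress leader election is detected, is left out. `MetadataConsistency` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: checks whether all brokers report the same partition leaders. */
final class MetadataConsistency {
    /**
     * @param leadersByBroker broker id -> (partition name -> leader id) as reported by that broker
     * @return true when every broker reports an identical view (or there are no views at all)
     */
    static boolean consistent(Map<Integer, Map<String, Integer>> leadersByBroker) {
        // Maps compare by content, so identical views collapse into a single set element.
        Set<Map<String, Integer>> distinctViews = new HashSet<>(leadersByBroker.values());
        return distinctViews.size() <= 1;
    }
}
```

A status field could then report an inconsistency only when this returns false and no leader election is under way.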

As for this specific PR's changes, I think you should have a default set of metrics (which can be in code) with the ability for folks to add to or remove from it via the CLI & API (assume they pass some CSV of metrics). We should run some tests there, see what the upper bound is, and default to that for folks so they don't go over it and get a warning if they try to (but still allow them to override).

So out of the box mesos/kafka will give some metrics (we may want to add another default or two, like which broker is the controller) and then let users "change the map" of metrics being used. This would mean that if you didn't include the defaults in what you pass the scheduler (via the CLI or REST API), then those metrics wouldn't be there.
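
A minimal sketch of that idea, assuming the user-supplied CSV replaces the defaults entirely and that going over the limit only triggers a warning. `MetricSelection`, `SOFT_LIMIT` and its value of 20 are placeholders for illustration; the real upper bound would come out of the tests mentioned above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper: resolves which metrics to collect from an optional CSV. */
final class MetricSelection {
    /** Default metric names, taken from the `broker list` output in this PR. */
    static final Set<String> DEFAULTS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
            "under-replicated-partitions", "offline-partitions-count", "active-controller-count")));

    /** Placeholder value; the real bound is still to be determined by testing. */
    static final int SOFT_LIMIT = 20;

    /** Returns the defaults for a null or blank CSV, otherwise exactly the names listed. */
    static Set<String> fromCsv(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return new LinkedHashSet<>(DEFAULTS);
        }
        Set<String> names = new LinkedHashSet<>();
        for (String part : csv.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name); // duplicates are dropped, first-seen order is kept
            }
        }
        return names;
    }

    /** True when the caller should be warned; the selection is still allowed. */
    static boolean exceedsSoftLimit(Set<String> names) {
        return names.size() > SOFT_LIMIT;
    }
}
```

Because the CSV replaces the map rather than extending it, a user who wants the defaults plus one extra metric has to list all of them.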

Also, we should start to consider other options besides ZooKeeper for storing state information. We could add a storage plug-in so we can instead start storing the data in Cassandra too. This is a separate issue, but I am bringing it up because this is a case where we are not using ZooKeeper in a way that is beneficial (more and more state on more and more nodes). Lots of folks use shared ensembles (even though they shouldn't), so we have to be considerate of what we are writing and reading with them (and how fast, etc.).
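
One possible shape for such a plug-in point, as a sketch only: the interface and class names below are hypothetical and do not mirror any existing mesos/kafka code. A ZooKeeper-backed or Cassandra-backed implementation would sit behind the same interface; an in-memory one is shown so the example stays self-contained.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical plug-in point for scheduler state storage. */
interface StateStorage {
    void save(String key, String value);

    /** Returns the stored value, or null when the key is absent. */
    String load(String key);
}

/** In-memory implementation, useful for tests; data is lost on restart. */
final class InMemoryStorage implements StateStorage {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    @Override
    public void save(String key, String value) {
        // ConcurrentHashMap rejects nulls, so fail early with a clear message.
        data.put(Objects.requireNonNull(key, "key"), Objects.requireNonNull(value, "value"));
    }

    @Override
    public String load(String key) {
        return data.get(key);
    }
}
```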

joestein commented 8 years ago

We should also consider another option, which has come up a few times since folks very often end up doing this in some fashion anyway: have the metrics and logs get written to topics, optionally on another broker cluster (running, for now, with a different scheduler). In any case it should be a different ZooKeeper ensemble. Brokers emit lots of logs and metrics, and having them "just go" to a topic defined with a broker list would be a feature flag on the CLI: logs == active with a topic and broker list to supply (if no broker list, then the existing cluster), and metrics == active in the same way (but likely with different topics).
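
The fallback rule for the broker list could be sketched like this, assuming a missing topic means the feature is switched off and a missing broker list means "use the existing cluster". `EmitTarget` and the parameter names are hypothetical and do not correspond to real CLI flags.

```java
import java.util.Optional;

/** Hypothetical holder for where logs or metrics should be produced. */
final class EmitTarget {
    final String topic;
    final String brokerList;

    private EmitTarget(String topic, String brokerList) {
        this.topic = topic;
        this.brokerList = brokerList;
    }

    /**
     * @param topic             topic to write to; null or blank disables the feature
     * @param brokerList        optional brokers of a separate cluster
     * @param clusterBrokerList brokers of the existing cluster, used as the fallback
     */
    static Optional<EmitTarget> resolve(String topic, String brokerList, String clusterBrokerList) {
        if (topic == null || topic.trim().isEmpty()) {
            return Optional.empty();
        }
        boolean hasOwnBrokers = brokerList != null && !brokerList.trim().isEmpty();
        String brokers = hasOwnBrokers ? brokerList.trim() : clusterBrokerList;
        return Optional.of(new EmitTarget(topic.trim(), brokers));
    }
}
```

Logs and metrics would each call `resolve` with their own topic, so they can point at different topics or even different clusters.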

If we want to make it best integrated, I would also suggest that we write the metric and log data as Avro messages; something like LogLine is a pretty standard starting point for that.

we could put into the mesos/kafka code base something like

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ly.stealth.kafka.metrics.yammer;

import com.yammer.metrics.core.*;
import com.yammer.metrics.reporting.AbstractPollingReporter;
import com.yammer.metrics.stats.Snapshot;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import ly.stealth.avro.JsonToAvroConverter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.Map;
import java.util.Properties;

/**
 * KafkaReporter produces all metrics data from a provided registry to Kafka topic with a given reporting interval
 * encoded as Avro.
 * Avro schema will be created dynamically to match JSON representation of the registry. LogLine field will also
 * be added to contain simple metadata about metrics. LogLines logtypeid will be 7 (metrics data) and the source
 * will be "metrics".
 */
public class KafkaReporter extends AbstractPollingReporter implements MetricProcessor<JSONObject> {
    private final KafkaProducer<String, IndexedRecord> producer;
    private final MetricPredicate predicate = MetricPredicate.ALL;
    private final JsonToAvroConverter converter;
    private final String topic;

    public KafkaReporter(MetricsRegistry metricsRegistry, Properties producerProperties, String topic) {
        super(metricsRegistry, "kafka-topic-reporter");

        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        this.producer = new KafkaProducer<String, IndexedRecord>(producerProperties);
        this.converter = new JsonToAvroConverter();
        this.topic = topic;

    }

    public void run() {
        try {
            JSONObject object = new JSONObject();
            for (Map.Entry<MetricName, Metric> entry : getMetricsRegistry().allMetrics().entrySet()) {
                if (predicate.matches(entry.getKey(), entry.getValue())) {
                    entry.getValue().processWith(this, entry.getKey(), object);
                }
            }
            ProducerRecord<String, IndexedRecord> record = new ProducerRecord<String, IndexedRecord>(this.topic, converter.convert(object.toString()));
            this.producer.send(record);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public void processGauge(MetricName name, Gauge<?> gauge, JSONObject context) {
        JSONObject jsonObject = new JSONObject();
        try {
            jsonObject.put("name", getName(name));
            jsonObject.put("type", "gauge");
            jsonObject.put("value", gauge.value());
            context.put(name.getName(), jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void processCounter(MetricName name, Counter counter, JSONObject context) {
        JSONObject jsonObject = new JSONObject();
        try {
            jsonObject.put("name", getName(name));
            jsonObject.put("type", "counter");
            jsonObject.put("value", counter.count());
            context.put(name.getName(), jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void processMeter(MetricName name, Metered meter, JSONObject context) {
        JSONObject jsonObject = new JSONObject();
        try {
            jsonObject.put("name", getName(name));
            jsonObject.put("type", "meter");
            JSONObject meterJsonObject = new JSONObject();

            addMeterInfo(meter, meterJsonObject);

            jsonObject.put("value", meterJsonObject);

            context.put(name.getName(), jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
    }

    private void addMeterInfo(Metered meter, JSONObject meterJsonObject) throws JSONException {
        meterJsonObject.put("rateUnit", meter.rateUnit());
        meterJsonObject.put("eventType", meter.eventType());
        meterJsonObject.put("count", meter.count());
        meterJsonObject.put("meanRate", meter.meanRate());
        meterJsonObject.put("oneMinuteRate", meter.oneMinuteRate());
        meterJsonObject.put("fiveMinuteRate", meter.fiveMinuteRate());
        meterJsonObject.put("fifteenMinuteRate", meter.fifteenMinuteRate());
    }

    @Override
    public void processHistogram(MetricName name, Histogram histogram, JSONObject context) {
        JSONObject jsonObject = new JSONObject();
        try {
            jsonObject.put("name", getName(name));
            jsonObject.put("type", "histogram");
            JSONObject histogramJsonObject = new JSONObject();

            histogramJsonObject.put("min", histogram.min());
            histogramJsonObject.put("max", histogram.max());
            histogramJsonObject.put("mean", histogram.mean());
            histogramJsonObject.put("stdDev", histogram.stdDev());

            Snapshot snapshot = histogram.getSnapshot();
            JSONObject snapshotJsonObject = new JSONObject();
            snapshotJsonObject.put("median", snapshot.getMedian());
            snapshotJsonObject.put("75%", snapshot.get75thPercentile());
            snapshotJsonObject.put("95%", snapshot.get95thPercentile());
            snapshotJsonObject.put("98%", snapshot.get98thPercentile());
            snapshotJsonObject.put("99%", snapshot.get99thPercentile());
            snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());

            histogramJsonObject.put("snapshot", snapshotJsonObject);

            jsonObject.put("value", histogramJsonObject);
            context.put(name.getName(), jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void processTimer(MetricName name, Timer timer, JSONObject context) {
        JSONObject jsonObject = new JSONObject();
        try {
            jsonObject.put("name", getName(name));
            jsonObject.put("type", "timer");
            JSONObject timerJsonObject = new JSONObject();

            timerJsonObject.put("unit", timer.durationUnit());
            timerJsonObject.put("min", timer.min());
            timerJsonObject.put("max", timer.max());
            timerJsonObject.put("mean", timer.mean());
            timerJsonObject.put("stdDev", timer.stdDev());
            addMeterInfo(timer, timerJsonObject);

            Snapshot snapshot = timer.getSnapshot();
            JSONObject snapshotJsonObject = new JSONObject();
            snapshotJsonObject.put("median", snapshot.getMedian());
            snapshotJsonObject.put("75%", snapshot.get75thPercentile());
            snapshotJsonObject.put("95%", snapshot.get95thPercentile());
            snapshotJsonObject.put("98%", snapshot.get98thPercentile());
            snapshotJsonObject.put("99%", snapshot.get99thPercentile());
            snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());

            timerJsonObject.put("snapshot", snapshotJsonObject);

            jsonObject.put("value", timerJsonObject);

            context.put(name.getName(), jsonObject);
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }
    }

    private JSONObject getName(MetricName metricName) throws JSONException {
        String group = metricName.getGroup();
        String name = metricName.getName();
        String scope = metricName.getScope();
        String type = metricName.getType();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", name);
        jsonObject.put("group", group);
        jsonObject.put("scope", scope);
        jsonObject.put("type", type);
        return jsonObject;
    }

    @Override
    public void shutdown() {
        try {
            super.shutdown();
        } finally {
            this.producer.close();
        }
    }
}

and then have the mesos/kafka code run it in the broker on startup, so we can set the properties on the broker for the metrics to be reported to the broker and cluster without needing the knowledge, or taking the time, to thread that together every single time you do a setup. I would take that and this

package ly.stealth.kafka.emitters;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import ly.stealth.avro.LogLine;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * KafkaLogAppender produces all log events to Kafka topic encoded as Avro.
 * Avro schema is defined as follows:
 * {
 *     "type": "record",
 *     "name": "ly.stealth.avro.LogLine",
 *     "fields": [
 *         {
 *             "name": "line",
 *             "type": ["null", "string"],
 *             "default": null
 *         },
 *         {
 *             "name": "source",
 *             "type": ["null", "string"],
 *             "default": null
 *         },
 *         {
 *             "name": "tag",
 *             "type": [
 *                 "null",
 *                 {
 *                     "type": "map",
 *                     "values": "string"
 *                 }
 *             ],
 *             "default": null
 *         },
 *         {
 *             "name": "logtypeid",
 *             "type": ["null", "long"],
 *             "default": null
 *         },
 *         {
 *             "name": "timings",
 *             "type": [
 *                 "null",
 *                 {
 *                     "type": "map",
 *                     "values": "long"
 *                 }
 *             ],
 *             "default": null
 *         }
 *     ]
 * }
 *
 * "line" field will contain the actual log message.
 * "logtypeid" will be the log level:
 *      1 - Trace
 *      2 - Debug
 *      3 - Info
 *      4 - Warn
 *      5 - Error
 *      6 - Fatal
 * "source" will be the thread name that logged the message.
 * "tag" will contain 2 entries:
 *      "logger.name" - logger name, e.g. the one that is used in call Logger.getLogger()
 *      "location" - event location information, e.g. source code line that logged the message
 * "timings" will contain 1 entry:
 *      "emitted" - timestamp in milliseconds when the message was sent
 */
public class KafkaLogAppender extends AppenderSkeleton {
    public static final Long TraceLogTypeId = 1L;
    public static final Long DebugLogTypeId = 2L;
    public static final Long InfoLogTypeId = 3L;
    public static final Long WarnLogTypeId = 4L;
    public static final Long ErrorLogTypeId = 5L;
    public static final Long FatalLogTypeId = 6L;

    private static final Map<Level, Long> logLevels = new HashMap<Level, Long>();

    static {
        logLevels.put(Level.TRACE, TraceLogTypeId);
        logLevels.put(Level.DEBUG, DebugLogTypeId);
        logLevels.put(Level.INFO, InfoLogTypeId);
        logLevels.put(Level.WARN, WarnLogTypeId);
        logLevels.put(Level.ERROR, ErrorLogTypeId);
        logLevels.put(Level.FATAL, FatalLogTypeId);
    }

    private KafkaProducer<String, IndexedRecord> producer;

    private String topic;
    private String brokerList;
    private String schemaRegistryUrl;

    public KafkaLogAppender() {

    }

    public KafkaLogAppender(Properties props, String topic) {
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        this.producer = new KafkaProducer<String, IndexedRecord>(props);
        this.topic = topic;
    }

    @Override
    public void activateOptions() {
        if (this.brokerList == null)
            throw new ConfigException("The bootstrap servers property is required");

        if (this.schemaRegistryUrl == null)
            throw new ConfigException("The schema registry url is required");

        if (this.topic == null)
            throw new ConfigException("Topic is required");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerList);
        props.put("schema.registry.url", this.schemaRegistryUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        this.producer = new KafkaProducer<String, IndexedRecord>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        LogLine line = new LogLine();
        line.setLine(event.getMessage() == null ? null : event.getMessage().toString());
        line.setLogtypeid(logLevels.get(event.getLevel()));
        line.setSource(event.getThreadName());

        Map<CharSequence, Long> timings = new HashMap<CharSequence, Long>();
        timings.put("emitted", event.getTimeStamp());

        line.setTimings(timings);

        Map<CharSequence, CharSequence> tags = new HashMap<CharSequence, CharSequence>();
        tags.put("logger.name", event.getLoggerName());
        tags.put("location", event.getLocationInformation().fullInfo);

        line.setTag(tags);

        ProducerRecord<String, IndexedRecord> record = new ProducerRecord<String, IndexedRecord>(this.topic, line);
        producer.send(record);
    }

    @Override
    public void close() {
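        // The producer is only created in activateOptions() or the two-argument constructor.
        if (this.producer != null) {
            this.producer.close();
        }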
    }

    @Override
    public boolean requiresLayout() {
        return false;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getBrokerList() {
        return brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public String getSchemaRegistryUrl() {
        return schemaRegistryUrl;
    }

    public void setSchemaRegistryUrl(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }
}

as recommended reporters to start with, which we will want to evolve and work on with others too, maybe even to help get some consensus around them.

joestein commented 8 years ago

@olegkovalenko I chatted some with @dmitrypekar and he thinks we can fit the other items I brought up into later updates incrementally, without breaking changes and while still getting in new features. I am testing and reviewing it again (as is he) and will post back shortly with some feedback, maybe another metric or two if it makes sense. Thanks.

dmitrypekar commented 8 years ago

I've tested locally and the solution works fine. Added 2 small comments about the code. After fixing them, I am ready to merge the code.

olegkovalenko commented 8 years ago

@dmitrypekar and @joestein thanks for the feedback!

@dmitrypekar I made changes according to the comments.

joestein commented 8 years ago

One more change and then +1 :+1: for merge: sed 's/active-controller-count/is-active-controller/g' throughout. It makes more sense as a yes/no "is this broker a controller or not", so as not to confuse folks when they look at Kafka for the first time.

So it should look like this, with both the output and the internal name changed. Thanks.

vagrant@master:/vagrant$ ./kafka-mesos.sh broker list
brokers:
  id: 0
  active: true
  state: running
  resources: cpus:0.10, mem:256, heap:128, port:auto
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:master
  task: 
    id: broker-0-037ff93c-9f52-499f-8ad4-93c9d2c8d928
    state: running
    endpoint: master:31000
  metrics: 
    collected: 2016-01-19 13:53:09Z
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 1

  id: 1
  active: true
  state: running
  resources: cpus:0.10, mem:256, heap:128, port:auto
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:slave0
  task: 
    id: broker-1-925689c8-05a8-480d-a902-dfcbc9ff17a9
    state: running
    endpoint: slave0:31000
  metrics: 
    collected: 2016-01-19 13:53:05Z
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 0

  id: 2
  active: true
  state: running
  resources: cpus:0.10, mem:256, heap:128, port:auto
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:slave0
  task: 
    id: broker-2-c86eaf0f-905e-4b5b-8db5-02cba80e80c2
    state: running
    endpoint: slave0:31001
  metrics: 
    collected: 2016-01-19 13:53:09Z
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 0
olegkovalenko commented 8 years ago

@joestein renamed active-controller-count according to your recommendation

@dmitrypekar or @joestein, could you merge the PR? Thanks in advance!