OpenTSDB / opentsdb

A scalable, distributed Time Series Database.
http://opentsdb.net
GNU Lesser General Public License v2.1
4.99k stars 1.25k forks source link

Make OpenTSDB more MR friendly #660

Open bobrik opened 8 years ago

bobrik commented 8 years ago

I came to the need to reindex OpenTSDB. I started with doing CopyTable, then reimplementing CopyTable on Spark, then thinking if I need something better. Now I want to try reindexing with different settings that cannot be changed after writing data: salting and metric / tag width.

It seems that doing map-reduce over OpenTSDB's data in TSDB's terms is a good idea. You feed data directly to the new OpenTSDB, save it in different format, do cool things in Spark, etc. However, there is no support for this currently from OpenTSDB's side.

I ended up doing a POC to iterate over tsdb table and emit metrics (ts + metric + tags + value):

    public static class ExportScanner {

        TSDB tsdb;

        Scanner scanner;

        public ExportScanner(TSDB tsdb, Scanner scanner) {
            this.tsdb = tsdb;
            this.scanner = scanner;
        }

        public void scan() throws Exception {
            ArrayList<ArrayList<KeyValue>> rows;
            while ((rows = scanner.nextRows().joinUninterruptibly()) != null) {
                System.out.println(rows.size());
                for (final ArrayList<KeyValue> row : rows) {
                    byte[] key = row.get(0).key();

                    String metric = Internal.metricName(tsdb, key);
                    long baseTime = Internal.baseTime(tsdb, key);
                    Map<String, String> tags = Internal.getTags(tsdb, key);

                    for (final KeyValue kv : row) {
                        byte[] qualifier = kv.qualifier();

                        if (qualifier.length == 2 || qualifier.length == 4 && Internal.inMilliseconds(qualifier)) {
                            final Internal.Cell cell = Internal.parseSingleValue(kv);
                            if (cell == null) {
                                throw new IllegalDataException("Unable to parse row: " + kv);
                            }

                            printDataPoint(metric, tags, parseCell(cell, baseTime));
                        } else {
                            // compacted column
                            final ArrayList<Internal.Cell> cells = Internal.extractDataPoints(kv);
                            for (Internal.Cell cell : cells) {
                                printDataPoint(metric, tags, parseCell(cell, baseTime));
                            }
                        }
                    }
                }
            }

            scanner.close();
        }

        protected Pair<Long, Number> parseCell(Internal.Cell cell, long baseTime) {
            return new Pair<>(cell.absoluteTimestamp(baseTime), cell.parseValue());
        }

        protected void printDataPoint(String metric, Map<String, String> tags, Pair<Long, Number> dp) {
            System.out.println(dp.getKey() + " " + metric + " " + tags.toString() + " " + dp.getValue());
        }
    }

Poking around Internal things and reading from HBase directly doesn't look like a good practice. I think that class with the similar functionality should be present in OpenTSDB itself. I want to be able to get something like this to iterate over:

// metric, tags, timestamp, value
JavaRDD<Tuple4<String, Map<String, String>, Long, Number>>
bobrik commented 8 years ago

Okay, I think I have what I need implemented locally:

        SparkConf sc = new SparkConf().setAppName("Whatever");
        sc.setMaster("local[24]");
        sc.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext ctx = new JavaSparkContext(sc);

        Config config = new Config(false);
        config.overrideConfig("tsd.storage.hbase.zk_quorum", "myzk:2181");
        config.overrideConfig("tsd.storage.hbase.zk_basedir", "/hbase/metrics");
        config.overrideConfig("tsd.storage.hbase.data_table", "fallback-tsdb");
        config.overrideConfig("tsd.storage.hbase.uid_table", "fallback-tsdb-uid");
        config.overrideConfig("tsd.storage.enable_compaction", "false");

        Configuration hbaseConfiguration = new Configuration();
        hbaseConfiguration.setLong(TableInputFormat.SCAN_TIMERANGE_START, System.currentTimeMillis() - 86400 * 1 * 1000);
        hbaseConfiguration.setLong(TableInputFormat.SCAN_TIMERANGE_END, System.currentTimeMillis());
        hbaseConfiguration.setInt("hbase.client.scanner.caching", 1000);
        hbaseConfiguration.setInt("hbase.rpc.timeout", 86400 * 1000);
        hbaseConfiguration.setInt("hbase.client.scanner.timeout.period", 86400 * 1000);
        hbaseConfiguration.setInt("hbase.client.retries.number", 1000000);

        JavaRDD<OpenTSDBInput.DataPoint> rdd = OpenTSDBInput.rdd(ctx, config, hbaseConfiguration);

        List<OpenTSDBInput.DataPoint> dps = rdd.filter(x -> x.getMetric().startsWith("tsd.")).takeOrdered(50, new DataPointComparator());

        for (OpenTSDBInput.DataPoint dp : dps) {
            System.out.println(dp);
        }

But here's another issue that stops me from running it on Spark properly: OpenTSDB/asynchbase#99.

bobrik commented 8 years ago

Posting OpenTSDBInput if anyone needs it:

IDerr commented 7 years ago

@bobrik you should post an article online, great job.

IDerr commented 6 years ago

Hi @bobrik, if your problem has been resolved, could you please close this issue ?

Thanks :)