influxdata / docs-v2

InfluxData Documentation that covers InfluxDB Cloud, InfluxDB OSS 2.x, InfluxDB OSS 1.x, InfluxDB Enterprise, Telegraf, Chronograf, Kapacitor, and Flux.
https://docs.influxdata.com
MIT License

Migration from cloud to OSS successfully completes but does nothing #5384

Closed: madchap closed this issue 5 months ago

madchap commented 5 months ago

I've followed the instructions to migrate data from cloud to OSS. There is no error, but no data migrated either.

I am running:

(screenshots of the task runs attached)

Any hints as to where else I could look? Thank you.

sanderson commented 5 months ago

@madchap Can you provide your full migration task?

madchap commented 5 months ago

Hi @sanderson

The only thing modified was the migration piece:

migration = {
    start: 2022-01-01T00:00:00Z,
    stop: 2024-03-23T00:00:00Z,
    batchInterval: 30d,
    batchBucket: "migration",
    sourceHost: "https://eu-central-1-1.aws.cloud2.influxdata.com",
    sourceOrg: "5147xxxxxxxxxxxx",
    sourceToken: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
    sourceBucket: "home_assistant",
    destinationBucket: "home_assistant_from_cloud",
}
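
(Side note for anyone reading along: `sourceToken` is pulled from the InfluxDB secrets store with `secrets.get()`, so the secret has to exist on the OSS instance before the task runs. A sketch of setting it with the v2 `influx` CLI; the key name matches the one referenced above:)

```shell
# Store the Cloud API token in the OSS instance's secrets store.
# If --value is omitted, the CLI prompts for it interactively, which
# keeps the token out of shell history.
influx secret update --key INFLUXDB_CLOUD_TOKEN
```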

I imported the task from https://github.com/influxdata/community-templates/tree/master/influxdb-cloud-oss-migration/

sanderson commented 5 months ago

From your screenshot, the migration is running, but the batches are all empty. Have you checked your task run logs to see if there might be any clues there?

madchap commented 5 months ago

Yeah, nothing, it seems. It starts the task (and seems to dump the whole script into the log) and ends within 1 second of starting with a "completed (success)" status and 0 rows processed. I couldn't see anything in the container's logs either.

sanderson commented 5 months ago

Can you try copying the task code into the Data Explorer and running it there? I wonder if an error is getting suppressed.

madchap commented 5 months ago

Thanks @sanderson for your time helping me out here.

Assuming I did things correctly, the following query, when launched from my OSS instance:

(from(bucket: "migration")
    |> range(start: 2022-01-01T00:00:00Z)
    |> filter(fn: (r) => r._field == "batch_stop")
    |> filter(fn: (r) => r.srcOrg == "51478xxxxxxxx")
    |> filter(fn: (r) => r.srcBucket == "home_assistant")
    |> last()
    |> findRecord(fn: (key) => true, idx: 0))._value

returns the following error: error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.

I'm sorry, I'm a bit hopeless with Flux at this time.
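
(For context, that error is Flux complaining that the script produces no stream of tables: wrapping the query in parentheses and extracting `._value` turns it into a scalar expression rather than streaming output. A sketch of the same query reworked to yield its stream directly, which should run on its own in the Data Explorer:)

```flux
from(bucket: "migration")
    |> range(start: 2022-01-01T00:00:00Z)
    |> filter(fn: (r) => r._field == "batch_stop")
    |> filter(fn: (r) => r.srcOrg == "51478xxxxxxxx")
    |> filter(fn: (r) => r.srcBucket == "home_assistant")
    |> last()
    |> yield(name: "last_batch_stop")
```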

sanderson commented 5 months ago

That won't return anything. Copy the entire task script and run it. It will act the same way as a single task execution.

import "array"
import "experimental"
import "influxdata/influxdb/secrets"

// Configure the task
option task = {every: 5m, name: "Migrate data from InfluxDB Cloud"}

// Configure the migration
migration = {
    start: 2022-01-01T00:00:00Z,
    stop: 2024-03-23T00:00:00Z,
    batchInterval: 30d,
    batchBucket: "migration",
    sourceHost: "https://eu-central-1-1.aws.cloud2.influxdata.com",
    sourceOrg: "5147xxxxxxxxxxxx",
    sourceToken: secrets.get(key: "INFLUXDB_CLOUD_TOKEN"),
    sourceBucket: "home_assistant",
    destinationBucket: "home_assistant_from_cloud",
}

// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
    _lastBatchStop =
        (from(bucket: migration.batchBucket)
            |> range(start: migration.start)
            |> filter(fn: (r) => r._field == "batch_stop")
            |> filter(fn: (r) => r.srcOrg == migration.sourceOrg)
            |> filter(fn: (r) => r.srcBucket == migration.sourceBucket)
            |> last()
            |> findRecord(fn: (key) => true, idx: 0))._value
    _batchStart =
        if exists _lastBatchStop then
            time(v: _lastBatchStop)
        else
            migration.start

    return {start: _batchStart, stop: experimental.addDuration(d: migration.batchInterval, to: _batchStart)}
}

// Define a static record with batch start and stop time properties
batch = {start: batchRange().start, stop: batchRange().stop}

// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
    if batch.start >= migration.stop then
        die(msg: "Batch range is beyond the migration range. Migration is complete.")
    else
        "Migration in progress"

// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
    from(host: migration.sourceHost, org: migration.sourceOrg, token: migration.sourceToken, bucket: migration.sourceBucket)
        |> range(start: batch.start, stop: batch.stop)

// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
    data()
        |> count()
        |> group(columns: ["_start", "_stop"])
        |> sum()

// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])

// metadata returns a stream of tables representing batch metadata.
metadata = () => {
    _input =
        if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
            rowCount
        else
            emptyRange

    return
        _input
            |> map(
                fn: (r) =>
                    ({
                        _time: now(),
                        _measurement: "batches",
                        srcOrg: migration.sourceOrg,
                        srcBucket: migration.sourceBucket,
                        dstBucket: migration.destinationBucket,
                        batch_start: string(v: batch.start),
                        batch_stop: string(v: batch.stop),
                        rows: r._value,
                        percent_complete:
                            float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                    v: int(v: migration.stop) - int(v: migration.start),
                                ) * 100.0,
                    }),
            )
            |> group(columns: ["_measurement", "srcOrg", "srcBucket", "dstBucket"])
}

// Write the queried data to the specified InfluxDB OSS bucket.
data()
    |> to(bucket: migration.destinationBucket)

// Generate and store batch metadata in the migration.batchBucket.
metadata()
    |> experimental.to(bucket: migration.batchBucket)
madchap commented 5 months ago

Got it.

error calling function "die" @51:9-51:86: Batch range is beyond the migration range. Migration is complete.

I subsequently tried other parameters, from a 2024-only range down to a 1d range and various things in between, but the same result is returned.

So I thought of deleting the migration bucket, with all these failed attempts in it, and it seems the task is now getting past that bar above.

After a few runs, it looks at first glance like I've got all my data transferred!

Sorry for the noise and thanks so much for this task!

sanderson commented 5 months ago

No worries. I should probably add a note to the instructions that if you need to reset the migration and rerun it, you need to delete the migration bucket and recreate it. The task reads batch metadata from the migration bucket to determine the status of the migration, so if the migration has already run to completion, each subsequent attempt reads the stored metadata and assumes the migration is already done.
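
(A sketch of what that reset might look like with the v2 `influx` CLI, assuming the metadata bucket is named `migration` as in the template; the org name here is a placeholder:)

```shell
# Drop the migration metadata bucket so the stored batch records are gone.
influx bucket delete --name migration --org my-org

# Recreate it empty; the next task run then starts over from migration.start.
influx bucket create --name migration --org my-org --retention 0
```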