noctarius / timescaledb-event-streamer

timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.

Event streamer tries to read from dropped chunks #206

Open vidosits opened 5 months ago

vidosits commented 5 months ago

It seems to me that the event streamer is trying to read from (and work with) dropped hypertable chunks, which of course it can't find:

CDC (Change Data Capture) for TimescaleDB Hypertable
timescaledb-event-streamer version 0.12.1-dev (git revision unknown; branch unknown)
Loading configuration file: config.toml
[2024/02/07T15:35:33.391] [INFO] [SystemCatalog] Selected tables for replication:  
[2024/02/07T15:35:33.391] [INFO] [SystemCatalog]   * [REDACTED] (type: Hypertable, replica identity: DEFAULT)  
[2024/02/07T15:35:33.391] [INFO] [Replicator] Discovered System Information:  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL version 15.3  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * TimescaleDB version 2.10.3  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL System Identity 7312038712204218402  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL Timeline 13  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL DatabaseName smartcitydb  
[2024/02/07T15:35:33.392] [INFO] [Replicator]   * PostgreSQL Types loaded 597  
[2024/02/07T15:35:33.392] [INFO] [FileStateStorage] Starting FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:33.392] [INFO] [FileStateStorage] Loading FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:35.285] [INFO] [ReplicationConnection] SystemId: 7312038712204218402, Timeline: 13, XLogPos: C0C/DAE6FC28, DatabaseName: [Redacted]  
[2024/02/07T15:35:37.294] [INFO] [ReplicationChannel] Reused replication slot: event_streamer_test  
[2024/02/07T15:35:38.154] [INFO] [FileStateStorage] Stopping FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:38.154] [INFO] [FileStateStorage] Storing FileStateStorage at /tmp/statestorage.dat  
[2024/02/07T15:35:38.154] [INFO] [TaskDispatcher] TaskManager shutting down  
ERROR: relation "_timescaledb_internal._hyper_1_7_chunk" does not exist (SQLSTATE 42P01)

Checking chunks in the TimescaleDB catalog:

select * from _timescaledb_catalog.chunk where hypertable_id = 1;
 id | hypertable_id | schema_name           | table_name        | compressed_chunk_id | dropped | status | osm_chunk
----+---------------+-----------------------+-------------------+---------------------+---------+--------+-----------
  7 |             1 | _timescaledb_internal | _hyper_1_7_chunk  |                     |       1 |      0 |         0
  8 |             1 | _timescaledb_internal | _hyper_1_8_chunk  |                     |       1 |      0 |         0
 10 |             1 | _timescaledb_internal | _hyper_1_10_chunk |                     |       0 |      0 |         0
 11 |             1 | _timescaledb_internal | _hyper_1_11_chunk |                     |       0 |      0 |         0
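
Chunks 7 and 8 are flagged dropped = 1, yet their catalog rows survive (the blank compressed_chunk_id is NULL); TimescaleDB keeps dropped chunks in the catalog instead of deleting the rows when, for example, the hypertable has a continuous aggregate, which the _materialized_hypertable_2 table below suggests is the case here. For reference, a minimal Go sketch that lists only the chunks that still have a backing table (listLiveChunks is a hypothetical helper, not part of the streamer):

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/jackc/pgx/v5"
)

// listLiveChunks is a hypothetical helper: it lists only the chunks of
// a hypertable whose backing tables still exist, i.e. the catalog rows
// with dropped = false.
func listLiveChunks(ctx context.Context, conn *pgx.Conn, hypertableId int32) error {
    rows, err := conn.Query(ctx,
        `SELECT schema_name, table_name
           FROM _timescaledb_catalog.chunk
          WHERE hypertable_id = $1
            AND NOT dropped`,
        hypertableId,
    )
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var schemaName, tableName string
        if err := rows.Scan(&schemaName, &tableName); err != nil {
            return err
        }
        fmt.Printf("%s.%s\n", schemaName, tableName)
    }
    return rows.Err()
}

func main() {
    conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    defer conn.Close(context.Background())

    if err := listLiveChunks(context.Background(), conn, 1); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}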

Checking the tables in the _timescaledb_internal schema:

SELECT table_name FROM information_schema.tables WHERE table_schema='_timescaledb_internal' AND table_type='BASE TABLE' order by table_name asc;
table_name
_hyper_1_10_chunk
_hyper_1_11_chunk
_hyper_2_2_chunk
_hyper_2_9_chunk
_materialized_hypertable_2
bgw_job_stat
bgw_policy_chunk_stats
job_errors

My best idea for fixing this would be to skip chunks that are flagged as dropped in the catalog when collecting them for replication. Does that make sense?

I've used the following patch and it worked, but I'm wondering if this is the right thing to do.

diff --git a/internal/systemcatalog/systemcatalog.go b/internal/systemcatalog/systemcatalog.go
index 6aea5ac..1655449 100644
--- a/internal/systemcatalog/systemcatalog.go
+++ b/internal/systemcatalog/systemcatalog.go
@@ -375,7 +375,9 @@ func (sc *systemCatalog) GetAllChunks() []systemcatalog.SystemEntity {
    chunkTables := make([]systemcatalog.SystemEntity, 0)
    for _, chunk := range sc.chunks {
        if sc.IsHypertableSelectedForReplication(chunk.HypertableId()) {
-           chunkTables = append(chunkTables, chunk)
+           if !chunk.Dropped() {
+               chunkTables = append(chunkTables, chunk)
+           }
        }
    }
    return chunkTables
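
With this guard in place, GetAllChunks only hands out chunks whose backing tables still exist: the dropped entries remain in the in-memory catalog map, they're just never offered to the replication machinery.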
noctarius commented 5 months ago

Hey @vidosits!

Thanks for the report. I think the change is perfectly fine. It'd be awesome if you could send a pull request for it. 🙏 If you feel OK adding a unit test for it, feel free; otherwise I'll do it 👍

vidosits commented 5 months ago

Thanks, I tried writing the test, but I only got about this far before I had to get back to work stuff :(

timescaledb-event-streamer/internal/systemcatalog/systemcatalog_test.go

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package systemcatalog

import (
    "github.com/jackc/pgx/v5"
    "github.com/noctarius/timescaledb-event-streamer/spi/config"
    "github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
    "github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
    "github.com/stretchr/testify/assert"
    "testing"
)

func Test_Dropped_Chunks_Are_Excluded(
    t *testing.T,
) {
    var userConfig pgx.ConnConfig
    var testConfig config.Config

    systemCatalog, err := NewSystemCatalog(&testConfig, &userConfig, nil, nil, nil, nil, nil, nil)
    if err != nil {
        t.Fatalf("Couldn't create system catalog: %+v", err)
    }

    for i := 0; i < 10; i++ {
        // incomplete: construct a chunk with dropped=true and register it
        // with the catalog (the remaining NewChunk arguments are missing)
        systemcatalog.NewChunk( /* ... */ )
    }

    chunks := systemCatalog.GetAllChunks()
    assert.NotNil(t, chunks)
}

func makeHypertable(
    id int32, schemaName, tableName string,
) *systemcatalog.Hypertable {

    return systemcatalog.NewHypertable(
        id,
        schemaName,
        tableName,
        "test",
        "test",
        nil,
        0,
        false,
        nil,
        nil,
        pgtypes.DEFAULT,
    )
}
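
For illustration only, the guard's behavior can also be pinned down without a database; a self-contained sketch with stand-in types (fakeChunk and filterDropped are hypothetical, not the repo's actual types):

package systemcatalog

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

// fakeChunk is a stand-in for the catalog's chunk type; only the
// fields relevant to the guard are modeled.
type fakeChunk struct {
    name    string
    dropped bool
}

// filterDropped mirrors the patched GetAllChunks guard: chunks flagged
// as dropped are skipped.
func filterDropped(chunks []fakeChunk) []fakeChunk {
    kept := make([]fakeChunk, 0, len(chunks))
    for _, c := range chunks {
        if !c.dropped {
            kept = append(kept, c)
        }
    }
    return kept
}

func Test_FilterDropped_Excludes_Dropped_Chunks(t *testing.T) {
    chunks := []fakeChunk{
        {name: "_hyper_1_7_chunk", dropped: true},   // dropped in the catalog
        {name: "_hyper_1_10_chunk", dropped: false}, // still present
    }
    kept := filterDropped(chunks)
    assert.Len(t, kept, 1)
    assert.Equal(t, "_hyper_1_10_chunk", kept[0].name)
}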
noctarius commented 5 months ago

That's fine, I'll add the test. You took "unit test" a bit more strictly than I expected. There are very few real unit tests, since most tests require the database to be available; they're more like integration tests. There's a whole construct around those things.

Anyway, for a bit more understanding: it is possible to mock out quite a few things to retrieve data, but it's going to be complicated and you'd have to mock the whole database access layer. It's easier to just build the test like the existing ones. You will have to start and stop the streamer, and you need to make sure state persistence is working for the test.

But as I said, happy to build the test if you send a PR 👍

vidosits commented 5 months ago

Thanks! I did send #207 .

RE: Tests, you're of course completely right, I just didn't know how strictly you meant unit test :)

I was gonna go with something like this initially:

func (pts *PublicationTestSuite) Test_Dropped_Chunks_Are_Not_Processed() {
    testSink := testsupport.NewEventCollectorSink()

    publicationName := lo.RandomString(10, lo.LowerCaseLettersCharset)

    var tableName string
    chunksNotToBeProcessed := make([]string, 0)

    pts.RunTest(
        func(ctx testrunner.Context) error {
            existingChunks, publishedChunks, err := readAllAndPublishedChunks(ctx, tableName, publicationName)

            if err != nil {
                return err
            }

            // somewhere around here we should be testing if the streamer
            // has picked up dropped chunks or not, e.g. (assuming the
            // chunk lists hold chunk names and assert is imported):
            for _, dropped := range chunksNotToBeProcessed {
                assert.NotContains(pts.T(), existingChunks, dropped)
                assert.NotContains(pts.T(), publishedChunks, dropped)
            }
            return nil
        },

        testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
            _, tn, err := ctx.CreateHypertable("ts", time.Hour,
                testsupport.NewColumn("ts", "timestamptz", false, true, nil),
                testsupport.NewColumn("val", "integer", false, false, nil),
            )
            if err != nil {
                return err
            }
            tableName = tn

            if _, err := ctx.Exec(context.Background(),
                fmt.Sprintf(
                    "INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
                    tableName,
                ),
            ); err != nil {
                return err
            }

            if droppedChunks, err := ctx.Query(context.Background(),
                fmt.Sprintf(
                    "SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)",
                    tableName,
                ),
            ); err != nil {
                return err
            } else {
                for droppedChunks.Next() {
                    var chunkName string
                    if err := droppedChunks.Scan(&chunkName); err != nil {
                        return err
                    }
                    chunksNotToBeProcessed = append(chunksNotToBeProcessed, chunkName)
                }
            }

            ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
            ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
                config.PostgreSQL.Publication.Name = publicationName
            })
            return nil
        }),
    )
}

But then I was like, wait, does this go into the replicator tests or somewhere else? Or maybe I should just test the func directly, or whatever.

Anyways, I'll take a look once you've done it and be more productive next time :)

noctarius commented 5 months ago

That looks like a good start. You'd have to get the replicator started once, so that the publication and everything is set up and the persisted state file knows about the existing chunks. At that point you'd suspend the replicator, which will shut it down, then drop the chunk and resume (or restart) the replicator. That should now fail, due to the state file knowing about a chunk which doesn't exist anymore (or doesn't work anymore).

At least that's my guess about what happens, but maybe that's a second case which may fail 🫣
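
In outline, that flow might look something like this (a sketch only; PauseReplicator and ResumeReplicator are hypothetical names for whatever suspend/resume hooks the test runner actually provides, and tableName comes from the setup as in the earlier sketch):

pts.RunTest(
    func(ctx testrunner.Context) error {
        // the replicator is running here, so the publication and the
        // state file already know about the existing chunks

        // hypothetical hook: shut the replicator down
        if err := ctx.PauseReplicator(); err != nil {
            return err
        }

        // drop a chunk while the replicator is down
        if _, err := ctx.Exec(context.Background(),
            fmt.Sprintf(
                "SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)",
                tableName,
            ),
        ); err != nil {
            return err
        }

        // hypothetical hook: resuming used to fail with SQLSTATE 42P01
        // because the state file still references the dropped chunk;
        // with the patch it should come back up cleanly
        return ctx.ResumeReplicator()
    },
    // setup as in the earlier sketch
)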