twitchscience / kinsumer

Native Go consumer for AWS Kinesis streams.
Other
134 stars 35 forks source link

Error using the same application name in two different streams #35

Closed abhijeetDunzo closed 5 years ago

abhijeetDunzo commented 5 years ago

I created consumers reading from two different streams and I am using the same application-name.

On starting the application I am getting this error intermittently:

InvalidArgumentException: StartingSequenceNumber 49598037874099682602785444260063985364880176723926188034 used in GetShardIterator on shard shardId-000000000000 in stream dummy under account 716421886079 is invalid because it did not come from this stream

StartConsuming gets called from main, which calls consumeStream with same application name but different stream name.

func StartConsuming(conf *config.Config, waitForKinesis *sync.WaitGroup) {

    waitForKinesis.Add(1)
    streamNameA := "logistics-executor"
    go consumeStreamA(streamNameA, conf.Application.Name, waitForKinesis)
    //
    //waitForKinesis.Add(1)
    //streamNameB := "dummy2"
    //go consumeStreamB(streamNameB, conf.Application.Name, waitForKinesis)
}
func consumeStreamB(streamName, applicationName string, waitForKinesis *sync.WaitGroup) {
    recordsB := make(chan []byte)
    k, wg := kinesis.Init(recordsB, streamName, applicationName, false)
    readFromStreamB(recordsB)
    logger.Log.Infof("Stopping K from stream B")
    k.Stop()

    logger.Log.Infof("waiting on wg from stream B")
    wg.Wait()
    logger.Log.Infof("done on wg from stream B")
    waitForKinesis.Done()

}

func readFromStreamB(records chan []byte) {
    logger.Log.Infof("Reading inside ReadFromStream B")
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT)

    for {
        //time.Sleep(2 * time.Second)
        select {
        case <-sigc:
            logger.Log.Infof("sigc readFromStreamB")
            return

        case record := <-records:
            logger.Log.Infof("ReadFromStream B %v\n", string(record))
            //logger.Log.Infof("perform business logic here B !! ")
        }
    }
}
garethlewin commented 5 years ago

Kinsumer isn't designed to handle two streams with the same application name. There is an issue #34 which when implemented might allow this but for now, the dynamo tables names are derived from just the application name, so two different streams would collide.

I recommend using different application names for now.

kunalkapadia commented 5 years ago

Thanks for the quick revert, @GarethLewin!