spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0
331 stars 302 forks source link

One of stream is not working when QueryableStoreRegistry autowired in more than one classes with same project #366

Closed hanrw closed 6 years ago

hanrw commented 6 years ago

I have two classes UserRegisterStoreController and GameTransactionStoreController in the same project But only one can got data random from queryableStoreRegistry and got exception Seems that QueryableStoreRegistry could bind one instance?

@RequestMapping("/game/transaction/store/")
public class GameTransactionStoreController {
  @Autowired private QueryableStoreRegistry queryableStoreRegistry;
  DateTimeFormatter formatter = DateTimeFormat.forPattern("MM-dd-yyyy-HH:mm");

  @GetMapping(path = "/{key}", produces = "application/json")
  public List games(@PathVariable String key) {
    List<PeriodGameTransaction> list = new ArrayList<>();
    WindowStoreIterator<PeriodGameTransactionStore> iterator =
        queryableStoreRegistry
            .getQueryableStoreType(
                GameTransactionAnalyticsBinding.GAME_TRANSACTION_1_MINS_WINDOW_MV,
                QueryableStoreTypes.<String, PeriodGameTransactionStore>windowStore())
            .fetch(key, 0, System.currentTimeMillis());
    while (iterator.hasNext()) {
      KeyValue<Long, PeriodGameTransactionStore> next = iterator.next();
      long windowTimestamp = next.key;
      PeriodGameTransactionStore periodGameTransactionStore = next.value;
      PeriodGameTransaction periodGameTransaction =
          transform(windowTimestamp, periodGameTransactionStore);
      log.info(
          "'game transaction' @ time "
              + LocalDateTime.fromDateFields(new Date(windowTimestamp)).toString(formatter)
              + " CashBet is "
              + periodGameTransaction);
      list.add(periodGameTransaction);
    }
    return list;
  }

@RestController
@RequestMapping("/user/count/")
public class UserRegisterStoreController {
  @Autowired private QueryableStoreRegistry queryableStoreRegistry;

  @GetMapping("/{date}")
  public Long count(@PathVariable String date) {
    return (Long)
        queryableStoreRegistry
            .getQueryableStoreType(
                UserRegisterAnalyticsBinding.USER_REGISTER_COUNT_MV,
                QueryableStoreTypes.keyValueStore())
            .get(date);
  }

Exception

2018-04-21 06:52:57 [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-04-21 06:52:57 [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [user-ccd841e8-c91b-4f8a-a76c-098292b129fb]State transition from REBALANCING to RUNNING
2018-04-21 06:53:49 [reactor-http-nio-2] WARN  o.s.b.a.w.r.e.DefaultErrorWebExceptionHandler - Failed to handle request [GET http://localhost:8082/user/count/game/2018-04-19]: Response status 404
2018-04-21 06:53:53 [reactor-http-nio-2] ERROR o.s.b.a.w.r.e.DefaultErrorWebExceptionHandler - Failed to handle request [GET http://localhost:8082/user/count/2018-04-19]
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, urcmv, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
    at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
    at com.finnplay.streams.api.UserRegisterStoreController.count(UserRegisterStoreController.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.refle

btw if the classes are in different projects, no exceptions and it works.

Reproduce codes on https://github.com/MadeInChina/spring-cloud-streams-kafka-streams-example

hanrw commented 6 years ago

The issue with here. If we have two streams A stream store name a-store B stream store name b-store

We are going to query b-store but we got A stream in the loop but A-stream.store(b-store, storeType) not return null

public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {

        for (KafkaStreams kafkaStream : kafkaStreams) {
            T store = kafkaStream.store(storeName, storeType);
            if (store != null) {
                return store;
            }
        }
        return null;
    }

instead of thow InvalidStateStoreException in QueryableStoreProvider

        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        }