spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
997 stars 607 forks source link

One of stream is not working when QueryableStoreRegistry autowired in more than one classes with same project #1362

Closed hanrw closed 6 years ago

hanrw commented 6 years ago

I have two classes UserRegisterStoreController and GameTransactionStoreController in the same project But only one can got data random from queryableStoreRegistry and got exception Seems that QueryableStoreRegistry could bind one instance?

@RequestMapping("/game/transaction/store/")
public class GameTransactionStoreController {
  @Autowired private QueryableStoreRegistry queryableStoreRegistry;
  DateTimeFormatter formatter = DateTimeFormat.forPattern("MM-dd-yyyy-HH:mm");

  @GetMapping(path = "/{key}", produces = "application/json")
  public List games(@PathVariable String key) {
    List<PeriodGameTransaction> list = new ArrayList<>();
    WindowStoreIterator<PeriodGameTransactionStore> iterator =
        queryableStoreRegistry
            .getQueryableStoreType(
                GameTransactionAnalyticsBinding.GAME_TRANSACTION_1_MINS_WINDOW_MV,
                QueryableStoreTypes.<String, PeriodGameTransactionStore>windowStore())
            .fetch(key, 0, System.currentTimeMillis());
    while (iterator.hasNext()) {
      KeyValue<Long, PeriodGameTransactionStore> next = iterator.next();
      long windowTimestamp = next.key;
      PeriodGameTransactionStore periodGameTransactionStore = next.value;
      PeriodGameTransaction periodGameTransaction =
          transform(windowTimestamp, periodGameTransactionStore);
      log.info(
          "'game transaction' @ time "
              + LocalDateTime.fromDateFields(new Date(windowTimestamp)).toString(formatter)
              + " CashBet is "
              + periodGameTransaction);
      list.add(periodGameTransaction);
    }
    return list;
  }

@RestController
@RequestMapping("/user/count/")
public class UserRegisterStoreController {
  @Autowired private QueryableStoreRegistry queryableStoreRegistry;

  @GetMapping("/{date}")
  public Long count(@PathVariable String date) {
    return (Long)
        queryableStoreRegistry
            .getQueryableStoreType(
                UserRegisterAnalyticsBinding.USER_REGISTER_COUNT_MV,
                QueryableStoreTypes.keyValueStore())
            .get(date);
  }

Exception

2018-04-21 06:52:57 [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-04-21 06:52:57 [user-ccd841e8-c91b-4f8a-a76c-098292b129fb-StreamThread-1] INFO  o.apache.kafka.streams.KafkaStreams - stream-client [user-ccd841e8-c91b-4f8a-a76c-098292b129fb]State transition from REBALANCING to RUNNING
2018-04-21 06:53:49 [reactor-http-nio-2] WARN  o.s.b.a.w.r.e.DefaultErrorWebExceptionHandler - Failed to handle request [GET http://localhost:8082/user/count/game/2018-04-19]: Response status 404
2018-04-21 06:53:53 [reactor-http-nio-2] ERROR o.s.b.a.w.r.e.DefaultErrorWebExceptionHandler - Failed to handle request [GET http://localhost:8082/user/count/2018-04-19]
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, urcmv, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
    at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
    at com.finnplay.streams.api.UserRegisterStoreController.count(UserRegisterStoreController.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.refle

btw if the classes are in different projects, no exceptions and it works.

Reproduce codes on https://github.com/MadeInChina/spring-cloud-streams-kafka-streams-example

hanrw commented 6 years ago

The issue with here. If we have two streams A stream store name a-store B stream store name b-store

We are going to query b-store but we got A stream in the loop but A-stream.store(b-store, storeType) will never return null instead of thow InvalidStateStoreException in QueryableStoreProvider

        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        }

so we’d better to catch the InvalidStateStoreException and continue the loop.

public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {

        for (KafkaStreams kafkaStream : kafkaStreams) {
            T store = kafkaStream.store(storeName, storeType);
            if (store != null) {
                return store;
            }
        }
        return null;
    }

 public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
        if (!globalStore.isEmpty()) {
            return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
        }
        final List<T> allStores = new ArrayList<>();
        for (StateStoreProvider storeProvider : storeProviders) {
            allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
        }
        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        return queryableStoreType.create(
                new WrappingStoreProvider(storeProviders),
                storeName);
    }
hanrw commented 6 years ago

move to https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366