msemys / esjc

EventStore Java Client
MIT License
108 stars 27 forks source link

Read _all_ events in stream? #23

Closed qwelyt closed 7 years ago

qwelyt commented 7 years ago

Using this, how do I actually read all the events from a stream and re-hydrate them into POJO-events? From what I can tell I can only read 4096 events for each stream, which is not that much for some streams.

We store our events as JSON, and the only way I see how to get the data back is to do some mapping like

List<Event> events = eventStore.readStreamEventsForward(flatStream, 0, 4096, false)
                      .join()
                      .events
                      .stream()
                      .map(e1 -> new String(e1.event.data))
                      .<Optional<Event>>map(s -> {
                         try {
                            // [...] Create objectmapper and get classname and payload
                            final JavaType type = objectMapper.constructType(Class.forName(className));
                            return Optional.of(objectMapper.readValue(payload, type));
                         } catch (Exception e) {
                            e.printStackTrace();
                         }
                         return Optional.empty();
                      })
                      .filter(Optional::isPresent)
                      .map(Optional::get)
                      .collect(Collectors.toList())

Is there an easier way?

msemys commented 7 years ago

StreamEventsSlice currentSlice; int nextSliceStart = StreamPosition.START;

do { currentSlice = eventStore.readStreamEventsForward(flatStream, nextSliceStart, 4096, false).join(); nextSliceStart = currentSlice.nextEventNumber; streamEvents.addAll(currentSlice.events); } while (!currentSlice.isEndOfStream);

qwelyt commented 7 years ago

If that gives you all the events that has ever been put in the stream I must be misunderstanding what 4096 means. Just going by the methods parameter name maxCount. To me this reads as "read this many events". What is the actual meaning?

msemys commented 7 years ago

this means, read maxCount events from the specified position

msemys commented 7 years ago

e.g. if you have 6000 events in your stream, to read all the events in it, you could do it in such ways:

int BATCH_SIZE = 1000;
eventStore.readStreamEventsForward("stream", 0, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));
eventStore.readStreamEventsForward("stream", 1000, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));
eventStore.readStreamEventsForward("stream", 2000, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));
eventStore.readStreamEventsForward("stream", 3000, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));
eventStore.readStreamEventsForward("stream", 4000, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));
eventStore.readStreamEventsForward("stream", 5000, BATCH_SIZE, false).join().events.forEach(e -> System.out.println(e));

or iterate

eventStore.iterateStreamEventsForward("stream", 0, BATCH_SIZE, false).forEachRemaining(e -> System.out.println(e));

or use java8 stream api

eventStore.streamEventsForward("stream", 0, BATCH_SIZE, false).forEach(e -> System.out.println(e));