WW-Digital / reactive-kinesis

A Scala & Akka based implementation for working with Amazon Kinesis Streams
Apache License 2.0

Fix #25 by providing an Akka Stream Source graph stage for Kinesis. #39

Closed: aquamatthias closed this 6 years ago

aquamatthias commented 6 years ago

The initial PR #36 has been closed in favour of an automated integration testing setup.

This graph stage provides an Akka Streams Source built on the actor model used by the reactive-kinesis consumer.

If a consumer is configured, an Akka Streams Source can be obtained with a simple Kinesis.source("consumer-name"). Every message that flows through the stream must be committed explicitly.
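To make the explicit-commit contract concrete, here is a toy model in plain Scala. It is not the reactive-kinesis API: ToyConsumer and Committable are hypothetical names, and the real element type and commit call of Kinesis.source may look different. Each emitted element carries a commit callback, and anything that has been pulled but never committed stays in the uncommitted set.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not the reactive-kinesis API.
// A payload paired with the callback that marks it as processed.
final case class Committable[A](payload: A, commit: () => Unit)

final class ToyConsumer(records: Seq[String]) {
  // Sequence numbers emitted downstream but not yet committed.
  private val pending = mutable.Set.empty[Long]

  // Lazily emits each record; a record becomes pending only when it is pulled.
  def source: Iterator[Committable[String]] =
    records.iterator.zipWithIndex.map { case (record, i) =>
      val seq = i.toLong
      pending += seq
      Committable(record, () => pending -= seq)
    }

  // Snapshot of everything pulled but never committed.
  def uncommitted: Set[Long] = pending.toSet
}
```

For example, pulling all of Seq("a", "b", "c") and committing everything except "b" leaves sequence number 1 in uncommitted, which is the kind of state a forgotten commit leaves behind in this model.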

I added an integration test that relies on localstack running on the same host (the localstack startup logic is not part of this PR).

Steps to run the test:

1) Set this env var, since localstack does not support CBOR: export AWS_CBOR_DISABLE=true
2) Start localstack in its default configuration (Kinesis on port 4568, DynamoDB on port 4569). I use docker-compose: docker-compose -f localstack.yml up
3) Run sbt it:test

Coming up with a good integration test was not easy, since a Kinesis Source usually never completes. To keep the test finite, I use Flow.take, which completes the stream after a defined number of elements. The interplay of the different readers/shard consumers may differ between runs, but the test expectation should still be met.
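Why a take makes an endless source finite, and why the assertion should not depend on ordering, can be shown with a plain-Scala analogy. This sketch uses standard-library iterators rather than Akka Streams; TakeDemo, shard, merged and meetsExpectation are hypothetical names, and picking a random shard on every pull is only a stand-in for however the real shard consumers happen to be scheduled.

```scala
import scala.util.Random

// Plain-Scala analogy (not Akka Streams): endless shard iterators merged in a
// random order, made finite with take, like Flow.take in the integration test.
object TakeDemo {
  // An endless stream of records from one shard: shard-<id>-0, shard-<id>-1, ...
  def shard(id: Int): Iterator[String] =
    Iterator.from(0).map(n => s"shard-$id-$n")

  // Each pull picks a random shard, so the interleaving differs between seeds,
  // much like shard consumers being scheduled differently between runs.
  def merged(shardCount: Int, rnd: Random): Iterator[String] = {
    val shards = Vector.tabulate(shardCount)(id => shard(id))
    Iterator.continually(shards(rnd.nextInt(shardCount)).next())
  }

  // Order-independent check: it asserts on what arrived, not in which order.
  def meetsExpectation(records: Seq[String], expectedCount: Int): Boolean =
    records.size == expectedCount && records.distinct.size == expectedCount
}
```

With any seed, TakeDemo.merged(3, new Random(seed)).take(10).toList yields exactly 10 distinct records, even though which shards they come from varies from seed to seed.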

@markglh The unit and integration tests passed in several successive runs on my machine.

Let me know what you think.

markglh commented 6 years ago

Thanks @aquamatthias !

Was the Producer Spec failing locally or in CI? That one relies on the initialized localstack image for the setup.

aquamatthias commented 6 years ago

@markglh Locally. I just started localstack with docker-compose up and then ran sbt it:test. What is missing?

markglh commented 6 years ago

Yeah, that should be it really. If you open a shell in the container (docker exec -it container_id /bin/bash) and run awslocal kinesis list-streams, what do you see?

These should exist: https://github.com/WW-Digital/reactive-kinesis/blob/master/localstack/templates/cftemplate.yml

If not: I've seen this before, where the localstack container was cached and therefore didn't initialise. In that case, try deleting the cached container (no need to delete the image).

markglh commented 6 years ago

I'll also update that test to use your method of creating what it needs as part of the test.

aquamatthias commented 6 years ago

Hey @markglh. This is what I see in my local docker setup after starting docker-compose up:

➔ docker exec -it 8f18d6e353fb /bin/bash
bash-4.3# awslocal kinesis list-streams
{
    "StreamNames": []
}

This seems to be the cause of the failing test; it also failed before I made any changes. Something you can look into?

1) Regarding the Travis build: job 201.1 fails, even though sbt finished successfully (https://travis-ci.org/WW-Digital/reactive-kinesis/jobs/314856826). What is missing?

2) I still see timeouts in the CI build ("Timed out processing batch"). I will increase the timeout settings.

markglh commented 6 years ago

That's weird about the Travis build failing!

Locally, did you try clearing the containers?

docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)

To elaborate: this version of localstack uses the healthcheck to initialise the container. However, it seems that if the container already exists (default localstack), it's considered healthy and the custom healthcheck doesn't run. This isn't an issue in CI because the environment is always clean. Switching to creating the streams if they don't exist will fix this in all cases, which I'll do; in the meantime, clearing the containers should work.
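A minimal sketch of the "create the streams if they don't exist" approach, assuming a hypothetical StreamAdmin interface. A real implementation would back it with the Kinesis ListStreams and CreateStream calls against localstack (and, since CreateStream is asynchronous, wait until each new stream is ACTIVE before the tests use it); here an in-memory stand-in is used so that the idempotence can be checked.

```scala
import scala.collection.mutable

// Hypothetical abstraction over the Kinesis admin calls a test setup needs.
trait StreamAdmin {
  def listStreams(): Set[String]
  def createStream(name: String, shardCount: Int): Unit
}

// In-memory stand-in for localstack, used only to exercise ensureStreams.
final class InMemoryAdmin(initial: Set[String]) extends StreamAdmin {
  private val streams = mutable.Set.empty[String] ++= initial
  var createCalls = 0

  def listStreams(): Set[String] = streams.toSet

  def createStream(name: String, shardCount: Int): Unit = {
    // Creating an existing stream fails, as it would against a real endpoint.
    require(!streams.contains(name), s"stream $name already exists")
    createCalls += 1
    streams += name
  }
}

object StreamSetup {
  /** Creates only the missing streams, so repeated test runs (or a cached container) are safe. */
  def ensureStreams(admin: StreamAdmin, names: Seq[String], shardCount: Int): Seq[String] = {
    val existing = admin.listStreams()
    val missing = names.distinct.filterNot(existing.contains)
    missing.foreach(name => admin.createStream(name, shardCount))
    missing
  }
}
```

Running ensureStreams a second time with the same names creates nothing, which is what makes the setup independent of whether the container was freshly initialised.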

aquamatthias commented 6 years ago

Done as suggested. I get some strange logs:

localstack_1     | 2017-12-11T18:08:54:ERROR:localstack.services.generic_proxy: Error forwarding request: HTTPConnectionPool(host='127.0.0.1', port=4559): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6e2352f490>: Failed to establish a new connection: [Errno 111] Connection refused',)) Traceback (most recent call last):
localstack_1     |   File "/opt/code/localstack/localstack/services/generic_proxy.py", line 201, in forward
localstack_1     |     headers=forward_headers)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/api.py", line 112, in post
localstack_1     |     return request('post', url, data=data, json=json, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/api.py", line 58, in request
localstack_1     |     return session.request(method=method, url=url, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/sessions.py", line 508, in request
localstack_1     |     resp = self.send(prep, **send_kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/sessions.py", line 618, in send
localstack_1     |     r = adapter.send(request, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/adapters.py", line 508, in send
localstack_1     |     raise ConnectionError(e, request=request)
localstack_1     | ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=4559): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6e2352f490>: Failed to establish a new connection: [Errno 111] Connection refused',))
localstack_1     | 

But I do see the streams locally:

bash-4.3# awslocal kinesis list-streams
{
    "StreamNames": [
        "KinesisSourceSpec", 
        "int-test-stream-1", 
        "int-test-stream-2", 
        "int-test-stream-3", 
        "int-test-stream-4"
    ]
}

I re-enabled the test, which now passes.

Next step: try to see if we can make Travis happy.

aquamatthias commented 6 years ago

@markglh The sbt side of things is green as far as I can see:

[success] Total time: 90 s, completed Dec 11, 2017 6:19:10 PM
[success] Total time: 101 s, completed Dec 11, 2017 6:19:30 PM

But the build is not successful:

$ docker-compose -f localstack/docker-compose.yml down
Done. Your build exited with 1.

Something you can look into?

markglh commented 6 years ago

Thanks @aquamatthias - Some progress at least :D

I will look into the Travis weirdness!

markglh commented 6 years ago

@aquamatthias I've sorted the Travis issues. It was just formatting, but the structure of the build made it confusing, so I've tweaked it.

I've applied the changes and refactored the integration test setup and the producer spec so that they all create the streams on the fly.

Hopefully I'll do a proper review of the Stream Source implementation tomorrow, so we can finally get this merged in!

The branch lives here: https://github.com/WW-Digital/reactive-kinesis/tree/feature/akka-streams-review - feel free to just rebase that into yours. If you have any concerns, let me know.

I'm not sure whether my tweaks affected anything, but I have seen the same test fail again in Travis. I'll look into that too; I haven't managed to reproduce it locally yet.

coveralls commented 6 years ago

Coverage increased (+0.1%) to 86.923% when pulling 6ed6fa121ccdc2d89d78027d66aa0db4959eb52f on aquamatthias:feature/aquamatthias/akka-streams into 2b258399e07d592be14e911fac8cae4fdea933b6 on WW-Digital:master.

aquamatthias commented 6 years ago

@markglh I cherry-picked your commits on top of this branch; thanks for the improvements. Travis seems to be happy now. Waiting for your review.

coveralls commented 6 years ago

Coverage increased (+1.01%) to 88.101% when pulling 460651a87c6f6fe0ebb574ffea750374cf4f894d on aquamatthias:feature/aquamatthias/akka-streams into b4e85ebed6ecac0371a044b12163e16d71147eb8 on WW-Digital:master.

coveralls commented 6 years ago

Coverage increased (+2.3%) to 89.421% when pulling e3f58e3b21aa51aeabe95717fe998cf6656684a6 on aquamatthias:feature/aquamatthias/akka-streams into b4e85ebed6ecac0371a044b12163e16d71147eb8 on WW-Digital:master.

aquamatthias commented 6 years ago

@markglh Thanks for the good review comments. I added a section to the Readme: https://github.com/aquamatthias/reactive-kinesis/tree/feature/aquamatthias/akka-streams#akka-stream-source. Since I added a unit test for the source graph stage, coveralls seems to be happy too. Let me know what you think.

coveralls commented 6 years ago

Coverage increased (+1.3%) to 88.413% when pulling 357efd39bd98018aa6564db5a1854bd5aa0e990c on aquamatthias:feature/aquamatthias/akka-streams into b4e85ebed6ecac0371a044b12163e16d71147eb8 on WW-Digital:master.

markglh commented 6 years ago

That's great @aquamatthias - the changes make sense too, thanks again for this!!