hjr3 / weldr

A HTTP 1.1 proxy written in Rust using tokio.
Apache License 2.0
217 stars 20 forks source link

WIP - Streaming body #37

Closed hjr3 closed 7 years ago

hjr3 commented 7 years ago

@yanns here is where i am at with the streaming body. i am working through trying to figure out how tokio proto wants us to handle this. Currently stuck:

DEBUG:tokio_core::reactor: adding a new I/O source
INFO:alacrity::proxy: Listening on: 127.0.0.1:8080
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: scheduling direction for: 0
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 2, nanos: 100937434 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987127534612 }
TRACE:tokio_core::reactor: event Ready {Readable} Token(2)
DEBUG:tokio_core::reactor: notifying a task handle
DEBUG:tokio_core::reactor: loop process - 1 events, Duration { secs: 0, nanos: 60393 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 187559 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987127823219 }
TRACE:tokio_core::reactor: event Ready {Readable} Token(1)
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: adding a new I/O source
DEBUG:alacrity::proxy: Incoming connection on 127.0.0.1:57282
TRACE:alacrity::framed: Creating new ProxyFramed transport
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: scheduling direction for: 0
DEBUG:tokio_core::reactor: loop process - 1 events, Duration { secs: 0, nanos: 345957 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 198722 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987128404431 }
TRACE:tokio_core::reactor: event Ready {Readable | Writable} Token(4)
TRACE:tokio_core::reactor: event Ready {Readable} Token(5)
TRACE:tokio_proto::pipeline::pipeline: Pipeline::tick
TRACE:alacrity::framed: framed transport flushed
TRACE:alacrity::framed: Reading from upstream
TRACE:alacrity::framed: Read 84 bytes
TRACE:alacrity::frontend: Attempting to parse bytes into HTTP Request
DEBUG:alacrity::frontend: Parser created: Request { head: RequestHead { method: "GET", uri: "/large", version: Http11, headers: [Header { name: "Host", value: "www.example.com" }, Header { name: "User-Agent", value: "curl/7.49.1" }, Header { name: "Accept", value: "*/*" }] }, body: [] }
TRACE:tokio_proto::pipeline::pipeline: process_out_frame
TRACE:tokio_proto::pipeline::pipeline: read out message
DEBUG:alacrity::pool::inner: Pool is cloaning (hehe) out 127.0.0.1:12345
DEBUG:alacrity::proxy: Starting backend request to V4(127.0.0.1:12345)
DEBUG:tokio_core::reactor: adding a new I/O source
TRACE:tokio_proto::sender: Sender::send
TRACE:tokio_proto::sender: Sender::poll_ready
TRACE:tokio_proto::sender:    --> polling future sender
TRACE:tokio_proto::sender:    --> ready
TRACE:alacrity::framed: Reading from upstream
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: scheduling direction for: 1
TRACE:tokio_proto::pipeline::pipeline: write_in_frames
TRACE:tokio_proto::pipeline::pipeline: write_in_body
DEBUG:tokio_proto::pipeline::pipeline: write in body done
TRACE:alacrity::framed: framed transport flushed
TRACE:tokio_core::reactor: event Ready {Readable} Token(3)
DEBUG:tokio_core::reactor: loop process - 3 events, Duration { secs: 0, nanos: 918081 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 208352 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987129573810 }
TRACE:tokio_core::reactor: event Ready {Writable} Token(6)
TRACE:tokio_core::reactor: event Ready {Readable} Token(7)
TRACE:alacrity::framed: Creating new ProxyFramed transport
TRACE:tokio_proto::pipeline::pipeline: Pipeline::tick
TRACE:alacrity::framed: framed transport flushed
TRACE:alacrity::framed: Reading from upstream
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: scheduling direction for: 2
TRACE:tokio_proto::pipeline::pipeline: write_in_frames
TRACE:tokio_proto::pipeline::pipeline: write_in_body
DEBUG:tokio_proto::pipeline::pipeline: write in body done
TRACE:tokio_proto::pipeline::client: Dispatch::poll
TRACE:tokio_proto::pipeline::client:    --> received request
TRACE:tokio_proto::pipeline::pipeline:    --> got message
TRACE:tokio_proto::pipeline::pipeline: write_in_message
TRACE:tokio_proto::pipeline::pipeline: got in_flight value without body
TRACE:alacrity::backend: Serializing message frame: Message { message: Request { head: RequestHead { method: "GET", uri: "/large", version: Http11, headers: [Header { name: "Host", value: "www.example.com" }, Header { name: "User-Agent", value: "curl/7.49.1" }, Header { name: "Accept", value: "*/*" }] }, body: [] }, body: false }
TRACE:alacrity::backend: Computed message to send to backend:
GET /large HTTP/1.1
Host: www.example.com
User-Agent: curl/7.49.1
Accept: */*

TRACE:alacrity::backend: Trying to write 84 bytes from request head
TRACE:alacrity::backend: Copied 84 bytes from request head
TRACE:alacrity::framed: writing; remaining=84
TRACE:alacrity::framed: Wrote 84 bytes
TRACE:alacrity::framed: framed transport flushed
DEBUG:tokio_core::reactor: loop process - 2 events, Duration { secs: 0, nanos: 437913 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 291121 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987130339360 }
TRACE:tokio_core::reactor: event Ready {Writable} Token(6)
DEBUG:tokio_core::reactor: loop process - 1 events, Duration { secs: 0, nanos: 19854 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1176229 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987131584665 }
TRACE:tokio_core::reactor: event Ready {Readable} Token(6)
DEBUG:tokio_core::reactor: notifying a task handle
DEBUG:tokio_core::reactor: loop process - 1 events, Duration { secs: 0, nanos: 53777 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 198511 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987131873681 }
TRACE:tokio_core::reactor: event Ready {Readable} Token(6)
TRACE:tokio_core::reactor: event Ready {Readable} Token(7)
TRACE:tokio_proto::pipeline::pipeline: Pipeline::tick
TRACE:alacrity::framed: framed transport flushed
TRACE:alacrity::framed: Reading from upstream
TRACE:alacrity::framed: Read 8192 bytes
TRACE:alacrity::backend: Attempting to parse [(omit)] into HTTP Response
DEBUG:alacrity::http::parser: Parsed body with 1486296 bytes remaining
DEBUG:alacrity::backend: Parser created: Response { head: ResponseHead { version: Http11, status: 200, reason: "OK", headers: [Header { name: "Server", value: "rustful" }, Header { name: "Content-Type", value: "text/plain" }, Header { name: "Connection", value: "close" }, Header { name: "Content-Length", value: "1494345" }, Header { name: "Date", value: "Wed, 30 Nov 2016 17:33:19 GMT" }] }, body: [Chunk([(omit))] }
TRACE:tokio_proto::pipeline::pipeline: process_out_frame
TRACE:tokio_proto::pipeline::pipeline: read out message with body
DEBUG:tokio_proto::pipeline::pipeline: ready for a send
TRACE:alacrity::framed: Reading from upstream
TRACE:alacrity::framed: Read 0 bytes
DEBUG:alacrity::backend: Buffer length is empty. Done?
TRACE:tokio_proto::pipeline::pipeline: process_out_frame
TRACE:tokio_proto::pipeline::pipeline: read Frame::Done
TRACE:tokio_proto::pipeline::pipeline: write_in_frames
TRACE:alacrity::framed: framed transport flushed
DEBUG:tokio_core::reactor: loop process - 2 events, Duration { secs: 0, nanos: 22679512 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 212742 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987154810757 }
TRACE:tokio_core::reactor: event Ready {Readable} Token(0)
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: dropping I/O source: 2
TRACE:tokio_core::reactor: event Ready {Readable} Token(5)
TRACE:tokio_proto::pipeline::pipeline: Pipeline::tick
TRACE:alacrity::framed: framed transport flushed
TRACE:alacrity::framed: Reading from upstream
DEBUG:tokio_core::reactor: consuming notification queue
DEBUG:tokio_core::reactor: scheduling direction for: 1
TRACE:tokio_proto::pipeline::pipeline: write_in_frames
TRACE:tokio_proto::pipeline::pipeline: write_in_body
DEBUG:tokio_proto::pipeline::pipeline: write in body done
TRACE:tokio_proto::pipeline::pipeline:    --> got message
TRACE:tokio_proto::pipeline::pipeline: write_in_message
TRACE:tokio_proto::pipeline::pipeline: got in_flight value with body
TRACE:alacrity::frontend: Serializing message frame: Message { message: Response { head: ResponseHead { version: Http11, status: 200, reason: "OK", headers: [Header { name: "Server", value: "rustful" }, Header { name: "Content-Type", value: "text/plain" }, Header { name: "Connection", value: "close" }, Header { name: "Content-Length", value: "1494345" }, Header { name: "Date", value: "Wed, 30 Nov 2016 17:33:19 GMT" }] }, body: [Chunk([(omit)])] }, body: true }
TRACE:alacrity::frontend: Trying to write 143 bytes from response head
TRACE:alacrity::frontend: Copied 143 bytes from response head
TRACE:alacrity::frontend: Trying to write 8049 bytes from response chunk
TRACE:alacrity::frontend: Copied 8049 bytes from response chunk
TRACE:tokio_proto::pipeline::pipeline: write_in_body
DEBUG:alacrity::framed: Buffer is full, trying to flush
TRACE:alacrity::framed: writing; remaining=8192
TRACE:alacrity::framed: Wrote 8192 bytes
TRACE:alacrity::framed: framed transport flushed
TRACE:alacrity::frontend: Serializing message frame: Body { chunk: None }
DEBUG:tokio_proto::pipeline::pipeline: write in body done
TRACE:alacrity::framed: framed transport flushed
DEBUG:tokio_core::reactor: loop process - 2 events, Duration { secs: 0, nanos: 5347757 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 199615 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987160397660 }
DEBUG:tokio_core::reactor: loop process - 0 events, Duration { secs: 0, nanos: 2844 }
DEBUG:tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 187569 }
DEBUG:tokio_core::reactor: loop time - Instant { t: 248987160624362 }
TRACE:tokio_core::reactor: event Ready {Writable} Token(4)
DEBUG:tokio_core::reactor: loop process - 1 events, Duration { secs: 0, nanos: 21135 }
hjr3 commented 7 years ago

Making progress on this, just a bit more code to write.

yanns commented 7 years ago

great news!

hjr3 commented 7 years ago

i am now streaming a 1.4mb response from the backend. i have to clean up some stuff before this is ready for merge. all this work because i thought ByteBuf was acting like a ring buffer. it is not!

yanns commented 7 years ago

I quick check your changes:

wget http://localhost:9001/assets/javascripts/jquery-1.7.1.min.js
--2016-12-05 10:08:35--  http://localhost:9001/assets/javascripts/jquery-1.7.1.min.js
Resolving localhost... ::1, 127.0.0.1
Connecting to localhost|::1|:9001... failed: Connection refused.
Connecting to localhost|127.0.0.1|:9001... connected.
HTTP request sent, awaiting response... No data received.
Retrying.

--2016-12-05 10:08:36--  (try: 2)  http://localhost:9001/assets/javascripts/jquery-1.7.1.min.js
Connecting to localhost|127.0.0.1|:9001... connected.
HTTP request sent, awaiting response... No data received.
Retrying.

--2016-12-05 10:08:38--  (try: 3)  http://localhost:9001/assets/javascripts/jquery-1.7.1.min.js
Connecting to localhost|127.0.0.1|:9001... connected.
HTTP request sent, awaiting response... No data received.
Retrying.

--2016-12-05 10:08:41--  (try: 4)  http://localhost:9001/assets/javascripts/jquery-1.7.1.min.js
Connecting to localhost|127.0.0.1|:9001... connected.
HTTP request sent, awaiting response... No data received.
Retrying.

Are you aware of that? Should I dig further?

hjr3 commented 7 years ago

@yanns not that issue specifically, but i have seen some other problems. down the rabbit hole i continue to go.

hjr3 commented 7 years ago

Closing in favor of https://github.com/hjr3/alacrity/pull/38