flashmob opened 5 years ago
Update: Development ongoing. Got a little side-tracked and developed a streaming parsing library for MIME. (It parses the headers as the input is read in; in the old version, it had to wait until everything was read in. Just thought it would be nicer if it could also produce a MIME tree, so that took a bit longer, but hopefully the effort was worth it! Rewritten it 3 times, lol!) The stream buffer is now configurable and works with buffers of any size, e.g. even 64 bytes is OK. Can't wait to try it out in prod; currently it's only been tested on about 1000 sample emails. It would be good to try it out on a few million. Will post the latest source soon...
Now working on a "chunk saver" - a streaming backend that breaks the emails up into chunks as they get saved. Chunks are split according to the following rule: at the end of the headers, at the end of a MIME part boundary, or when the chunk's size reaches a limit. Chunks can be checked for duplicates, so basic data-deduplication can be baked in.
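A rough sketch of the splitting and dedup rule (all names here are illustrative, not from the actual chunk saver):

```go
package main

import "crypto/sha256"

// shouldSplit implements the rule above: split at the end of the
// headers, at a MIME part boundary, or when the chunk hits a size
// limit.
func shouldSplit(chunkLen int, atHeaderEnd, atPartBoundary bool, limit int) bool {
	return atHeaderEnd || atPartBoundary || chunkLen >= limit
}

// chunkKey hashes a chunk's content so duplicate chunks can be
// detected and stored only once - basic data-deduplication.
func chunkKey(chunk []byte) [sha256.Size]byte {
	return sha256.Sum256(chunk)
}
```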
Just found out that text compresses significantly better if it's not base64 encoded! (Makes sense... base64 encodes every 3 bytes as 4 characters, inflating the size by ~33%, and the encoding obscures byte patterns the compressor could otherwise exploit.)
TODO New configuration layout (see the config sketch after this list):

1. Named backend configurations (not just one).
2. Each named backend configuration can also define its own named processors.
3. Processors define their configuration options as fields of the named processor. This would be useful for:
4. A post-processor: this will get called after completing the message transfer (stream based).
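A hypothetical sketch of what this layout could look like (all key names and processor options below are illustrative, not final):

```json
{
  "backends": {
    "save_to_disk": {
      "stream_save_process": "Header|compress|chunksaver",
      "chunksaver": { "chunk_size": 16384 }
    },
    "debug_only": {
      "stream_save_process": "Header|debug"
    }
  }
}
```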
1 - 3 done. Now working on 4.
Do you need any help?
Hi @pkalemba, thanks for your interest! What kind of help do you offer? Right now, it seems like the bulk of this PR has been completed. Would need some help with testing, reviewing the code, and getting some feedback. Any help is always welcome. Thanks
4 done
Still have a lot of small problems, but once those are fixed, the real testing can start!
MySQL driver taking shape.
@flashmob Are you still planning to finish this PR? I see you have invested quite some time but then the motivation died down? 😄 We need you back.
Problem
The current 'backend' processes emails by buffering the entire message in memory, then applying each Processor using the decorator pattern. This approach works OK - most emails are never over 1MB anyway, and the buffers are recycled after processing, keeping them allocated for the next message (being nice to the garbage collector). However, some things could be done more efficiently - for example, the message could be compressed on-the-fly while it's being saved. This is the main idea of 'streaming'.
Solution
The Go way of streaming is to use the io.Writer and io.Reader interfaces. What if we could take the current decorator pattern that we use for the backends and extend it by making the processors implement the io.Writer interface? 🤔 Yes, we can do just that. A little bit of background first: did you know that the io.Writer way is usually a decorator pattern? When we make an instance of a writer, we usually pass some underlying writer to it, allowing us to wire multiple writers together. Some people call this pattern 'chaining'. Normally, when using io.Writer, if you would like to create a chain, you need to wire the writers manually with a few lines of code. This solution takes it further, by allowing you to wire the Writers by configuration.
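For example, this is how writers are chained by hand with just the standard library; the config-driven wiring described below removes this manual plumbing:

```go
package main

import (
	"compress/gzip"
	"log"
	"os"
)

func main() {
	f, err := os.Create("out.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// gzip.Writer decorates the file writer: bytes written to zw
	// are compressed on the fly, then passed down to f.
	zw := gzip.NewWriter(f)
	defer zw.Close()

	if _, err := zw.Write([]byte("hello, world\n")); err != nil {
		log.Fatal(err)
	}
}
```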
Technical Details
Each Writer is an instance of a StreamDecorator, a struct that implements io.Writer. Additionally, the struct contains two callback functions, Open and Close. Both can be set when the StreamDecorator is being initialized, and they are called back at the start and end of the stream. The Open callback is also used to pass down the *mail.Envelope, which can be used to keep the state for the email currently being processed.
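A minimal sketch of that shape (the field names are assumptions based on this description, not the final API):

```go
// StreamDecorator wraps an io.Writer with lifecycle callbacks.
type StreamDecorator struct {
	io.Writer // the stream processor itself

	// Open is called at the start of the stream; it receives the
	// *mail.Envelope so the processor can keep per-message state.
	Open func(e *mail.Envelope) error

	// Close is called at the end of the stream.
	Close func() error
}
```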
In gateway.go there's a new newStreamStack method that instantiates the StreamDecorator structs and wires them up. A new method, ProcessStream, was added to the Backend interface. A new configuration option was also added to the config file: stream_save_process. The value is a string with the names of each StreamDecorator to chain, delimited by a pipe (|).
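For example (the "backend_config" section name follows the existing config layout; exact placement may differ):

```json
{
  "backend_config": {
    "stream_save_process": "Header|compress|Decompress|debug"
  }
}
```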
This is how the io.Reader is passed from the DATA command down to the backend. The ProcessStream method calls all the Open methods on our writers, and then begins streaming the data using io.Copy. At the end of the stream, it calls Close() on our decorators in the order they were wired.
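A rough sketch of that flow, with assumed signatures (the stack and top fields here are illustrative, not the PR's actual gateway internals):

```go
// ProcessStream: Open everything, stream with io.Copy, then Close
// in the order the decorators were wired.
func (g *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) error {
	for _, d := range g.stack { // pass the envelope to every decorator
		if err := d.Open(e); err != nil {
			return err
		}
	}
	if _, err := io.Copy(g.top, r); err != nil { // g.top: head of the chain
		return err
	}
	for _, d := range g.stack { // close in wiring order
		if err := d.Close(); err != nil {
			return err
		}
	}
	return nil
}
```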
Examples
Perhaps the best way to understand this is to look at some example code.
There are 3 examples of StreamDecorator implementations in the backends dir. You will notice that each of the files contains a function that looks just like the io.Writer interface, without the Write keyword, i.e. StreamProcessWith(func(p []byte) (int, error)). This is an anonymous function which is converted to an io.Writer when it is returned. Here is the code of s_debug.go:
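Sketched below from the description in this thread, assuming the next writer in the chain (sp) is handed in when newStreamStack wires the stack; the PR's actual wiring may differ:

```go
// StreamDebug returns a decorator that dumps each chunk before
// passing it to the next writer in the chain.
func StreamDebug(sp io.Writer) *StreamDecorator {
	sd := &StreamDecorator{}

	// sd.Open does nothing; it's here as a template showing where a
	// processor would set up per-envelope state.
	sd.Open = func(e *mail.Envelope) error { return nil }

	// StreamProcessWith converts this anonymous function - same
	// signature as io.Writer's Write - into an io.Writer.
	sd.Writer = StreamProcessWith(func(p []byte) (int, error) {
		fmt.Print(string(p)) // debug: dump the chunk
		return sp.Write(p)   // then pass it down to the underlying writer
	})
	return sd
}
```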
The most important detail here is that the sp identifier refers to the next io.Writer in the chain. In other words, sp contains a reference to the underlying writer. (The sd.Open statement does nothing; it's just there as an example, to be used as a template.)

In the api_test.go file, there is a test called TestStreamProcessor. The writers are chained with the following config setting:
"stream_save_process": "Header|compress|Decompress|debug"
This means it will call the Write method on Header first, and the data is then passed down to each underlying writer in the stream.

Todo: