square / okio

A modern I/O library for Android, Java, and Kotlin Multiplatform.
https://square.github.io/okio/
Apache License 2.0

Writing a Source to multiple OkHttp requests. #255

Open RoryKelly opened 8 years ago

RoryKelly commented 8 years ago

I'm aware this is a strange requirement, but hear me out. Let's imagine I'm reading binary data from a hardware sensor, like a microphone, so I wrap that in a Source. Then I'm sending this data across the wire via OkHttp.

Source source = AudioSource.getThreeSeconds();
server.sendDataToMicrosoftVoiceRecogniser(source, callback);

Because we want to stream the data when the web request starts, we use the post-streaming recipe from OkHttp.

void sendDataToMicrosoftVoiceRecogniser(final Source source, Callback callback) {
    RequestBody requestBody = new RequestBody() {
        @Override public MediaType contentType() {
            return MediaType.parse("application/octet-stream");
        }

        @Override public void writeTo(final BufferedSink sink) throws IOException {
            // Stream every byte from the source into the request body.
            sink.writeAll(source);
        }
    };

    Request request = new Request.Builder()
        .url("https://api.github.com/markdown/raw")
        .post(requestBody)
        .build();

    // enqueue() is asynchronous and returns void; the response arrives via the callback.
    client.newCall(request).enqueue(callback);
...
}

How can I stream the same data to multiple APIs in parallel? I.e.:

Source source = AudioSource.getSource();

server.sendDataToMicrosoftVoiceRecogniser(source, callback);
server.sendDataToMyServer(source, callback);

The first request consumes the bytes, then the second receives random data. I have managed to get this to work with:

public static Source tee(final Source source, final Buffer copy) {
    return new Source() {
        @Override public long read(final Buffer sink, final long byteCount) throws IOException {
            final long bytesRead = source.read(sink, byteCount);
            if (bytesRead > 0) {
                // Copy only the bytes that were just appended to the sink.
                sink.copyTo(copy, sink.size() - bytesRead, bytesRead);
            }
            return bytesRead;
        }

        @Override public Timeout timeout() {
            return source.timeout();
        }

        @Override public void close() throws IOException {
            source.close();
        }
    };
}

final Source source = AudioSource.getSource();

final Buffer buffer = new Buffer();
final Source speechToText = tee(source, buffer);

server.sendDataToMicrosoftVoiceRecogniser(speechToText, callback);
server.sendDataToMyServer(source, buffer);

This has its problems: if the web requests consume data at different speeds, the buffer can be emptied. I get around this by knowing how much data "server.sendDataToMyServer" expects.

Any ideas on how I could do this? I suppose what I really need is a Mirrored source that buffers the bytes emitted from another Source until they are consumed again.

swankjesse commented 8 years ago

Would this work? It isn’t public API yet! It buffers using a File on the file system to avoid exhausting memory. https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/internal/cache2/Relay.java

RoryKelly commented 8 years ago

I'm having a look at this today. I'll let you know if it works out for me. Thanks for your help.

RoryKelly commented 8 years ago

I went with something slightly different, based on the relay idea but without the disk write. I'm only recording a maximum of 4 seconds of audio, so the memory footprint is very small (ish), and I figured I could use the class below. It only works for one mirror, but that's all I need at the moment.

public class MirroredSource {

    private final Buffer buffer = new Buffer();
    private final Source source;
    private final AtomicBoolean sourceExhausted = new AtomicBoolean();

    public MirroredSource(final Source source) {
        this.source = source;
    }

    public Source original() {
        return new okio.Source() {

            @Override public long read(final Buffer sink, final long byteCount) throws IOException {
                final long bytesRead = source.read(sink, byteCount);
                if (bytesRead > 0) {
                    sink.copyTo(buffer, sink.size() - bytesRead, bytesRead);
                } else {
                    sourceExhausted.set(true);
                }
                return bytesRead;
            }

            @Override public Timeout timeout() {
                return source.timeout();
            }

            @Override public void close() throws IOException {
                source.close();
                sourceExhausted.set(true);
            }
        };
    }

    public Source mirror() {
        return new okio.Source() {

            @Override public long read(final Buffer sink, final long byteCount) throws IOException {
                while (!sourceExhausted.get()){
                    if (buffer.request(byteCount)) {
                        return buffer.read(sink, byteCount);
                    }
                }
                return buffer.read(sink, byteCount);
            }

            @Override public Timeout timeout() {
                return source.timeout();
            }

            @Override public void close() throws IOException { /* */ }
        };
    }
}

I use it like this:

MirroredSource mirroredSource = new MirroredSource(AudioSource.getSource());

server.sendDataToMicrosoftVoiceRecogniser(mirroredSource.original(), callback);
server.sendDataToMyServer(mirroredSource.mirror(), callback);

Seems to work so far.

swankjesse commented 8 years ago

Cool. I'd be careful to defend against the mirror outrunning the original. Looks like the code now might busy loop in that case. Also make sure read() doesn't return more bytes than it should!

swankjesse commented 8 years ago

And one last thing: Buffer is not thread safe, so you need synchronization somewhere.

RoryKelly commented 8 years ago

I figured there were some threading issues somewhere, as I was having trouble reading data. I'll post a gist when I have stable performance.

I'm not sure how read() can return more bytes than it should, but I'll look into it.

Yeah, it does busy loop. I'll avoid that once I get it stable. But deadlines are looming and it's just a tech demo, not production code. Thanks for the pointers.

swankjesse commented 8 years ago

I think you're good. I misread your while loop!

RoryKelly commented 8 years ago

https://gist.github.com/RoryKelly/e34d07b1eb34b289f0f6d00f628a7c47

I added some synchronisation. It's not pretty, but it works for my purpose. Let me know if anyone has any ideas or improvements.

@swankjesse Thanks for your help!

swankjesse commented 8 years ago

Almost perfect. Just this line needs to be inside the synchronized block:

    return buffer.read(sink, byteCount);

swankjesse commented 8 years ago

(putting synchronized outside of while is probably okay)
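
For readers without access to the gist, here is a rough sketch of the three points raised above: block instead of busy looping, keep the read inside the synchronized block, and never return more bytes than were asked for. It is not the code from the gist. To stay dependency-free it uses java.io.InputStream rather than Okio's Source and Buffer, and the names MirroredStream, original() and mirror() are made up for illustration. Like the class earlier in the thread, it assumes a single mirror reader and an unbounded in-memory buffer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Arrays;

/** Hypothetical java.io analogue of MirroredSource; supports one mirror reader. */
final class MirroredStream {
    private final Object lock = new Object();
    private final ArrayDeque<byte[]> chunks = new ArrayDeque<>(); // guarded by lock
    private int headOffset = 0;        // read position in the first chunk; guarded by lock
    private boolean exhausted = false; // guarded by lock
    private final InputStream upstream;

    MirroredStream(InputStream upstream) {
        this.upstream = upstream;
    }

    /** Reads from upstream and records a copy of every byte for the mirror. */
    InputStream original() {
        return new InputStream() {
            @Override public int read() throws IOException {
                byte[] one = new byte[1];
                return read(one, 0, 1) == -1 ? -1 : one[0] & 0xff;
            }

            @Override public int read(byte[] b, int off, int len) throws IOException {
                int n = upstream.read(b, off, len); // may block; done outside the lock
                synchronized (lock) {
                    if (n > 0) {
                        chunks.add(Arrays.copyOfRange(b, off, off + n));
                    } else if (n == -1) {
                        exhausted = true;
                    }
                    lock.notifyAll(); // wake a mirror reader that is waiting for data
                }
                return n;
            }

            @Override public void close() throws IOException {
                try {
                    upstream.close();
                } finally {
                    synchronized (lock) {
                        exhausted = true;
                        lock.notifyAll();
                    }
                }
            }
        };
    }

    /** Replays the recorded bytes, waiting (not spinning) when it outruns the original. */
    InputStream mirror() {
        return new InputStream() {
            @Override public int read() throws IOException {
                byte[] one = new byte[1];
                return read(one, 0, 1) == -1 ? -1 : one[0] & 0xff;
            }

            @Override public int read(byte[] b, int off, int len) throws IOException {
                if (len == 0) return 0;
                synchronized (lock) {
                    while (chunks.isEmpty() && !exhausted) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new InterruptedIOException("interrupted waiting for mirrored bytes");
                        }
                    }
                    if (chunks.isEmpty()) return -1; // original finished and buffer drained
                    byte[] head = chunks.peek();
                    int n = Math.min(len, head.length - headOffset); // never more than len
                    System.arraycopy(head, headOffset, b, off, n);
                    headOffset += n;
                    if (headOffset == head.length) {
                        chunks.poll();
                        headOffset = 0;
                    }
                    return n;
                }
            }
        };
    }
}
```

Usage would mirror the earlier snippet: hand original() to one consumer and mirror() to the other, each on its own thread. One caveat with this sketch: if the upstream read throws, the mirror only wakes up once the original stream is closed, so the consumer of original() should close it in a finally block.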

RoryKelly commented 8 years ago

GitHub should have a "buy me a beer" option. Thanks!