microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

Second query on Streamable causes exception #46

Closed: nick-waterhouse closed this issue 5 years ago.

nick-waterhouse commented 5 years ago

In the following code, I save a Streamable to disk and then restore it so that I can run queries on it. The first query succeeds, but the second one throws an exception.

All I am doing here is issuing the same Count query twice. After some digging, I found that the position of the underlying Stream is at its end after the first query. If you uncomment the lines that save and restore the Position, the second query succeeds.

Is the behaviour I am seeing intended? I suspect it will make Streamables backed by on-disk Streams very difficult to use.

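```csharp
using System;
using System.IO;
using System.Reactive.Linq;       // Observable.Range, Select, Where, Count, ForEachAsync
using Microsoft.StreamProcessing; // Trill: StreamEvent, ToStreamable, ToBinaryStream

// Members of the Program class in the PerformanceTest project.
private const int TotalInputEvents = 1000; // assumed value; the original snippet does not define it

internal struct Payload
{
    public long field1;
    public long field2;
}

public static void Test()
{
    Random random = new Random(Guid.NewGuid().GetHashCode());
    string filePath = @"C:\temp\streamtest.bin";

    // Build a streamable with one point event per input value.
    var dataset = Observable.Range(0, TotalInputEvents)
        .Select(e => StreamEvent.CreatePoint(0, new Payload { field1 = e, field2 = random.Next(100, 250) }))
        .ToStreamable();

    // Serialize the streamable, including its properties, to disk.
    using (var stream = File.Create(filePath))
    {
        dataset.ToBinaryStream(stream, writePropertiesToStream: true);
    }

    using (var stream = File.OpenRead(filePath))
    {
        // Both queries below subscribe to this one streamable, which reads from this one FileStream.
        var streamable = stream.ToStreamable<Payload>(readPropertiesFromStream: true);
        // long position = stream.Position;  // workaround: save the position before the first query

        // First query: succeeds, but leaves stream.Position at the end of the file.
        streamable.ToStreamEventObservable().Where(e => e.IsPoint).Count().ForEachAsync(e => Console.WriteLine(e)).Wait();

        // stream.Seek(position, SeekOrigin.Begin);  // workaround: rewind before the second query

        // Second query: throws (see below) unless the two workaround lines are uncommented.
        streamable.ToStreamEventObservable().Where(e => e.IsPoint).Count().ForEachAsync(e => Console.WriteLine(e)).Wait();
    }
}
```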

```
Unhandled Exception: System.AggregateException: One or more errors occurred. (Invalid integer long in the input stream.) ---> System.Runtime.Serialization.SerializationException: Invalid integer long in the input stream.
   at Microsoft.StreamProcessing.Serializer.BinaryDecoder.DecodeLong() in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Serializer\Encoders\BinaryDecoder.cs:line 88
   at lambda_method(Closure , BinaryDecoder )
   at Microsoft.StreamProcessing.BinaryIngressReader`2.Ingress(IStreamObserver`2 observer) in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Ingress\Binary\BinaryIngressReader.cs:line 68
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at PerformanceTest.Program.Test() in C:\Git\TrillSamples\TrillSamples\PerformanceTest\Program.cs:line 87
   at PerformanceTest.Program.Main(String[] args) in C:\Git\TrillSamples\TrillSamples\PerformanceTest\Program.cs:line 148
```
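The stream-position behaviour reported above can be reproduced without Trill. Below is a minimal C# sketch using only `System.IO`, assuming a `MemoryStream` stands in for the file and a single 8-byte value stands in for the properties header (an assumption, not Trill's actual binary format). `StreamPositionDemo`, `CountLongs` and `Run` are hypothetical names. It shows that a second pass over the same Stream starts where the first pass stopped, and that seeking back to a saved position, as the commented-out lines in the report do, makes the data readable again.

```csharp
using System;
using System.IO;
using System.Text;

// Plain System.IO sketch (no Trill): a Stream has one read cursor shared by every reader.
public static class StreamPositionDemo
{
    // Hypothetical helper: reads 8-byte longs until fewer than 8 bytes remain; returns how many it read.
    public static int CountLongs(Stream stream)
    {
        // leaveOpen: true so disposing the reader does not close the shared stream.
        using (var reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true))
        {
            int count = 0;
            while (stream.Length - stream.Position >= sizeof(long))
            {
                reader.ReadInt64();
                count++;
            }
            return count;
        }
    }

    public static (int First, int Second, int AfterSeek) Run()
    {
        using (var stream = new MemoryStream())
        {
            using (var writer = new BinaryWriter(stream, Encoding.UTF8, leaveOpen: true))
            {
                writer.Write(-1L);                             // stand-in "header" (assumption, not Trill's format)
                for (long i = 0; i < 10; i++) writer.Write(i); // ten 8-byte stand-in "events"
            }

            stream.Position = sizeof(long);   // skip the header, as if it had already been read
            long dataStart = stream.Position; // plays the role of `position` in the report

            int first = CountLongs(stream);     // reads all ten values; the cursor ends at the end
            int second = CountLongs(stream);    // the cursor is already at the end, so nothing is read
            stream.Seek(dataStart, SeekOrigin.Begin);
            int afterSeek = CountLongs(stream); // rewound, so all ten values are read again

            return (first, second, afterSeek);
        }
    }
}
```

Calling `StreamPositionDemo.Run()` returns `(10, 0, 10)`: the second pass sees nothing until the cursor is rewound. Trill's reader reports the same situation as the `SerializationException` above rather than as an empty result.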
peterfreiling-zz commented 5 years ago

Thanks @nick-waterhouse. I will make a change to support multiple subsequent subscriptions from a single BinaryIngressStreamable for streams that support Seek (see #47). Unfortunately, I don't see a way to support multiple concurrent subscriptions with the current Stream API.
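A hypothetical sketch of the constraint described in this reply: several subsequent reads over one seekable Stream can be supported by rewinding to a saved start position, but concurrent reads cannot, because all readers share one `Position`. `RewindableSource` and its `Read` method are illustrative names only; this is not the code from #47 and not part of Trill's API.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical wrapper (not Trill's implementation): allows subsequent reads of one
// seekable Stream by rewinding, and rejects concurrent reads, since they would share Position.
public sealed class RewindableSource
{
    private readonly Stream stream;
    private readonly long start;  // where reading began; only meaningful if the stream can seek
    private int active;           // 1 while a read is in progress, 0 otherwise
    private bool readBefore;      // true once the first read has started

    public RewindableSource(Stream stream)
    {
        this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
        this.start = stream.CanSeek ? stream.Position : 0;
    }

    // Runs one complete read over the stream; later calls rewind to the saved start first.
    public T Read<T>(Func<Stream, T> readAll)
    {
        // Atomically claim the stream; a nested or parallel caller fails here.
        if (Interlocked.CompareExchange(ref this.active, 1, 0) != 0)
            throw new InvalidOperationException("Concurrent reads of one Stream are not supported.");
        try
        {
            if (this.readBefore)
            {
                if (!this.stream.CanSeek)
                    throw new NotSupportedException("The stream cannot seek, so it can only be read once.");
                this.stream.Seek(this.start, SeekOrigin.Begin);
            }
            this.readBefore = true;
            return readAll(this.stream);
        }
        finally
        {
            Volatile.Write(ref this.active, 0); // release the stream for the next subsequent read
        }
    }
}
```

The `Interlocked.CompareExchange` guard makes the "one reader at a time" check atomic, so a second simultaneous caller fails fast with `InvalidOperationException` instead of silently interleaving reads on the shared cursor. The first read starts wherever the stream was when the wrapper was created (for example, just after a header), and only later reads seek back to that point.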