ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.81k stars 361 forks source link

Using resources with Observables #69

Closed bumbleblym closed 8 years ago

bumbleblym commented 8 years ago

How do you use resources with Observables?

For example: given an Observable sequence of file paths, construct a sequence of the contents of the files.

dbrattli commented 8 years ago

You can use the map method to transform input from one type to another.

from rx import Observable

def mapper(filename):
  with open(filename) as f:
    return f.read()

Observable.from_(["filename.txt"]).map(mapper).subscribe(print)
bumbleblym commented 8 years ago

i was thinking of something along the lines of

import csv
from rx import Observable

def mapper(filename):
    with open(filename) as f:
        reader = csv.reader(f)
        return Observable.from_(reader)

Observable.from_(["filename.csv"]).map(mapper).concat_all().subscribe(print)

for when I don't want to read the entire file at once?

ValueError: I/O operation on closed file
dbrattli commented 8 years ago

Ok, now I understand what you are trying to do. This does not look that simple. One way would be to wrap the csv reader in an iterable so we can detect StopIteration and do cleanup ourselves:

import csv
from rx import Observable

class CsvIterator:
    def __init__(self, filename):
        self.f = open(filename)
        self.reader = csv.reader(self.f)
        self.filename = filename

    def __next__(self):
        try:
            return next(self.reader)
        except StopIteration:
            print("closing file: %s" % self.filename)
            self.f.close()
            raise

    def __iter__(self):
        return self

def mapper(filename, i):
    return iter(CsvIterator(filename))

Observable.from_(["filename.csv", "filename2.csv"]).flat_map(mapper).subscribe(lambda x: print(x))

Might work, but still smells a little, since it might be used in programs where the files are not read to the end, and then the files will not be closed properly. Not sure how that can be fixed.

bumbleblym commented 8 years ago

could rx.Observable.using be used for this?

dbrattli commented 8 years ago

Yes, good point. Had forgot about that one. Maybe something this would work? That should be as lazy as you need it.

import csv
from rx import Observable
from rx.disposables import Disposable

def mapper(filename, i):
    f = open(filename)
    return Observable.using(lambda: Disposable(lambda: f.close()), lambda x: Observable.from_(csv.reader(f)))

Observable.from_(["filename.csv", "filename2.csv"]).flat_map(mapper).subscribe(lambda x: print(x))
bumbleblym commented 8 years ago

Thanks for the examples so far! I made some changes, and ran into the same error again:

def to_file(filename):
    f = open(filename)
    return Observable.using(
        lambda: Disposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)

# This works
Observable.from_(["filename.csv", "filename2.csv"]).flat_map(to_file).map(to_reader).subscribe(print_rows)

def to_rows(f):
    return Observable.from_(csv.reader(f))

# ValueError: I/O operation on closed file :(
Observable.from_(["filename.csv", "filename2.csv"]).flat_map(to_file).flat_map(to_rows).subscribe(print)
dbrattli commented 8 years ago

When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.

bumbleblym commented 8 years ago

Thanks for the explanation!

jdonnerstag commented 7 years ago

By using the AnonymousDisposable I got it working with the lasted rxpy version. I understand why the 2nd flat_map is not working. But I'm struggling to extend the example by creating a stream that (lazily) allows to process (map, filter, etc.) the lines from the csv reader.

Something like: Observable.from_(["filename.csv", "filename2.csv"] ).flat_map(to_file ).map(to_reader ). ??? don't know what to do here ).filter(empty_lines ).filter(comment_lines )....

thanks a lot Juergen

lock[bot] commented 5 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.