Reactive-Extensions / RxJS

The Reactive Extensions for JavaScript
http://reactivex.io
Other
19.49k stars 2.1k forks source link

How is my implementation of Observable? #1551

Closed rabbitooops closed 6 years ago

rabbitooops commented 6 years ago
/**
 * Minimal push-based Observable.
 *
 * @param {function(Object): *} subscribe - Producer function. It runs once per
 *     `subscribe` call and receives an observer that always exposes `next`,
 *     `error` and `complete`.
 */
function Observable(subscribe) {
    /**
     * Starts the producer for the given observer.
     *
     * Kept as a per-instance closure that never reads `this`, so it can be
     * detached (`const $subscribe = source.subscribe`) and still be called.
     *
     * @param {Object|Function} [observer] - Object with optional `next`,
     *     `error` and `complete` handlers, or a bare function used as `next`.
     * @returns {*} Whatever the producer function returns.
     */
    this.subscribe = function (observer) {
        // A bare function is shorthand for `{ next: fn }`; a missing observer
        // simply ignores every notification.
        const target = typeof observer === 'function' ? { next: observer } : observer || {}
        // Build a fresh observer instead of patching no-op handlers onto the
        // caller's object, so the argument is never mutated.
        const safeObserver = {}
        for (const key of ['next', 'error', 'complete']) {
            // Calling through `target[key](...)` keeps `this` bound to the
            // caller's observer even when the producer detaches the handler.
            safeObserver[key] = typeof target[key] === 'function'
                ? (...args) => target[key](...args)
                : () => {}
        }
        return subscribe(safeObserver)
    }
}
/**
 * Factory equivalent of `new Observable(subscribe)`.
 *
 * @param {Function} subscribe - Producer function handed to the constructor.
 * @returns {Observable} The newly constructed Observable.
 */
Observable.create = function (subscribe) {
    const observable = new Observable(subscribe)
    return observable
}

/**
 * Returns an Observable that re-emits only the source values for which the
 * predicate returns a truthy result.
 *
 * @param {function(*): *} project - Predicate called with each source value.
 * @returns {Observable} The filtered Observable.
 */
Observable.prototype.filter = function (project) {
    const $subscribe = this.subscribe
    return Observable.create(observer => {
        $subscribe({
            next: value => {
                if (project(value)) {
                    observer.next(value)
                }
            },
            // Forward terminal notifications; without these handlers the
            // source's error/complete would stop at this operator.
            error: err => observer.error(err),
            complete: () => observer.complete()
        })
    })
}

/**
 * Returns an Observable that emits `project(value)` for every source value.
 *
 * @param {function(*): *} project - Transformation applied to each value.
 * @returns {Observable} The mapped Observable.
 */
Observable.prototype.map = function (project) {
    const $subscribe = this.subscribe
    return Observable.create(observer => {
        $subscribe({
            next: value => observer.next(project(value)),
            // Forward terminal notifications; without these handlers the
            // source's error/complete would stop at this operator.
            error: err => observer.error(err),
            complete: () => observer.complete()
        })
    })
}

/**
 * Returns an Observable that re-emits every source value after a fixed delay.
 *
 * @param {number} project - Delay in milliseconds, passed to `setTimeout`.
 * @returns {Observable} The delayed Observable.
 */
Observable.prototype.delay = function (project) {
    const $subscribe = this.subscribe
    return Observable.create(observer => {
        // Set once the source errors so values still waiting on a timer are
        // not delivered after the error notification.
        let failed = false
        $subscribe({
            next: value => {
                setTimeout(() => {
                    if (!failed) {
                        observer.next(value)
                    }
                }, project)
            },
            // Errors are forwarded immediately rather than delayed.
            error: err => {
                failed = true
                observer.error(err)
            },
            // Completion is scheduled with the same delay as the values, so it
            // is queued behind any value that is still pending.
            complete: () => {
                setTimeout(() => {
                    if (!failed) {
                        observer.complete()
                    }
                }, project)
            }
        })
    })
}

// Observable.prototype.do = function (project) {
//     const $subscribe = this.subscribe
//     return Observable.create(observer => {
//         $subscribe({
//             next: value => {
//                 project(value)
//                 observer.next(value)
//             }
//         })
//     })
// }

/**
 * Registers a new operator on `Observable.prototype`.
 *
 * @param {string} name - Method name to install on `Observable.prototype`.
 * @param {function(Object, ...*): Object} createObserver - Called once per
 *     subscription with the downstream observer followed by the arguments
 *     given to the operator. It returns the observer that is subscribed to the
 *     source. Any of `next`, `error` or `complete` it leaves out is passed
 *     straight through to the downstream observer.
 */
function createOperator(name, createObserver) {
    Observable.prototype[name] = function (...params) {
        const $subscribe = this.subscribe
        return Observable.create(observer => {
            const inner = createObserver(observer, ...params) || {}
            // Fill the gaps of a partial observer with pass-through handlers
            // so notifications the operator does not care about still reach
            // the downstream observer instead of being dropped.
            const operatorObserver = {}
            for (const key of ['next', 'error', 'complete']) {
                operatorObserver[key] = (...args) => {
                    const receiver = typeof inner[key] === 'function' ? inner : observer
                    return receiver[key](...args)
                }
            }
            $subscribe(operatorObserver)
        })
    }
}

// `do(cb)`: calls `cb` with every value for its side effect, then passes the
// value on unchanged.
createOperator('do', function (observer, cb) {
    return {
        next: value => {
            cb(value)
            observer.next(value)
        },
        // Forward terminal notifications explicitly so the source's
        // error/complete reach the downstream observer.
        error: err => observer.error(err),
        complete: () => observer.complete()
    }
})

// Demo pipeline: emits 0..19, keeps multiples of 3 and logs them at once,
// re-emits them 2000 ms later and logs them again, then keeps multiples of 6,
// multiplies by 10 and prints 'next 0', 'next 60', 'next 120', 'next 180'.
Observable.create(observer => {
    // The arrow wrapper hands `next` only the value; passing `observer.next`
    // directly to forEach would also pass the index and the array and would
    // call the handler detached from its observer.
    Array.from({ length: 20 }, (_, i) => i).forEach(value => observer.next(value))
})
.filter(x => x % 3 === 0)
.do(console.log)
.delay(2000)
.do(console.log)
.filter(x => x % 6 === 0)
.map(x => x * 10)
.subscribe({
    next: x => console.log('next', x)
})
rabbitooops commented 6 years ago

I find that a lot of people think the concepts behind Rx are hard, so I wrote this implementation for people who are getting started with Rx. However, I'm not sure whether this implementation is correct or whether I have misunderstood something about Rx, so I would like you to point out my mistakes.

paulpdaniels commented 6 years ago

@rabbitmeow have you had a look at https://github.com/staltz/toy-rx?

rabbitooops commented 6 years ago

Oops! I haven't yet; I implemented this Observable myself. It seems my code is very similar to toy-rx. Thank you.