remesh-js / remesh

A CQRS-based DDD framework for large and complex TypeScript/JavaScript applications
https://remesh-js.github.io/remesh/dist/index.html
MIT License
653 stars 40 forks source link

event和effect的使用问题 #87

Closed ScarboroughCoral closed 4 months ago

ScarboroughCoral commented 1 year ago

现在有这样一个业务方法

const downloadId = downloader.startDownload(url, {
 onProgress: () => {},
 onFinish: () => {},
 onFailed: () => {},
})
download.cancelDownload(downloadId)

使用remesh定义如下模型 downloadCommand cancelDownloadCommand downloadStartedEvent progressUpdatedEvent downloadSuccessEvent downloadFailedEvent

按我的理解 download.startDownload发生发生在downloadStartedEvent的effect,如果下载的过程中发起了cancelDownloadCommand,此时downloadId应该在state里面了但是不清楚什么时机保存的,是应该在effect触发一个额外的事件downloadIdGotEvent吗?

github-actions[bot] commented 1 year ago

Thank feedback. We will check it later:-)

Lucifier129 commented 1 year ago

可以将 downloader 封装成 rxjsObservable 内部管理 cancel 逻辑.

大概像下面这样处理:

import { Observable, takeUntil, switchMap } from 'rxjs'

type DownloadState = {
    type: 'progress'
    progress: number
} | {
    type: 'error'
    error: string
} | {
    type: 'success'
    file: Blob
}

const download = (url: string) => {
    return new Observable<DownloadState>(subscriber => {
        const downloadId = downloader.startDownload(url, {
            onProgress: (progress) => {
                subscriber.next({ type: 'progress', progress })
            },
            onFinish: (file: Blob) => {
                subscriber.next({ type: 'success', file })
                subscriber.complete()
            },
            onFailed: (message: string) => {
                subscriber.next({ type: 'error', error: message })
                subscriber.complete()
            },
        })

        const unsubscribe = () => {
            downloader.cancelDownload(downloadId)
        }

        return unsubscribe
    })
}

domain.effect({
    name: 'DownloadEffect',
    impl: ({ fromEvent }) => {
        return fromEvent(DownloadStartedEvent).pipe(
            switchMap(event => {
                // cancel download if user cancels it
                return download(event.url).pipe(
                    takeUntil(fromEvent(DownloadCancelledEvent)),
                )
            }),
            map(state => {
                switch (state.type) {
                    case 'progress':
                        return DownloadProgressEvent(state.progress)
                    case 'error':
                        return DownloadFailedEvent(state.error)
                    case 'success':
                        return DownloadFinishedEvent(state.file)
                }
            })
        )
    }
})
ScarboroughCoral commented 1 year ago

可以将 downloader 封装成 rxjsObservable 内部管理 cancel 逻辑.

大概像下面这样处理:

import { Observable, takeUntil, switchMap } from 'rxjs'

type DownloadState = {
    type: 'progress'
    progress: number
} | {
    type: 'error'
    error: string
} | {
    type: 'success'
    file: Blob
}

const download = (url: string) => {
    return new Observable<DownloadState>(subscriber => {
        const downloadId = downloader.startDownload(url, {
            onProgress: (progress) => {
                subscriber.next({ type: 'progress', progress })
            },
            onFinish: (file: Blob) => {
                subscriber.next({ type: 'success', file })
                subscriber.complete()
            },
            onFailed: (message: string) => {
                subscriber.next({ type: 'error', error: message })
                subscriber.complete()
            },
        })

        const unsubscribe = () => {
            downloader.cancelDownload(downloadId)
        }

        return unsubscribe
    })
}

domain.effect({
    name: 'DownloadEffect',
    impl: ({ fromEvent }) => {
        return fromEvent(DownloadStartedEvent).pipe(
            switchMap(event => {
                // cancel download if user cancels it
                return download(event.url).pipe(
                    takeUntil(fromEvent(DownloadCancelledEvent)),
                )
            }),
            map(state => {
                switch (state.type) {
                    case 'progress':
                        return DownloadProgressEvent(state.progress)
                    case 'error':
                        return DownloadFailedEvent(state.error)
                    case 'success':
                        return DownloadFinishedEvent(state.file)
                }
            })
        )
    }
})

感谢! 太优雅了!

Lucifier129 commented 1 year ago

如果要支持同时下载很多个,可以将 switchMap 替换成 mergeMap,并且根据 DownloadCancelledEvent 携带的 url 进行过滤。以及拓展 DownloadState使之携带 url等数据

大概像下面这样:

domain.effect({
    name: 'DownloadEffect',
    impl: ({ fromEvent }) => {
        return fromEvent(DownloadStartedEvent).pipe(
            // for each download event, create a stream of progress events
            // merge them together instead of switching
            mergeMap(event => {
                // create a stream of cancel events for this download
                const cancel$ = fromEvent(DownloadCancelledEvent).pipe(
                    filter(cancelledEvent => cancelledEvent.url === event.url)
                )
                // cancel download if user cancels it
                return download(event.url).pipe(takeUntil(cancel$))
            }),
            map(state => {
                switch (state.type) {
                    case 'progress':
                        return DownloadProgressEvent(state)
                    case 'error':
                        return DownloadFailedEvent(state)
                    case 'success':
                        return DownloadFinishedEvent(state)
                }
            })
        )
    }
})