Open kittencup opened 8 years ago
在Angular中,我们可以使用Observables作为骨干构建我们应用的数据架构,使用Observables来组织我们的数据被称为响应式编程(Reactive Programming).
但是到底什么是Observables,及什么是响应式编程呢?响应式编程是一种使用异步数据流来工作的方法,Observables就是我们用来实现响应式编程的主要数据结构。但是我承认,这些术语可能不是那么明确。我们在这一章来看看具体的例子,应该更有启发性的。
我想指出的这本书并不会重点介绍响应式编程。还有其他一些很好的资源,可以教你响应式编程的基础知识,你应该阅读。我们在下面列出了一些。
考虑到这一章是如何使用RxJS和Angular,而不是一个面面俱到的介绍RxJS和响应式编程的教程。
在这一章中,我将详细讲解RxJS概念和我们遇到的API解释。但是要知道,如果RxJS对你来任然是新的东西,你可能需要在这里补充内容和其他资源。
在本章中使用Underscore.js
Underscore.js是一个流行的库,它提供了对于Javascript数据结构,类似于Array和Object的操作函数,我们在本章和一堆RxJS使用它,如果你在代码中看到,类似于.map或_.sortBy,需要知道我们使用的是Underscore.js库,你可以在这找到Underscore.js文档
如果你刚开始学习RxJS我建议您先看看这篇文章:
当你熟悉RxJS背后的概念后,这里有一些更多的链接,可以帮助您:
在这一章我将提供RxJS的API文档的链接。RxJS文档有很多很好的例子代码,揭示不同的流是如何工作的。
我必须在Angular 2中使用RxJS?- 不,你肯定不喜欢。Observables只是一个模式,你可以在Angular 2中使用更多的数据模式。
我想给你一个合理的警告:学习RxJS有点令人费解的。但是请相信我,你会得到它的窍门,这是值得的。以下是您可能会发现有关于流有用的一些想法:
在本章,我们开始使用RxJS来构建一个聊天应用,下面是它的截图
通常我们试图在这本书内展示每一行代码。然而,这个聊天应用有许多组成部分,因此在本章中,我们不会有所有的代码。你可以在文件夹/rxjs/chat找到在本章中的聊天应用示例代码。
在这个应用中我们提供了一些机器人可以与你聊天。找到代码,并尝试它:
cd code/rxjs/chat
npm install
npm run go
现在打开浏览器访问http://localhost:8080.
如果这个URL出现问题,请尝试URL : http://localhost:8080/webpack-dev-server/index.html
请注意几件事情:
让我们来看看这个应用是如何构造的。我们有
让我们来一起看看它们:
该页面被分解成三个顶级组件:
这个应用也有3个model
在这个应用中,我们每一个model都有对应的service。service是一个单例对象,该对象扮演2个角色:
例如,UserService:
在更高层次上,应用的数据结构是简单的:
例如,ChatThreads组件监听从ThreadService来的最新主题帖列表,ChatWindow订阅最新的消息列表
在本章的其余部分,我们要去深入了解我们如何实现这一点使用Angular 2和RxJS。我们将通过实现我们的model开始,然后看看我们是如何创建service来管理我们的流,然后最后实现了组件。
让我们先从简单的东西开始,先来看看model
我们的User类很简单。我们有一个id,name,和avatarSrc
export class User {
id: string;
constructor(public name: string,
public avatarSrc: string) {
this.id = uuid();
}
}
注意上面,我们在constructor中使用了一个TypeScript的简写语法,当我们声明
public name: string
,我们会告诉TypeScript, 1.我们要在这个类声明一个公开的name属性,2.当创建一个实例时,将参数分配给这个属性
同样,Thread
也是一个简单的TypeScript类:
export class Thread {
id: string;
lastMessage: Message;
name: string;
avatarSrc: string;
constructor(id?: string,
name?: string,
avatarSrc?: string) {
this.id = id || uuid();
this.name = name;
this.avatarSrc = avatarSrc;
}
}
注意,在我们的Thread类里我们存储了lastMessage的对象。这让我们在主题帖列表中显示最新消息的预览。
Message类也是个简单的TypeScript类,但是在这种情况下,我们使用一个稍微不同的构造函数形式:
export class Message {
id: string;
sentAt: Date;
isRead: boolean;
author: User;
text: string;
thread: Thread;
constructor(obj?: any) {
this.id = obj && obj.id || uuid();
this.isRead = obj && obj.isRead || false;
this.sentAt = obj && obj.sentAt || new Date();
this.author = obj && obj.author || null;
this.text = obj && obj.text || null;
this.thread = obj && obj.thread || null;
}
}
你看到这里的构造函数的模式允许我们使用关键字参数的构造函数来模拟。使用这种模式,我们可以使用任何可用的数据来创建一个新的消息,我们不必担心参数的顺序,例如,我们可以这样做:
let msg1 = new Message();
# or this
let msg2 = new Message({
text: "Hello Nate Murray!"
})
现在,我们已经看到了我们的model,让我们来看看我们的第一个service:UserService。
UserService的重点是提供了一个让我们应用了解当前的用户及如果当前用户有变更。则通知其余的应用。
我们需要做的第一件事是创建一个TypeScript类,使用@Injectable注解后,该类可被注入
# code/rxjs/chat/app/ts/services/UserService.ts
@Injectable()
export class UserService {
当我们创建一些injectable,则意味着在我们的应用中我们可以在其他组件中依赖它。简单的说,依赖注入的两大好处是 1.我们让angular来处理对象的生命周期,2.更容易测试注入的组件。
我们会在依赖注入这章更多的讨论@Injectable,现在在我们的组件中,我们可以作为一个依赖注入它,如下所示:
class MyComponent{
constructor(public userSerice:UserService){
// do something with `userService` here
}
}
接下去 我们创建一个用来管理我们当前用户的流
# code/rxjs/chat/app/ts/services/UserService.ts
currentUser: Rx.Subject<User> = new Rx.BehaviorSubject<User>(null);
在这里有很多事情,让我们把它分解:
如果你还没有接触到过多RxJS,那么你可能不知道什么是Subject或BehaviorSubject。你可以把Subject认为是“读/写”流。
从技术上讲,一个Subject是从Observable和Observer继承的
流的一个相关问题是,由于当前用户是在订阅前被设置的,一个新的订阅会丢失在订阅前设置的内容。 BehaviourSubject则弥补这一点。
BehaviourSubject有一个特殊的属性,它保存的最后一个值。这意味着任何订阅该流都将收到的上一次的值。这对我们很有用处,因为这意味着我们的应用任何地方都可以订阅UserService.currentUser流,并立即知道谁是当前用户。
我们需要一种方法用来只要当前用户的变化(如登录)就发布一个新用户到流。
有2种方法,我们可以公开一个接口来做这个
更新当前用户的最直接的方法是在客户端使用UserService只需简单发布新的用户到流,像这样:
userService.subscribe((newUser) => {
console.log('New User is: ', newUser.name);
})
// => New User is: originalUserName
let u = new User('Nate', 'anImgSrc'); userService.currentUser.onNext(u);
// => New User is: Nate
注意,在这里,我们使用Subject上的onNext方法将一个新的值推到流
这里的好处是我们能够直接使用流现有的API,所以我们不会引入任何新代码或API
我们可以更新当前用户的另一种方法是在UserService创建一个辅助方法 像这样:
public setCurrentUser(newUser: User): void {
this.currentUser.onNext(newUser);
}
你会发现,我们仍然使用currentUser流上的onNext方法,为何这样做呢?
因为这个操作从流上实现解耦到了currentUser上实现,通过在setCurrentUser上包装onNext,我们可以在UserService中改变实现而不破坏我们客户端的代码
在这种情况下,我不太推荐第一种方式,但是,它可以在较大项目的可维护性有很大差异
第三种选择是更新自己暴露出的流(也就是说,我们把一个流放在改变当前用户的方法里)。我们会在下面的MessagesService探索这种模式。
把它们连起来,UserService看起来是这样的
import {Injectable, provide} from 'angular2/core';
import {Subject, BehaviorSubject} from 'rxjs';
import {User} from '../models';
/**
* UserService manages our current user
*/
@Injectable()
export class UserService {
// `currentUser` contains the current user
currentUser:Subject<User> = new BehaviorSubject<User>(null);
public setCurrentUser(newUser:User):void {
this.currentUser.next(newUser);
}
}
export var userServiceInjectables:Array<any> = [
provide(UserService).toClass(UserService)
];
MessagesService 是这个应用的重要组成部分,在我们的应用中,所有的消息都会经过MessagesService。
相比我们的UserService,我们的MessagesService具有更复杂stream,有5个stream来构成我们的MessagesService:3个 数据管理(data management”) stream 和 2个 行为(action) stream
这三个数据管理流是:
newMessages是一个Subject,将发布每一个新的消息,但只发一次。
export class MessagesService {
// a stream that publishes new messages only once
newMessages: Rx.Subject<Message> = new Rx.Subject<Message>();
如果我们愿意,我们可以定义一个辅助方法,添加信息到该流中:
addMessage(message: Message): void {
this.newMessages.onNext(message);
}
它也将有助于一个stream,从不是来自一个特定的用户的主题帖里获取所有的消息,例如,想一想 Echo Bot:
当我们正在实现Echo Bot,我们不希望进入一个无限循环和重复回到此bot的本身的消息。
要实现这一点,我们可以订阅newMessages流,并筛选出 1.属于这个主题帖, 2.不是此BOT写的 所有的消息
你可以这样认为,对于一个给定的主题帖我想要当前用户的消息流
messagesForThreadUser(thread: Thread, user: User): Rx.Observable<Message> {
return this.newMessages
.filter((message: Message) => {
// belongs to this thread
return (message.thread.id === thread.id) &&
// and isn't authored by this user
(message.author.id !== user.id);
});
}
messagesForThreadUser接受一个Thread和一个User,并返回一个新的消息流,这个流过滤掉了所有非当前主题帖和用户的消息,也就是说,是这个主题帖中其他人的消息流
鉴于newMessages发出单个信息,messages则是一个保存所有消息的流。
messages: Rx.Observable<Message[]>;
该 Message[] 类型 同Array
是一样的,另一种一样的写法是, Rx.Observable<Array<Message>>
,当我们定义消息类型为Rx.Observable<Message[]>
时,我们的意思是这个流包含了所有发出的组数(消息),而不是单个消息。
那么,消息是如何被填充的呢?我们需要讨论的update流和一个新的模式:操作stream。
这里的想法:
你可以认为它是这样的:这个操作函数是指,被放在updates流将改变当前的消息列表。一个放在updates数据流上的函数应该接受一个消息列表,然后返回一个消息列表。让我们通过在代码中创建一个接口来正式化这个想法:
interface IMessagesOperation extends Function {
(messages: Message[]): Message[];
}
让我们定义我们的updates流
updates: Rx.Subject<any> = new Rx.Subject<any>();
记住,updates接收操作将应用于我们的消息列表。但是,我们如何作出这样的连接?我们是这样的(在我们的MessagesService的构造函数):
constructor() {
this.messages = this.updates
// watch the updates and accumulate operations on the messages
.scan(initialMessages, (messages: Message[],
operation: IMessagesOperation) => {
return operation(messages); })
这段代码引入了一个新的流功能:scan。如果你熟悉函数式编程,scan很像reduce,为输入流中的每个元素运行该函数,并积累值。scan的特殊之处在于它将为中间每一个结果发出一个值。也就是说,它不会等待流完成后,发射出一个结果,这正是我们想要的。 当我们调用this.updates.scan时,我们创建一个新的流,它将订阅updates流,在循环每一个时,我们将:
并且我们返回一个新的Message[]
有一点需要知道的是流默认不是可共享的,也就是说,如果一个用户从一个流中读取值,这个值就可以被永远地消失。在我们的信息的情况下,我们需要1,在许多订阅之间共享同一个流,2.对于那些迟到的订阅,重播最后个值。
要做到这一点,我们使用shareReplay方法,如下所示:
constructor() {
this.messages = this.updates
// watch the updates and accumulate operations on the messages
.scan(initialMessages, (messages: Message[],
operation: IMessagesOperation) => {
return operation(messages); })
// make sure we can share the most recent list of messages across anyone // who's interested in subscribing and cache the last known list of
// messages
.shareReplay(1);
现在,我们可以添加一条消息到message流如下所示:
var myMessage = new Message(/* params here... */);
updates.onNext( (messages: Message[]): Message[] => { return messages.concat(myMessage);
})
上面,在update流中我们增加一个操作。消息订阅该流并因此将应用该操作,这将将我们的消息合并到原先的累加的消息列表里。
如果这需要几分钟的时间来考虑它的好。如果你不习惯这种编程方式,它可以感觉到一点点外国话。
上述方法的一个问题是,它的使用有点啰嗦。不需要每次都写内部函数就好了。我们可以做类似的事情:
addMessage(newMessage: Message) {
updates.onNext( (messages: Message[]): Message[] => {
return messages.concat(newMessage);
})
}
// somewhere else
var myMessage = new Message(/* params here... */);
MessagesService.addMessage(myMessage);
这是一个更好的方式,但它不是“反应式的方式”. 在某种程度上,因为创建消息的这个动作是不可与其他流组合的。(另外这个方法绕过我们的newMessages流。更在它之后。)
创建一个新的消息的反应式的方式是,有一个接受消息添加到该列表中的流。同样,这可能是一个有点新的,如果你不习惯这样的想法。这里是你如何实现它:
首先,我们创建一个"操作流"称为create. (术语“操作流”这个词只是用来描述,流本身仍然是一个常规的Subject):
// action streams
create: Rx.Subject<Message> = new Rx.Subject<Message>();
接下来,在我们的构造函数中,我们配置了create流:
this.create
.map( function(message: Message): IMessagesOperation {
return (messages: Message[]) => { return messages.concat(message);
}; })
该map运算符是很像javascrpt内置的Array.map功能,但这里它只适用于流。
在这种情况下,我们说“因为我们接收每一个输入的消息,返回IMessagesOperation类型的操作,它将此消息添加到消息列表中”。换句话说,这个流将发出一个函数,它接受消息列表,并将新的消息添加到该消息列表
现在,我们有create流,我们还有一件事没有做:我们需要真正地把它挂到更新流上。我们通过使用subscribe。
this.create
.map( function(message: Message): IMessagesOperation {
return (messages: Message[]) => { return messages.concat(message);
}; })
.subscribe(this.updates);
我们在这里所做的是update流订阅create流。这意味着,如果create收到消息时,它会发出IMessagesOperation将由update接收执行,然后信息会被添加到消息列表中的。
下面是一个图,说明我们目前的情况:
这是伟大的,因为这意味着我们得到了几件事情:
在我们代码的任何地方,如果我想要获得当消息的最新列表,我们只需要进入messages流,但我们还有一个问题,我们还没有将流流向连接上newMessages流
this.newMessages.subscribe(this.create);
现在我们的图看起来是这样的:
现在我们的流向就完成了!这是两全其美的:我们能够通过订阅newMessages流获得独立的消息流,但是如果我们只是想要最新消息列表,我们可以订阅message流。
值得指出的是这个设计的一些影响:如果你直接订阅newMessages,你必须要小心下游(downstream)可能发生的变化。这里有三件事情要考虑: 首先,你显然不会得到任何下游应用于信息的更新。 第二,在这种情况下,我们有可变Message对象。所以,如果你订阅newMessages,并存储一个Message引用,该Message的属性可能会改变。 第三, 在这种情况下您可能无法利用我们Message可变性的优势,考虑一下这种情况:我们可以在这个update队列上使用一个操作,这个操作会复制每一个Message,然后去改变这个复制的Message (这可能是比我们在这里做的更好的设计),在这种情况下,你不能让任何直接从newMessages发出的消息处于“最终”状态。
也就是说,只要你记住这些注意事项,你不应该有太多的麻烦。
下面是完整的MessagesService的样子:
# code/rxjs/chat/app/ts/services/MessagesService.ts
import {Injectable, bind} from 'angular2/core';
import {Subject, Observable} from 'rxjs';
import {User, Thread, Message} from '../models';
let initialMessages: Message[] = [];
interface IMessagesOperation extends Function {
(messages: Message[]): Message[];
}
@Injectable()
export class MessagesService {
// a stream that publishes new messages only once
newMessages: Subject<Message> = new Subject<Message>();
// `messages` is a stream that emits an array of the most up to date messages
messages: Observable<Message[]>;
// `updates` receives _operations_ to be applied to our `messages`
// it's a way we can perform changes on *all* messages (that are currently
// stored in `messages`)
updates: Subject<any> = new Subject<any>();
// action streams
create: Subject<Message> = new Subject<Message>();
markThreadAsRead: Subject<any> = new Subject<any>();
constructor() {
this.messages = this.updates
// watch the updates and accumulate operations on the messages
.scan((messages: Message[],
operation: IMessagesOperation) => {
return operation(messages);
},
initialMessages)
// make sure we can share the most recent list of messages across anyone
// who's interested in subscribing and cache the last known list of
// messages
.publishReplay(1)
.refCount();
// `create` takes a Message and then puts an operation (the inner function)
// on the `updates` stream to add the Message to the list of messages.
//
// That is, for each item that gets added to `create` (by using `next`)
// this stream emits a concat operation function.
//
// Next we subscribe `this.updates` to listen to this stream, which means
// that it will receive each operation that is created
//
// Note that it would be perfectly acceptable to simply modify the
// "addMessage" function below to simply add the inner operation function to
// the update stream directly and get rid of this extra action stream
// entirely. The pros are that it is potentially clearer. The cons are that
// the stream is no longer composable.
this.create
.map( function(message: Message): IMessagesOperation {
return (messages: Message[]) => {
return messages.concat(message);
};
})
.subscribe(this.updates);
this.newMessages
.subscribe(this.create);
// similarly, `markThreadAsRead` takes a Thread and then puts an operation
// on the `updates` stream to mark the Messages as read
this.markThreadAsRead
.map( (thread: Thread) => {
return (messages: Message[]) => {
return messages.map( (message: Message) => {
// note that we're manipulating `message` directly here. Mutability
// can be confusing and there are lots of reasons why you might want
// to, say, copy the Message object or some other 'immutable' here
if (message.thread.id === thread.id) {
message.isRead = true;
}
return message;
});
};
})
.subscribe(this.updates);
}
// an imperative function call to this action stream
addMessage(message: Message): void {
this.newMessages.next(message);
}
messagesForThreadUser(thread: Thread, user: User): Observable<Message> {
return this.newMessages
.filter((message: Message) => {
// belongs to this thread
return (message.thread.id === thread.id) &&
// and isn't authored by this user
(message.author.id !== user.id);
});
}
}
export var messagesServiceInjectables: Array<any> = [
bind(MessagesService).toClass(MessagesService)
];
如果你还没有准备好,这将是一个好时机去开拓代码和感受下MessagesService它是如何工作的。我们有一个例子,你可以在test/services/MessagesService.spec.ts启动。
要运行这个项目的测试,那么打开你的终端:
cd/path/to/code/rxjs/chat//<--your path will vary
npm install
karma start
让我们开始为我们的模型创建一些实例:
# code/rxjs/chat/test/services/MessagesService.spec.ts
let user: User = new User('Nate', '');
let thread: Thread = new Thread('t1', 'Nate', ''); let m1: Message = new Message({
author: user, text: 'Hi!', thread: thread
});
let m2: Message = new Message({ author: user,
text: 'Bye!',
thread: thread
})
接下来,让我订们阅了我们的几个流:
# code/rxjs/chat/test/services/MessagesService.spec.ts
let messagesService = new MessagesService();
// listen to each message indivdually as it comes in
messagesService.newMessages
.subscribe( (message: Message) => {
console.log("=> newMessages: " + message.text);
});
// listen to the stream of most current messages
messagesService.messages
.subscribe( (messages: Message[]) => {
console.log("=> messages: " + messages.length);
});
messagesService.addMessage(m1);
messagesService.addMessage(m2);
// => messages: 0
// => messages: 1
// => newMessages: Hi!
// => messages: 2
// => newMessages: Bye!
这里要注意两件事情:
测试下MessagesService及感受一下那里的流。我们在下一节中建立了ThreadsService,来使用它们。
在我们的ThreadsService上定义4个流用于发射,分别为:
让我们通过如何建立这些流,我们将沿途学习更多关于RxJS。
让我们先来定义我们的ThreadsService类和实例变量,将发出Threads:
import {Injectable, bind} from 'angular2/core';
import {Subject, BehaviorSubject, Observable} from 'rxjs';
import {Thread, Message} from '../models';
import {MessagesService} from './MessagesService';
import * as _ from 'underscore';
@Injectable()
export class ThreadsService {
// `threads` is a observable that contains the most up to date list of threads
threads: Observable<{ [key: string]: Thread }>;
注意:这个流将发射一个map(object),map的key是Thread的字符串id,value则是这个Thread本身
要创建一个流保持的当前主题列表,我们首先附着在messagesService.messages流:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.threads = messagesService.messages
回想一下,一个新的消息被添加到流,message将发射当前消息数组。我们要看看每个消息,并我们想返回帖子的唯一列表。
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.threads = messagesService.messages
.map( (messages: Message[]) => {
let threads: {[key: string]: Thread} = {};
// Store the message's thread it in our accumulator `threads` messages.map((message: Message) => {
threads[message.thread.id] = threads[message.thread.id] || message.thread;
注意上面,每次我们将创建帖子的一个新的列表。这样做的原因是因为我们可能会删除一些消息(如离开谈话)。因为我们每一次重新计算帖子列表,如果它没有消息,我们自然会“删除”帖子。
在帖子列表中,我们希望通过在该帖子显示聊天的文字预览使用最新的消息。
为了做到这一点,我们为每个帖子将存储最新的消息。我们通过比较sentAt时间知道这消息是最新的:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
// Cache the most recent message for each thread
let messagesThread: Thread = threads[message.thread.id];
if (!messagesThread.lastMessage ||
messagesThread.lastMessage.sentAt < message.sentAt) {
messagesThread.lastMessage = message;
});
return threads;
})
全部放在一起,threads是这样的:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.threads = messagesService.messages
.map( (messages: Message[]) => {
let threads: {[key: string]: Thread} = {};
// Store the message's thread in our accumulator `threads`
messages.map((message: Message) => {
threads[message.thread.id] = threads[message.thread.id] ||
message.thread;
// Cache the most recent message for each thread
let messagesThread: Thread = threads[message.thread.id];
if (!messagesThread.lastMessage ||
messagesThread.lastMessage.sentAt < message.sentAt) {
messagesThread.lastMessage = message;
}
});
return threads;
});
让我们尝试我们的ThreadsService。首先,我们将创建一个模型来工作:
# code/rxjs/chat/test/services/ThreadsService.spec.ts
let nate:User = new User('Nate Murray', '');
let felipe:User = new User('Felipe Coury', '');
let t1:Thread = new Thread('t1', 'Thread 1', '');
let t2:Thread = new Thread('t2', 'Thread 2', '');
let m1:Message = new Message({
author: nate,
text: 'Hi!',
thread: t1
});
let m2:Message = new Message({
author: felipe,
text: 'Where did you get that hat?', thread: t1
});
let m3:Message = new Message({
author: nate,
text: 'Did you bring the briefcase?', thread: t2
});
现在让我们创建一个服务实例:
# code/rxjs/chat/test/services/ThreadsService.spec.ts
let messagesService: MessagesService = new MessagesService();
let threadsService: ThreadsService = new ThreadsService(messagesService);
注意这里我们通过messagesService作为参数传递给我们的threadsService构造函数。一般来说,我们让依赖注入系统来处理这个问题。但在我们的测试中,我们可以自己提供依赖。
让我们订阅threads,然后通过:
# code/rxjs/chat/test/services/ThreadsService.spec.ts
threadsService.threads
.subscribe( (threadIdx: { [key: string]: Thread }) => {
let threads: Thread[] = _.values(threadIdx);
let threadNames: string = _.map(threads, (t: Thread) => t.name)
.join(', ');
console.log(`=> threads (${threads.length}): ${threadNames} `);
});
messagesService.addMessage(m1);
messagesService.addMessage(m2);
messagesService.addMessage(m3);
// => threads (1): Thread 1
// => threads (1): Thread 1
// => threads (2): Thread 1, Thread 2
}); });
threads提供了一个map,它作为我们的帖子列表的“索引”。但我们希望帖子视图以最新消息为排列顺序的。
让我们创建一个新的流来返回按最近消息时间排序的threads数组:我们将开始定义orderedThreads实例属性:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
// `orderedThreads` contains a newest-first chronological list of threads
orderedThreads: Rx.Observable<Thread[]>;
其次,在构造函数中我们定义orderedThreads来通过订阅threads对消息按照时间进行排序:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.orderedThreads = this.threads
.map((threadGroups: { [key: string]: Thread }) => {
let threads: Thread[] = _.values(threadGroups);
return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse();
})
.shareReplay(1);
我们的应用程序需要知道哪些thread是当前选定的thread。这让我们知道:
让我们创建BehaviorSubject将存储currentThread:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
// `currentThread` contains the currently selected thread
currentThread: Rx.Subject<Thread> =
new Rx.BehaviorSubject<Thread>(new Thread());
注意,我们正在发布一个空Thread作为默认值。我们并不需要任何进一步配置currentThread。
要设置当前thread,我们可以在客户端或者1.通过onNext直接提交新的thread或2.添加一个辅助的方法来做到这一点。 让我们定义一个setCurrentThread辅助方法,我们可以使用它来设置下一个thread:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
setCurrentThread(newThread: Thread): void {
this.currentThread.onNext(newThread);
}
我们要跟踪未读邮件的数量。如果我们切换到一个新的thread,那我们要标记所有在该thread的消息的为已读。我们需要做的部分:
所以,我们需要做的把它们勾在一起:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.currentThread.subscribe(this.messagesService.markThreadAsRead);
现在,我们有了当前选定的Thread,我们需要确保我们可以显示该thread中的消息列表。
实现这是可能看起来比表面的有点复杂得多。假设我们实现这样的:
var theCurrentThread: Thread;
this.currentThread.subscribe((thread: Thread) => { theCurrentThread = thread;})
this.currentThreadMessages.map((mesages: Message[]) => {
return _.filter(messages, (message: Message) => {
return message.thread.id == theCurrentThread.id; })
})
这种方法有什么不好?恩,如果currentThread有变化,currentThreadMessages不会知道这件事,所以在currentThreadMessages我们将有一个过时的名单!
如果我们推翻它,存储当前消息列表在一个变量中,并订阅currentThread的变化?我们会有同样的问题,只有这个时候,我们知道Thread的变化,而不是一个新的消息进来。
我们怎样才能解决这个问题?
原来,RxJS拥有一套可以结合多个流的操作符。在这种情况下,我们想说“如果currentThread或messagesService.messages改变,我们要发射的某些内容“。 在这里我们使用combineLatest操作。
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.currentThreadMessages = this.currentThread
.combineLatest(messagesService.messages,
(currentThread: Thread, messages: Message[]) => {
当我们将两者相结合的时候,一个或另一个会先到达,并没有保证我们将有一个值在两个流,所以我们需要检查,以确保什么是我们需要,否则我们将返回一个空列表。
现在,我们有两个当前thread和消息,我们可以过滤掉只是我们不感兴趣的消息:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.currentThreadMessages = this.currentThread
.combineLatest(messagesService.messages,
(currentThread:Thread, messages:Message[]) => {
if (currentThread && messages.length > 0) {
return _.chain(messages)
.filter((message:Message) =>
(message.thread.id === currentThread.id))
另外一个细节,因为我们已经在寻找对于当前Thread的消息,这是一个方便的地方为这些邮件标记已读
# code/rxjs/chat/app/ts/services/ThreadsService.ts
.filter((message: Message) =>
(message.thread.id === currentThread.id))
.map((message: Message) => {
message.isRead = true;
return message; })
.value();
我们是否应该在这里为已读消息标志是有争议的。最大的缺点是,我们改变的对象是什么,从本质上讲,一个"读"的thread。即这是一个具有副作用的读操作,这通常是一个坏主意 这就是说,在本应用的currentThreadMessages只适用于currentThread,该currentThread应该始终将它的邮件标记为已读。这就是说,我建议一般“读有副作用的”不是一个模式。
将其组合在一起,这里是currentThreadMessages样子:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
this.currentThreadMessages = this.currentThread
.combineLatest(messagesService.messages,
(currentThread: Thread, messages: Message[]) => {
if (currentThread && messages.length > 0) {
return _.chain(messages)
.filter((message: Message) =>
(message.thread.id === currentThread.id))
.map((message: Message) => {
message.isRead = true;
return message; })
.value();
} else {
return [];
}
})
下面是我们的ThreadService的样子:
# code/rxjs/chat/app/ts/services/ThreadsService.ts
import {Injectable, provide} from 'angular2/core';
import {Subject, BehaviorSubject, Observable} from 'rxjs';
import {Thread, Message} from '../models';
import {MessagesService} from './MessagesService';
import * as _ from 'underscore';
@Injectable()
export class ThreadsService {
// `threads` is a observable that contains the most up to date list of threads
threads: Observable<{ [key: string]: Thread }>;
// `orderedThreads` contains a newest-first chronological list of threads
orderedThreads: Observable<Thread[]>;
// `currentThread` contains the currently selected thread
currentThread: Subject<Thread> =
new BehaviorSubject<Thread>(new Thread());
// `currentThreadMessages` contains the set of messages for the currently
// selected thread
currentThreadMessages: Observable<Message[]>;
constructor(public messagesService: MessagesService) {
this.threads = messagesService.messages
.map( (messages: Message[]) => {
let threads: {[key: string]: Thread} = {};
// Store the message's thread in our accumulator `threads`
messages.map((message: Message) => {
threads[message.thread.id] = threads[message.thread.id] ||
message.thread;
// Cache the most recent message for each thread
let messagesThread: Thread = threads[message.thread.id];
if (!messagesThread.lastMessage ||
messagesThread.lastMessage.sentAt < message.sentAt) {
messagesThread.lastMessage = message;
}
});
return threads;
});
this.orderedThreads = this.threads
.map((threadGroups: { [key: string]: Thread }) => {
let threads: Thread[] = _.values(threadGroups);
return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse();
});
this.currentThreadMessages = this.currentThread
.combineLatest(messagesService.messages,
(currentThread: Thread, messages: Message[]) => {
if (currentThread && messages.length > 0) {
return _.chain(messages)
.filter((message: Message) =>
(message.thread.id === currentThread.id))
.map((message: Message) => {
message.isRead = true;
return message; })
.value();
} else {
return [];
}
});
this.currentThread.subscribe(this.messagesService.markThreadAsRead);
}
setCurrentThread(newThread: Thread): void {
this.currentThread.next(newThread);
}
}
export var threadsServiceInjectables: Array<any> = [
provide(ThreadsService).toClass(ThreadsService)
];
我们的数据模型和服务都完成了的!现在,我们现在需要将这一切绑在我们的视图组件!在接下来的章节中,我们将建立我们的3个主要组件来渲染,并与这些流交互。
该issue关闭讨论,如有问题请去 https://github.com/kittencup/angular2-ama-cn/issues/43 提问
目录