Open rainit2006 opened 7 years ago
If not using Kafka
When using Kafka
About kafka
Kafka クラスタは、コンシュームされたかどうかに拘わらず、発行されたすべてのメッセージを保存しています。 保持する期間は設定で変更可能です。 例えばログ保存期間が2日間に設定されている場合、あるメッセージが発行されてから2日間はコンシューム可能ですが、 それ以降は容量確保のために破棄されます。 Kafkaの性能はデータサイズに関しては実質定数のため、大量のデータを保存することは問題ありません。
Core Concept
Cluster: a group of computers sharing workload for a common purpose.
Topic : a unique name for Kafka stream. Consumer用topic从Kafka里得到想要的数据(数据来源自Producer)
Partitions KafkaはTopicごとに1つ以上のPartitionという単位でメッセージを保存する.メッセージはそれぞれのPartitionの末尾に追記される.これによりPartitionごとにメッセージの順序性が担保される.例えば以下の図はあるTopicの3つのPartitionにメッセージが追記されていることを示す.
Offset Global unique identifier of message: Topic Name -> Partition number -> Offset
Consumer groups: A group of consumers acting as a single logical unit.
Demo steps
Fault Tolerance 3 replication factor : Number of total copies is 3.
Leader & Follower
Broker configuration Key config: 1, Broker.id 2, log.dirs 3, zookeeper.connect 4, port
API
public class KafkaProducer<K,V>
extends java.lang.Object
implements Producer<K,V>
创建Producer发送消息给Kafka
//设定kafka属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//根据属性props生成一个Kafka对象 Producer<String, String> producer = new KafkaProducer<>(props);
//发送Record,使用ProducerRecord对象。 for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
The send() method is asynchronous.
整个Producer的动作流程
![image](https://user-images.githubusercontent.com/12871721/27322107-c53b093a-55d7-11e7-979d-0f5e20025982.png)
- synchronously send
Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
If you want to simulate a simple blocking call you can call the get() method immediately:
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) RecordMetadata metadata = producer.send(record).get(); System.out.printIn(metadata.partition() 和 metadata.offset());
Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
serializer and a deserializer 1,Create a supplier class 2, Create a producer 3, Create a serializer 4, Create a deserializer 5, Create a consumer
Consumer Group -- 概念: 1,Consumer Group : used to read and process data parallel. 2, Group coordinator: responsible for managing the members of the group as well as their partition assignments. 3,Rebalance: initiate when the list of consumers is modified. 4, Group Leader: executes relance activity.
リーダーの場合 -> Partitionの割当を行う。 TopicMetadataで取得したTopic/Partitionの情報と、JoinGroupで取得したメンバーの情報を元にPartitionの割当を行いGroupAssignmentに指定してRequestを投げる。 その他の場合 -> Partitionの割当を取得する。 リーダーがRequestを投げてくるまでResponseがブロックされる。
-- あるConsumerグループに所属するConsumerに対してIDである「group_id」を共有する。 -- Consumerグループに所属するConsumerプロセス達はあるトピックに対するパーティションを 出来るだけ均等になるように分け合う。1パーティションは1Consumerプロセスによって消費される。 -- ConsumerIDレジストリ: 「group_id」の他にConsumerにはConsumerの識別のために「consumer_id」(UUID形式のホスト名)が一時的に割り当てられる。 -- Consumerのオフセットトレース: Consumerプロセスはオフセットの最大値を各パーティションごとにどこまでメッセージを消費したかを判別するために記録する。 -- パーティションオーナーのレジストリ: BrokerのパーティションはConsumerグループ毎にある1Consumerによって消費される。
关于Kafka的offset管理 offset: offset是consumer position,Topic的每个Partition都有各自的offset.
消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。Kafka 的scala/java 版的client 已经实现了这部分的逻辑,将offset 保存到zookeeper 上.
offset相关的设置项: 1, auto.offset.reset:
2,auto.commit.enable(例如true,表示offset自动提交到Zookeeper) If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin。
3,auto.commit.interval.ms(例如60000,每隔1分钟offset提交到Zookeeper) 4, offsets.storage :Select where offsets should be stored (zookeeper or kafka).默认是Zookeeper。
Schema 利用Avro
ストリーム処理を支えるキューイングシステムの選び方 https://www.slideshare.net/laclefyoshi/ss-67658888
何时需要消息队列 https://zhuanlan.zhihu.com/p/21479556 做业务解耦/最终一致性/广播/错峰流控等。
解耦: 基于消息的模型,关心的是“通知”,而非“处理”。
最终一致性: 指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。 所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。 像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。
消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。 对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。 支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。 如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。
ActiveMQ消息数据持久化 http://www.jianshu.com/p/deb1816271d1 场景问题:服务器断电重启,未被消费的消息是否会在重启之后被继续消费? 两种选择:非持久性模式/持久性模式。
ActiveMQ的几种存储模式选择:
ActiveMQ 構成 activemq.xml ファイルに依存します。 ActiveMQ の構成パラメーターは、以下のとおりです。
broker この ActiveMQ のメッセージ・ブローカーは、トランスポート・コネクター、ネットワーク・コネクター、およびブローカーの構成に使用されるプロパティーから構成されます。 属性は以下のとおりです。 brokerName="localhost" - ブローカーの名前。 dataDirectory="./ActivemqDataStore" - ActiveMQ のデータの保管に使用されるディレクトリー。 useShutdownHook="true" - JVM が終了された場合に、ブローカーを閉じるためにシャットダウン・ハンドラーを使用するかどうかを設定します。 useJmx="true" - ブローカーのサービスを JMX 内で公開するかどうかを設定します。
managementContext このパラメーターでは、ActiveMQ を JMX 内で公開する方法を構成します。 属性は以下のとおりです。 createConnector="true" - ActiveMQ でそれ独自の JMX コネクターを作成するかどうかを設定します。 connectorPort="1099" - コネクターのポート。この値はデフォルトでは 1099 です。
persistenceAdapter/kahaDB このパラメーターでは、ブローカーのメッセージ・パーシスタンスを構成します。 属性は以下のとおりです。 journalMaxFileLength="32mb" - メッセージ・データ・ログの最大サイズを設定します。 checksumJournalFiles="true" - 壊れたジャーナルの有無の検査を可能にするために、ジャーナル・ファイルのチェックサムを作成します。 checkForCorruptJournalFiles="true" - 有効にした場合、始動時に壊れたジャーナル・ファイルがないかを検査し、 壊れたジャーナル・ファイルの復旧を試みます。
transportConnectors このパラメーターは、ActiveMQ で listen するトランスポート・コネクターから構成されます。属性は以下のとおりです。 name="openwire" - トランスポート・コネクターの名前。 uri="tcp://localhost:61616" - トランスポート・コネクターのアドレス。
Spring 4 MVC + JMS + ActiveMQ annotation based Example http://websystique.com/springmvc/spring-4-mvc-jms-activemq-annotation-based-example/
项目里ActiveMQ的使用
ActiveMQ.xml文件
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="http://activemq.apache.org/schema/core"
useJmx="false"
brokerName="localhost"
dataDirectory="${activemq.base}/data">
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<storeUsage>
<storeUsage limit="150 gb"/>
</storeUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://127.0.0.1:61616"/>
</transportConnectors>
</broker>
</beans>
JmsUtilsFactory
import javax.jms.JMSException;
public class JmsUtilsFactory {
public static JmsUtils create() throws JMSException {
JmsUtils jmsUtils = new JmsUtils();
jmsUtils.setJmsBrokerUrl("failover:(tcp://localhost:61616)");
jmsUtils.afterPropertiesSet();
return jmsUtils;
}
}
消息的发送(项目中用它来把log文件发送到Brocker里,然后等待camel处理)
QueueSession queueSession = getQueueConnection().createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueSender queuesender = queueSession.createSender(queue);
TextMessage textmessage = queueSession.createTextMessage();
textmessage.setStringProperty(JmsConstants.JMS_UUID_PROPERTY_NAME, uuid);
textmessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
if (jmsExpiration > 0) {
textmessage.setJMSExpiration(jmsExpiration * 1000);
}
if (jmsXGroupID != null) {
textmessage.setStringProperty(JmsConstants.JMS_JMSXGROUPID_PROPERTY_NAME, jmsXGroupID);
}
textmessage.setText(stringMessage);
queuesender.send(textmessage);
queuesender.close();
queueSession.close();
消息的接收(项目里没有使用)
QueueSession queueSession = getQueueConnection().createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver queuereceiver = queueSession.createReceiver(queue, filterQuery);
TextMessage textMessage;
if (timeout == 0) {
textMessage = (TextMessage) queuereceiver.receiveNoWait();
} else {
textMessage = (TextMessage) queuereceiver.receive(timeout * 1000);
}
queuereceiver.close();
queueSession.close();
return textMessage;
项目中实际应用
jmsUtils.sendText(JmsConstants.JMS_QUEUE_PRODUCTION, 原始数据, new UID().toString(), null, -1);
Apache Camel
public class FileMoveWithCamel {
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
public void configure() {
//from("file:d:/temp/inbox?noop=true").to("file:d:/temp/outbox");
from("file:d:/temp/inbox/?delay=30000").to("file:d:/temp/outbox");
}
});
context.start();
boolean loop =true;
while(loop){
Thread.sleep(25000);
}
context.stop();
}
}
上面例子体现了一个最简单的路由功能,比如d:/temp/inbox/是某一个系统FTP到Camel所在的系统的一个接收目录. d:/temp/outbox为Camel要发送的另一个系统的接收目录. from/to可以是如下别的形式,比如: from("file:d:/temp/inbox/?delay=30000").to("jms:queue:order");
再给出一个从from到to有中间流程process处理的例子:
public class FileProcessWithCamel {
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
public void configure() {
FileConvertProcessor processor = new FileConvertProcessor();
from("file:d:/temp/inbox?noop=true").process(processor).to("file:d:/temp/outbox");
}
});
context.start();
boolean loop =true;
while(loop){
Thread.sleep(25000);
}
context.stop();
}
}
其中Process处理的代码例子如下:
这里的处理只是简单的把接收到的文件多行转成一行 。
public class FileConvertProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
try {
InputStream body = exchange.getIn().getBody(InputStream.class);
BufferedReader in = new BufferedReader(new InputStreamReader(body));
StringBuffer strbf = new StringBuffer("");
String str = null;
str = in.readLine();
while (str != null) {
System.out.println(str);
strbf.append(str + " ");
str = in.readLine();
}
exchange.getOut().setHeader(Exchange.FILE_NAME, "converted.txt");
// set the output to the file
exchange.getOut().setBody(strbf.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
项目里的应用:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
@SpringBootApplication
@ImportResource("CollectorContext.xml")
public class CollectionServerCamelApplication {
public static void main(String[] args) {
SpringApplication.run(CollectionServerCamelApplication.class, args);
}
}
其中CollectorContext.xml文件内容如下:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" autoStartup="true">
<route id="multicaster" autoStartup="true">
<description>Collection Server</description>
<from uri="activemq:queue/production"/>
<to uri="Processor1"/>
<multicast>
<to uri="direct:dailyup2"/>
</multicast>
</route>
<route id="dailyup2" autoStartup="true">
<description>Daily Up2</description>
<from uri="direct:dailyup2"/>
<to uri="Processor2"/>
</route>
<route id="upload" autoStartup="true">
<from uri="file://s3?recursive=true&delete=true&filter=%23excludeTodayFilter"/>
<to uri="multiProcessor3"/>
</route>
</camelContext>
<bean id="multiProcessor3"
class="com.myDemo.xxxxxProcessor">
</bean>
<bean id="excludeTodayFilter" class="com.myDemo.DateFileFilter">
<property name="prefix" value=""/>
<property name="postfix" value=""/>
<property name="dateformat" value="yyyy-MM-dd"/>
</bean>
<bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="Processor1"
class="com.myDemo.Processor1">
<property name="xxxxx" value="xxxxxxxxxx"/>
<property name="xxxxx" value="xxxxx"/>
</bean>
<bean id="Processor2"
class="com.myDemo.Processor2">
<property name="prefix" value="s3/${target.xxxx}/schema-04/Log-"/>
</bean>
</beans>
https://www.youtube.com/watch?v=dq-ZACSt_gA&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON&index=2