kafka使用笔记

 : jank    :   : 2383    : 2017-09-20 18:35  linux


一、下载安装

1.下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

2.安装解压:tar -xzf kafka_2.11-0.11.0.1.tgz

  cd kafka_2.11-0.11.0.1

二、启动服务

1.Kafka使用ZooKeeper,所以你需要先启动一个ZooKeeper服务器,如果你还没有,可以使用kafka自带的便捷脚本来快速获取一个单节点ZooKeeper实例。

2.启动zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

3.启动kafka服务:

bin/kafka-server-start.sh config/server.properties

三、创建主题

1.我们用单个分区创建一个名为“test”的主题,只有一个副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test        #--partitions 1,只设置一个消费者,--topic test,创建名为test的主题

2.查看主题列表:

bin/kafka-topics.sh --list --zookeeper localhost:2181

四、发送接收消息

1.运行生产者,开启生产者控制台

Kafka附带一个命令行客户端,它将从文件或标准输入中输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。运行生产者,然后在控制台中输入一些消息以发送到服务器

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2.在控制台中输入一些消息发送到服务器。

This is a message

3.运行消费者

卡夫卡还有一个命令行消费者将把消息转储到标准输出。

>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

如果您将上述每个命令都运行在不同的终端中,那么您现在应该可以在生产者终端中输入消息,并看到它们出现在消费者终端中。所有命令行工具都有其他选项; 运行没有参数的命令将显示更详细的记录它们的使用信息。


五、设置多代理群集

1.到目前为止,我们一直在运行一个单一的broker(代理),但这没有乐趣。对于Kafka,单个代理只是一个大小为1的集群,所以没有什么改变,除了启动更多的代理实例。但是为了让它感觉到,让我们将集群扩展到三个节点(仍然在本地机器上)。


2.首先我们为每个代理设置一个配置文件(在Windows上使用copy命令):

>cp config/server.properties config/server-1.properties

>cp config/server.properties config/server-2.properties

3.broker.id属性是集群中每个节点的唯一和永久名称。我们必须覆盖端口和日志目录,只因为我们在同一台机器上运行这些目录,我们希望让经纪人不要在同一个端口上注册或覆盖对方的数据。


我们已经有Zookeeper,我们的单节点启动,所以我们只需要启动两个新节点:


> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

4.现在创建一个复制因子为三的新主题:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test1

5.现在我们有一个集群,我们怎么知道个主/代理节点在做什么呢?要看到运行“describe topics”命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

   Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

  

以下是输出的说明。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。因为这个主题只有一个分区,只有一行。


“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的主节点。

“replicas”是复制此分区的日志的节点列表,无论它们是主节点还是现在都是活着的。

“isr”是一组“同步”副本。这是副本列表的子集,它目前是生存和追赶主节点的。

请注意,在我的示例中,节点1是主题唯一分区的主节点。


6.我们可以在我们创建的原始主题上运行相同的命令来查看它的位置:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:

   Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

所以没有什么惊喜,原来的主题没有复制品,而是在服务器0,我们创建它的集群中唯一的服务器。


7.我们发布一些消息到我们的新主题:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2


8,现在我们来看看这些消息:


> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2


9.现在我们来测试容错。我们杀掉目前节点为1的主节点:


> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564


在Windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"

java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... buildlibskafka_2.11-0.11.0.1.jar"  kafka.Kafka configserver-1.properties    644

> taskkill /pid 644 /f


主节点已经切换到其中一个从节点,节点1不再处于同步副本集中如下所示:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

   Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0


不过消息仍然可以用于消费,即使最初为主节点也是如此,如下所示:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2





   

备案编号:赣ICP备15011386号

联系方式:qq:1150662577    邮箱:1150662577@qq.com