Kafka Beginner Tutorial

1. Kafka Environment Setup

1.1 Download Kafka

Kafka official website

Kafka official download page

I downloaded kafka_2.12-1.0.1.tgz (click the download link on that page to get it).

After the download finishes, extract it to /usr/local/:

$ mv kafka_2.12-1.0.1.tgz /usr/local/
$ cd /usr/local/
$ tar -zxvf kafka_2.12-1.0.1.tgz
$ mv kafka_2.12-1.0.1 kafka
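
Optionally, you can also put the Kafka bin directory on the PATH so its scripts can be run from anywhere (the helper script in section 2.2 assumes this via source /etc/profile); a minimal sketch:

$ echo 'export PATH=$PATH:/usr/local/kafka/bin' >> /etc/profile
$ source /etc/profile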

1.2 Download ZooKeeper

ZooKeeper download page

After the download finishes, extract it to /usr/local as well.
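
For example (the version number here is only an illustration; substitute whichever release you downloaded):

$ mv zookeeper-3.4.12.tar.gz /usr/local/
$ cd /usr/local/
$ tar -zxvf zookeeper-3.4.12.tar.gz
$ mv zookeeper-3.4.12 zookeeper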

1.3 Edit the Configuration Files

Go to zookeeper/conf and edit the zoo.cfg file (a fresh ZooKeeper install ships only zoo_sample.cfg, so copy it to zoo.cfg first).

Set the following:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
# client port
clientPort=2181

Then edit the Kafka configuration file, kafka/config/server.properties, with the following contents.

broker.id=0
advertised.listeners=PLAINTEXT://master:9092
listeners=PLAINTEXT://master:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# must match the clientPort configured for ZooKeeper above
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

Create the two data directories and change their ownership:

[root@localhost config]# mkdir -p /data/kafka-logs
[root@localhost config]# mkdir -p /data/zookeeper
[root@localhost config]# chown lzq996298643:lzq996298643 -R /data/kafka-logs
[root@localhost config]# chown lzq996298643:lzq996298643 -R /data/zookeeper

Start the ZooKeeper bundled with Kafka, then start Kafka.

[root@localhost bin]# cd /usr/local/kafka/bin
[root@localhost bin]# ll
total 128
-rwxr-xr-x. 1 root root 1335 Feb 22 06:26 connect-distributed.sh
-rwxr-xr-x. 1 root root 1332 Feb 22 06:26 connect-standalone.sh
-rwxr-xr-x. 1 root root 861 Feb 22 06:26 kafka-acls.sh
-rwxr-xr-x. 1 root root 873 Feb 22 06:26 kafka-broker-api-versions.sh
-rwxr-xr-x. 1 root root 864 Feb 22 06:26 kafka-configs.sh
-rwxr-xr-x. 1 root root 945 Feb 22 06:26 kafka-console-consumer.sh
-rwxr-xr-x. 1 root root 944 Feb 22 06:26 kafka-console-producer.sh
-rwxr-xr-x. 1 root root 871 Feb 22 06:26 kafka-consumer-groups.sh
-rwxr-xr-x. 1 root root 948 Feb 22 06:26 kafka-consumer-perf-test.sh
-rwxr-xr-x. 1 root root 869 Feb 22 06:26 kafka-delete-records.sh
-rwxr-xr-x. 1 root root 863 Feb 22 06:26 kafka-log-dirs.sh
-rwxr-xr-x. 1 root root 862 Feb 22 06:26 kafka-mirror-maker.sh
-rwxr-xr-x. 1 root root 886 Feb 22 06:26 kafka-preferred-replica-election.sh
-rwxr-xr-x. 1 root root 959 Feb 22 06:26 kafka-producer-perf-test.sh
-rwxr-xr-x. 1 root root 874 Feb 22 06:26 kafka-reassign-partitions.sh
-rwxr-xr-x. 1 root root 868 Feb 22 06:26 kafka-replay-log-producer.sh
-rwxr-xr-x. 1 root root 874 Feb 22 06:26 kafka-replica-verification.sh
-rwxr-xr-x. 1 root root 7579 Feb 22 06:26 kafka-run-class.sh
-rwxr-xr-x. 1 root root 1376 Feb 22 06:26 kafka-server-start.sh
-rwxr-xr-x. 1 root root 975 Feb 22 06:26 kafka-server-stop.sh
-rwxr-xr-x. 1 root root 870 Feb 22 06:26 kafka-simple-consumer-shell.sh
-rwxr-xr-x. 1 root root 945 Feb 22 06:26 kafka-streams-application-reset.sh
-rwxr-xr-x. 1 root root 863 Feb 22 06:26 kafka-topics.sh
-rwxr-xr-x. 1 root root 958 Feb 22 06:26 kafka-verifiable-consumer.sh
-rwxr-xr-x. 1 root root 958 Feb 22 06:26 kafka-verifiable-producer.sh
-rwxr-xr-x. 1 root root 1722 Feb 22 06:26 trogdor.sh
drwxr-xr-x. 2 root root 4096 Feb 22 06:26 windows
-rwxr-xr-x. 1 root root 867 Feb 22 06:26 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Feb 22 06:26 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 978 Feb 22 06:26 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 968 Feb 22 06:26 zookeeper-shell.sh

The bin directory contains the startup scripts for both ZooKeeper and Kafka.

You can simply use the ZooKeeper that ships with Kafka:

[root@localhost bin]# ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
[root@localhost bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties
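
These two commands run in the foreground and each occupies a terminal; if you prefer, you can start them in the background with nohup (a sketch, with the logs redirected to files of your choosing):

[root@localhost bin]# nohup ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties > /tmp/zookeeper.log 2>&1 &
[root@localhost bin]# nohup ./kafka-server-start.sh /usr/local/kafka/config/server.properties > /tmp/kafka.log 2>&1 &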

Of course, you can also start the ZooKeeper you downloaded separately instead.

1.4 Issues Encountered When Running Kafka

In server.properties, the following two lines should be set to the machine's hostname or IP address rather than localhost:

advertised.listeners=PLAINTEXT://master:9092

listeners=PLAINTEXT://master:9092
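
Note that the hostname master must resolve to the machine's address; a minimal sketch of an /etc/hosts entry, assuming the broker's IP is 192.168.0.128 (the address used in the commands in section 2.1):

192.168.0.128 master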

Run ./kafka-server-start.sh /usr/local/kafka/config/server.properties.
If the startup log finishes without errors and the broker is listening on master:9092, Kafka started successfully.
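
To double-check, you can verify that the broker is listening and that the topic tooling can reach it; for example (a sketch, using the ports configured above):

[root@master bin]# ss -lntp | grep 9092
[root@master bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list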

2. Using Kafka

2.1 Testing Kafka from the Command Line

2.1.1 Create a message producer

[root@master bin]# ./kafka-console-producer.sh --broker-list 192.168.0.128:9092 --topic test

2.1.2 Create a message consumer

[root@master bin]# ./kafka-console-consumer.sh --zookeeper 192.168.0.128:2181 --topic test --from-beginning
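
With Kafka's default settings the topic test is created automatically on first use, but you can also create it explicitly, and on this Kafka version the console consumer can connect to the broker directly instead of going through ZooKeeper; a sketch:

[root@master bin]# ./kafka-topics.sh --create --zookeeper 192.168.0.128:2181 --replication-factor 1 --partitions 1 --topic test
[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.128:9092 --topic test --from-beginning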

2.2 Kafka Helper Script

#!/bin/bash

source /etc/profile

# start ZooKeeper in the foreground
if [ "$1" = "zk" ]
then
    zookeeper-server-start.sh /data/kafka_2.11-0.10.1.0/config/zookeeper.properties

# start ZooKeeper in the background
elif [ "$1" = "zkbg" ]
then
    nohup zookeeper-server-start.sh /data/kafka_2.11-0.10.1.0/config/zookeeper.properties >/dev/null 2>&1 &

# stop ZooKeeper
elif [ "$1" = "zkstop" ]
then
    zookeeper-server-stop.sh

# start Kafka in the foreground
elif [ "$1" = "kafka" ]
then
    kafka-server-start.sh /data/kafka_2.11-0.10.1.0/config/server.properties

# start Kafka in the background
elif [ "$1" = "kafkabg" ]
then
    nohup kafka-server-start.sh /data/kafka_2.11-0.10.1.0/config/server.properties >/dev/null 2>&1 &

# stop Kafka
elif [ "$1" = "kafkastop" ]
then
    kafka-server-stop.sh

# start a console consumer
elif [ "$1" = "consumer" ]
then
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic "$2"

# start a console producer
elif [ "$1" = "producer" ]
then
    kafka-console-producer.sh --broker-list master:9092 --topic "$2"

# create a topic
elif [ "$1" = "create" ]
then
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "$2"

# list topics
elif [ "$1" = "list" ]
then
    kafka-topics.sh --list --zookeeper localhost:2181

# delete a topic
elif [ "$1" = "delete" ]
then
    kafka-topics.sh --delete --zookeeper localhost:2181 --topic "$2"
else
    echo "parameter invalid"
fi
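
Assuming the script above is saved as kafka.sh (the file name here is just an example) and made executable, usage looks like this:

$ chmod +x kafka.sh
$ ./kafka.sh zkbg            # start ZooKeeper in the background
$ ./kafka.sh kafkabg         # start Kafka in the background
$ ./kafka.sh create test     # create the topic "test"
$ ./kafka.sh producer test   # start a console producer on "test"
$ ./kafka.sh consumer test   # start a console consumer on "test"
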
This is the end of the article. Thank you!