精品专区-精品自拍9-精品自拍三级乱伦-精品自拍视频-精品自拍视频曝光-精品自拍小视频

網站建設資訊

NEWS

網站建設資訊

Kafka的使用和錯誤解決-創新互聯

Kafka的使用和錯誤解決

創新互聯建站是一家集網站建設,巨野企業網站建設,巨野品牌網站建設,網站定制,巨野網站建設報價,網絡營銷,網絡優化,巨野網站推廣為一體的創新建站企業,幫助傳統企業提升企業形象加強企業競爭力。可充分滿足這一群體相比中小企業更為豐富、高端、多元的互聯網需求。同時我們時刻保持專業、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學習、思考、沉淀、凈化自己,讓我們為更多的企業打造出實用型網站。

一、下載kafka解壓縮:配置環境變量

vim /etc/profile

export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

二 、kafka中需要使用zookeeper

(一)使用kafka自帶的zookeeper

  1. 先將zookeeper啟動,如果在偽分布式下,kafka已經集成了zk,在kafka中的config目錄下。

    可以編輯config/zookeeper.properties修改zookeeper的端口號。

    后臺啟動zookeeper:
    [root@mail bin]# nohup zookeeper-server-start.sh ../config/zookeeper.properties &

  2. 啟動broker
`[root@mail bin]# nohup kafka-server-start.sh ../config/server.properties &`

3.測試:模擬消息的消費和生產

(1)創建主題

[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1

Created topic "KafkaTestTopic".

(2)創建生產者

[root@mail bin]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092

查看server.properties中的#listeners=PLAINTEXT://:9092,獲取kafka的端口

(3)創建消費者

[root@mail bin]# kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2281

(二)使用非kafka自帶的zookeeper

使用zookeeper(非kafka自帶)

[root@mail zookeeper-3.4.10]# bin/zkServer.sh start conf/zoo.cfg 
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED

(1) 創建主題
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic secondTopic --partitions 1 --replication-factor 1
Created topic "secondTopic".

(2)kafka啟動
[root@mail kafka_2.11-1.0.0]# nohup bin/kafka-server-start.sh config/server.properties &

(3)kafka生產者
[root@mail kafka_2.11-1.0.0]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092

(4)kafka消費者
[root@mail kafka_2.11-1.0.0]#  bin/kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

(5)查看kafka中的數據
[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs

[root@mail kafka_2.11-1.0.0]# cd logs-kafka/             #kafka中的數據存儲目錄

##這個目錄是在kafka的config/server.properties文件中進行配置的
log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka

[root@mail logs-kafka]# ls             #查看kafka中的主題

cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0       __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1       __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  log-start-offset-checkpoint
__consumer_offsets-10      __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  meta.properties
__consumer_offsets-11      __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   My_LOVE_TOPIC-0
__consumer_offsets-12      __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   mytopic-0
__consumer_offsets-13      __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   recovery-point-offset-checkpoint
__consumer_offsets-14      __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   replication-offset-checkpoint
__consumer_offsets-15      __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   stock-quotation-0
__consumer_offsets-16      __consumer_offsets-29  __consumer_offsets-41  hello-0                stock-quotation-avro-0
__consumer_offsets-17      __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-partition-0
__consumer_offsets-18      __consumer_offsets-30  __consumer_offsets-43  hello-2                TEST-TOPIC-0
__consumer_offsets-19      __consumer_offsets-31  __consumer_offsets-44  hello-3
__consumer_offsets-2       __consumer_offsets-32  __consumer_offsets-45  hello-4

[root@mail logs-kafka]# cd KafkaTestTopic-0/       #查看kakfa的主題為KafkaTestTopic的0號分區

[root@mail KafkaTestTopic-0]# ls
00000000000000000000.index  00000000000000000000.timeindex  leader-epoch-checkpoint
00000000000000000000.log    00000000000000000063.snapshot

[root@mail KafkaTestTopic-0]# tail -f 000000000000000000.log      #kafka中的數據存儲文件

(6)修改kafka的分區數,觀察kafka的變化

## 修改kafka分區數
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic KafkaTestTopic --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs

[root@mail kafka_2.11-1.0.0]# cd logs-kafka/

#發現出現kakfa的主題為KafkaTestTopic的0號分區,1號分區,2號分區,總共3個分區
[root@mail logs-kafka]# ls
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0       __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1       __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  KafkaTestTopic-1
__consumer_offsets-10      __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  KafkaTestTopic-2
__consumer_offsets-11      __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   log-start-offset-checkpoint
__consumer_offsets-12      __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   meta.properties
__consumer_offsets-13      __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   My_LOVE_TOPIC-0
__consumer_offsets-14      __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   mytopic-0
__consumer_offsets-15      __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   recovery-point-offset-checkpoint
__consumer_offsets-16      __consumer_offsets-29  __consumer_offsets-41  hello-0                replication-offset-checkpoint
__consumer_offsets-17      __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-0
__consumer_offsets-18      __consumer_offsets-30  __consumer_offsets-43  hello-2                stock-quotation-avro-0
__consumer_offsets-19      __consumer_offsets-31  __consumer_offsets-44  hello-3                stock-quotation-partition-0
__consumer_offsets-2       __consumer_offsets-32  __consumer_offsets-45  hello-4                TEST-TOPIC-0

[root@mail KafkaTestTopic-1]# ls                  #查看kakfa的主題為KafkaTestTopic的1號分區

00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

[root@mail KafkaTestTopic-1]# tail -f 00000000000000000000.log

三、可能出現的錯誤:
(1)
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
[2018-11-20 16:44:16,269] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
(kafka.admin.TopicCommand$)

解決:修改server.properties中的:zookeeper.connect=localhost:2281,讓2281端口號和zookeeper.properties中的zookeeper端口號一致,然后重啟kafka。**

(2)
kafka.common.KafkaException: fetching topic metadata for topics [Set(KafkaTestTopic)] from broker [ArrayBuffer(BrokerEndPoint(0,123.125.50.7,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
(3)
[2018-11-20 17:28:53,411] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 52 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,513] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 53 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,617] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 54 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,721] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 55 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

解決(2)和(3)的錯誤:
修改server.properties中的
I、listeners=PLAINTEXT://localhost:9092
II、 advertised.listeners=PLAINTEXT://localhost:9092

(4) [2018-11-29 09:44:35,275] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

解決:可能的原因:kafka未啟動,重啟啟動kafka。

kafka中查看zookeeper狀態:

bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"

(5)Failed to find leader for Set(KafkaTestTopic-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)

解決:修改kafka配置文件

[root@mail config]# vim server.properties

修改:advertised.host.name=正確的IP地址

四、Kafka相關操作

(1)查看有哪些主題

[root@mail ~]# kafka-topics.sh --describe --zookeeper localhost:2281

Topic:KafkaTestTopic    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

在kafka中每個分區都有一個編號,從0開始;

在kafka中如果有多個副本的話,就會存在leader與follower的關系;Leader表示領導,Leader:0表示當前這個副本為leader所在的broker是哪一個。

(2)只看主題名稱

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic

(3)查看指定主題的信息

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

(4)查看指定的topic是否存在

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 --topic KafkaTestTopic
或
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic

(5) 修改主題的分區數

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(6)修改配置項

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:flush.messages=1
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(7)刪除配置項
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(8)刪除主題

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281  --delete  --topic KafkaTestTopic

注意:刪除只是標記刪除;當服務器重啟就會刪除已標記的topic,這個和kafka的版本有關。
五、Java中Kakfa配置
(1).Java中使用kafka的Producer端配置

        Properties props=new Properties();

        // Kafka服務端的主機名和端口號,多個的話,使用逗號分隔
        props.put("bootstrap.servers","ip:9092");

        // 等待所有副本節點的應答
        props.put("acks", "all");

        // 消息發送大嘗試次數
        props.put("retries",0);

        // 一批消息處理大小
        props.put("batch.size","16384");

        // 請求延時
        props.put("linger.ms",1);
        // 發送緩存區內存大小
        props.put("buffer.memory", 33554430);

        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // value序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

(2).Java中使用kafka的Consumer端配置

        Properties props=new Properties();

        // 定義kakfa 服務的地址,不需要將所有broker指定上
        props.put("bootstrap.servers","ip:9092");

        // 制定consumer group
        props.put("group.id","test1");

        // 是否自動確認offset
        props.put("enable.auto.commit", "true");

        // 自動確認offset的時間間隔
        props.put("auto.commit.interval.ms", "1000");

        // key的序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // value的序列化類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

另外有需要云服務器可以了解下創新互聯scvps.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業上云的綜合解決方案,具有“安全穩定、簡單易用、服務可用性高、性價比高”等特點與優勢,專為企業上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。


網站欄目:Kafka的使用和錯誤解決-創新互聯
瀏覽路徑:http://m.jcarcd.cn/article/cepods.html
主站蜘蛛池模板: 国产乱子伦视频观看 | 国产理论片免费观看 | 麻花影视最 | 国产精品偷伦视频免 | 欧美在线成人怡红院 | 国产自产拍在线观看 | 国产精品有码 | 精品e本大 | 日本女优中文字幕 | 91偷拍精品一 | 日本免费综 | 日韩男女性爱视频 | 欧美三级免费观看 | 三年片大全在线观看 | 国产亚洲精品一二区 | 97亚洲精华液 | 国产福利91精品 | 国产一区丝袜高 | 飘花在线影院 | 日韩欧美国产综合 | 国产精品亚洲一 | 国产精美| 91九色成人| 狠狠撸福利导航 | 国产精品另| 91视频网| 精品国产a| 无码精品毛片成人影院 | 国产不卡在线二区 | 乱码一区二区三区 | 国产视频自拍91 | 国产精品小视频网站 | 动漫精品一区二区 | 国产日韩欧 | 日韩午夜三级视频 | 91免费在线 | 国产污污免费网站 | 97伦理网| 日韩电影在线天堂 | 区二区免费网站 | 成人午夜影院在線 |