一、kafka 介紹 && kafka-client_台中搬家

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

台中搬家公司推薦超過30年經驗,首選台中大展搬家

一、kafka 介紹

1.1、kafka 介紹

Kafka 是一個分佈式消息引擎與流處理平台,經常用做企業的消息總線、實時數據管道,有的還把它當做存儲系統來使用。
早期 Kafka 的定位是一個高吞吐的分佈式消息系統,目前則演變成了一個成熟的分佈式消息引擎,以及流處理平台。

Kafka 主要起到削峰填谷(緩衝)、系統解構以及冗餘的作用,主要特點有:

  • 高吞吐、低延時:這是 Kafka 顯著的特點,Kafka 能夠達到百萬級的消息吞吐量,延遲可達毫秒級;
  • 持久化存儲:Kafka 的消息最終持久化保存在磁盤之上,提供了順序讀寫以保證性能,並且通過 Kafka 的副本機制提高了數據可靠性。
  • 分佈式可擴展:Kafka 的數據是分佈式存儲在不同 broker 節點的,以 topic 組織數據並且按 partition 進行分佈式存儲,整體的擴展性都非常好。
  • 高容錯性:集群中任意一個 broker 節點宕機,Kafka 仍能對外提供服務。

使用消息隊列的好處:
解耦、冗餘(每個分區都有副本)、提高擴展性、靈活性 & 峰值處理能力、可恢復性(有副本)、順序保證、緩衝、異步通信

1.2、kafka術語

  • 生產者(Producer):

    • 向 broker 發布消息的應用程序。
    • 生產者也負責選擇發布到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。
  • 消費者(Consumer):

    • 從消息隊列中獲取消息的客戶端應用程序。

    • 一個 topic 可以讓若干個消費者進行消費,若干個消費者組成一個 Consumer Group 即消費組,一條消息只能被消費組中一個 Consumer 消費。

    • 假如所有的消費者都在一個組中,那麼這就變成了 queue 模型。 假如所有的消費者都在不同的組中,那麼就完全變成了發布-訂閱模型。

    • 更通用的,我們可以創建一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者,一個組內多個消費者可以用來擴展性能和容錯。如下圖所示:

      台中搬家公司費用怎麼算?

      擁有20年純熟搬遷經驗,提供免費估價且流程透明更是5星評價的搬家公司

    • 2個 kafka 集群託管4個分區(P0-P3),2個消費者組,消費組 A 有2個消費者實例,消費組 B 有4個。

    • kafka 中消費者組有兩個概念:隊列:消費者組(consumer group)允許消費者組成員瓜分處理。發布訂閱:允許你廣播消息給多個消費者組(不同名)。

    • 傳統的消息有兩種模式:隊列和發布訂閱。

    • 在隊列模式中,消費者池從服務器讀取消息(每個消息只被其中一個讀取),優點是允許多個消費者瓜分處理數據,這樣可以擴展處理。;

    • 發布訂閱模式:消息廣播給所有的消費者。允許你廣播數據到多個消費者,由於每個訂閱者都訂閱了消息,所以沒辦法縮放處理。

  • broker:

    • Kafka 實例,多個 broker 組成一個 Kafka 集群,Kafka 以集群方式運行,集群中每個服務器稱為 broker。
    • 通常一台機器部署一個 Kafka 實例,一個實例掛了不影響其他實例
  • 主題(Topic):

    • 一組消息的歸納(代表不同的業務,如超市辦會員,付款)。
    • 服務端消息的邏輯存儲單元。一個 topic 通常包含若干個 Partition 分區。
  • 分區(Partition):

    • 一個 Topic 中的消息數據按照多個分區組織,分區是 Kafka 消息隊列組織的最小單位,一個分區可以看作是一個隊列。
    • 分佈式存儲在各個 broker 中, 實現發布與訂閱的負載均衡。
    • 若干個分區可以被若干個 Consumer 同時消費,達到消費者高吞吐量。
    • 單個 partition 有序,整體無序,整體有序就將數據都放到一個 partition 中,但是效率極低。
    • 每個分區有一個 leader,零或多個 follower。Leader 處理此分區的所有的讀寫請求,而follower被動的複製數據。如果leader宕機,其它的一個follower會被推舉為新的leader。
    • 一台服務器可能同時是一個分區的 leader,另一個分區的 follower。 這樣可以平衡負載,避免所有的請求都只讓一台或者某幾台服務器處理。
  • message:

    • 消息,或稱日誌消息,是 Kafka 服務端實際存儲的數據,每一條消息都由一個 key、一個 value 以及消息時間戳 timestamp 組成
  • offset:

    • 偏移量,分區中的消息位置,由 Kafka 自身維護,Consumer 消費時也要保存一份 offset 以維護消費過的消息位置。

1.3、四個核心 API

  • Producer API 發布消息到1個或多個 topic(主題)中。
  • Consumer API 來訂閱一個或多個topic,並處理產生的消息。
  • Streams API 充當一個流處理器,從1個或多個 topic 消費輸入流,並生產一個輸出流到1個或多個輸出 topic,有效地將輸入流轉換到輸出流。
  • Connector API 可構建或運行可重用的生產者或消費者,將 topic 連接到現有的應用程序或數據系統。例如,連接到關係數據庫的連接器可以捕獲表的每個變更。

二、kafka 客戶端

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

2.1、 KafkaProduce

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author xiandongxie  2020-06-04
 */
public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092");
        // 判別請求是否為完整的條件(判斷是不是成功發送了)。指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的
        props.put("acks", "all");
        // 如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重複消息的可能性
        props.put("retries", 0);
        // 生產者緩存每個分區未發送的消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的內存(每個“活躍”的分區都有1個緩衝區)
        props.put("batch.size", 16384);
        // 默認緩衝可立即發送,即便緩衝空間還沒有滿,但是,如果想減少請求的數量,可以設置 linger.ms 大於0。
        // 這將指示生產者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似於TCP的算法,例如,可能100條消息在一個請求發送,因為我們設置了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩衝區,這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。
        props.put("linger.ms", 1);
        // 控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過 max.block.ms 設定,之後它將拋出一個TimeoutException。
        props.put("buffer.memory", 33554432);
        // key.serializer 和 value.serializer,將用戶提供的 key 和 value 對象 ProducerRecord 轉換成字節,可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 100; i < 500; i++)
            // send()方法是異步的,添加消息到緩衝區等待發送,並立即返回。生產者將單個的消息批量在一起發送來提高效率
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();

    }
}

2.2、KafkaConsumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * 【自動提交偏移量】的簡單的kafka消費者API
 *
 * @author xiandongxie
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092");
        // 消費者組名稱
        props.put("group.id", "test");
        // 設置 enable.auto.commit,偏移量由 auto.commit.interval.ms 控制自動提交的頻率。
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 指定訂閱 topic 名稱
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

    }
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

台中搬家公司費用怎麼算?

擁有20年純熟搬遷經驗,提供免費估價且流程透明更是5星評價的搬家公司

您可能也會喜歡…