kafka消費者客戶端
Kafka消費者
1.1 消費者與消費者組
消費者與消費者組之間的關係
每一個消費者都隸屬於某一個消費者組,一個消費者組可以包含一個或多個消費者,每一條消息只會被消費者組中的某一個消費者所消費。不同消費者組之間消息的消費是互不干擾的。
為什麼會有消費者組的概念
消費者組出現主要是出於兩個目的:
(1) 使整體的消費能力具備橫向的伸縮性。可以適當增加消費者組中消費者的數量,來提高整體的消費能力。但是每一個分區至多被消費者組的中一個消費者所消費,因此當消費者組中消費者數量超過分區數時,多出的消費者不會分配到任何一個分區。當然這是默認的分區分配策略,可通過partition.assignment.strategy進行配置。
(2) 實現消息消費的隔離。不同消費者組之間消息消費互不干擾,從而實現發布訂閱這種消息投遞模式。
注意:
消費者隸屬的消費者組可以通過group.id進行配置。消費者組是一個邏輯上的概念,但消費者並不是一個邏輯上的概念,它可以是一個線程,也可以是一個進程。同一個消費者組內的消費者可以部署在同一台機器上,也可以部署在不同的機器上。
1.2 消費者客戶端開發
一個正常的消費邏輯需要具備以下幾個步驟:
-
配置消費者客戶端參數及創建相應的消費者實例。
-
訂閱主題
-
拉取消息並消費
-
提交消費位移
-
關閉消費者實例
public class KafkaConsumerAnalysis {
public static final String brokerList="node112:9092,node113:9092,node114:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
return prop;
}
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition =" + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
}
}
1.2.1 訂閱主題和分區
先來說一下消費者訂閱消息的粒度:一個消費者可以訂閱一個主題、多個主題、或者多個主題的特定分區。主要通過subsribe和assign兩個方法實現訂閱。
(1)訂閱一個主題:
public void subscribe(Collection<String> topics),當集合中有一個主題時。
(2)訂閱多個主題:
public void subscribe(Collection<String> topics),當集合中有多個主題時。
public void subscribe(Pattern pattern),通過正則表達式實現消費者主題的匹配。通過這種方式,如果在消息消費的過程中,又添加了新的能夠匹配到正則的主題,那麼消費者就可以消費到新添加的主題。 consumer.subscribe(Pattern.compile(“topic-.*”));
(3)多個主題的特定分區
public void assign(Collection<TopicPartition> partitions),可以實現訂閱某些特定的主題分區。TopicPartition包括兩個屬性:topic(String)和partition(int)。
如果事先不知道有多少分區該如何處理,KafkaConsumer中的partitionFor方法可以獲得指定主題分區的元數據信息:
public List<PartitionInfo> partitionsFor(String topic)
PartitionInfo的屬性如下:
public class PartitionInfo {
private final String topic;//主題
private final int partition;//分區
private final Node leader;//分區leader
private final Node[] replicas;//分區的AR
private final Node[] inSyncReplicas;//分區的ISR
private final Node[] offlineReplicas;//分區的OSR
}
因此也可以通過這個方法實現某個主題的全部訂閱。
需要指出的是,subscribe(Collection)、subscirbe(Pattern)、assign(Collection)方法分別代表了三種不同的訂閱狀態:AUTO_TOPICS、AUTO_PATTREN和USER_ASSIGN,這三種方式是互斥的,消費者只能使用其中一種,否則會報出IllegalStateException。
subscirbe方法可以實現消費者自動再平衡的功能。多個消費者的情況下,可以根據分區分配策略自動分配消費者和分區的關係,當消費者增加或減少時,也能實現負載均衡和故障轉移。
如何實現取消訂閱:
consumer.unsubscribe()
1.2.2 反序列化
KafkaProducer端生產消息進行序列化,同樣消費者就要進行相應的反序列化。相當於根據定義的序列化格式的一個逆序提取數據的過程。
import com.gdy.kafka.producer.Company;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CompanyDeserializer implements Deserializer<Company> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public Company deserialize(String topic, byte[] data) {
if(data == null) {
return null;
}
if(data.length < 8) {
throw new SerializationException("size of data received by Deserializer is shorter than expected");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLength = buffer.getInt();
byte[] nameBytes = new byte[nameLength];
buffer.get(nameBytes);
int addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressBytes);
String name,address;
try {
name = new String(nameBytes,"UTF-8");
address = new String(addressBytes,"UTF-8");
}catch (UnsupportedEncodingException e) {
throw new SerializationException("Error accur when deserializing");
}
return new Company(name, address);
}
@Override
public void close() {
}
}
實際生產中需要自定義序列化器和反序列化器時,推薦使用Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來包裝。
1.2.3 消息消費
Kafka中消息的消費是基於拉模式的,kafka消息的消費是一個不斷輪旋的過程,消費者需要做的就是重複的調用poll方法。
public ConsumerRecords<K, V> poll(final Duration timeout)
這個方法需要注意的是,如果消費者的緩衝區中有可用的數據,則會立即返回,否則會阻塞至timeout。如果在阻塞時間內緩衝區仍沒有數據,則返回一個空的消息集。timeout的設置取決於應用程序對效應速度的要求。如果應用線程的位移工作是從Kafka中拉取數據並進行消費可以將這個參數設置為Long.MAX_VALUE。
每次poll都會返回一個ConsumerRecords對象,它是ConsumerRecord的集合。對於ConsumerRecord相比於ProducerRecord多了一些屬性:
private final String topic;//主題
private final int partition;//分區
private final long offset;//偏移量
private final long timestamp;//時間戳
private final TimestampType timestampType;//時間戳類型
private final int serializedKeySize;//序列化key的大小
private final int serializedValueSize;//序列化value的大小
private final Headers headers;//headers
private final K key;//key
private final V value;//value
private volatile Long checksum;//CRC32校驗和
另外我們可以按照分區維度對消息進行消費,通過ConsumerRecords.records(TopicPartiton)方法實現。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Set<TopicPartition> partitions = records.partitions();
for (TopicPartition tp : partitions) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.partition() + " ," + record.value());
}
}
另外還可以按照主題維度對消息進行消費,通過ConsumerRecords.records(Topic)實現。
for (String topic : topicList) {
for (ConsumerRecord<String, String> record : records.records(topic)) {
System.out.println(record.partition() + " ," + record.value());
}
}
1.2.4 消費者位移提交
首先要 明白一點,消費者位移是要做持久化處理的,否則當發生消費者崩潰或者消費者重平衡時,消費者消費位移無法獲得。舊消費者客戶端是將位移提交到zookeeper上,新消費者客戶端將位移存儲在Kafka內部主題_consumer_offsets中。
KafkaConsumer提供了兩個方法position(TopicPatition)和commited(TopicPartition)。
public long position(TopicPartition partition)—–獲得下一次拉取數據的偏移量
public OffsetAndMetadata committed(TopicPartition partition)—–給定分區的最後一次提交的偏移量。
還有一個概念稱之為lastConsumedOffset,這個指的是最後一次消費的偏移量。
在kafka提交方式有兩種:自動提交和手動提交。
(1)自動位移提交
kafka默認情況下採用自動提交,enable.auto.commit的默認值為true。當然自動提交並不是沒消費一次消息就進行提交,而是定期提交,這個定期的周期時間由auto.commit.intervals.ms參數進行配置,默認值為5s,當然這個參數生效的前提就是開啟自動提交。
自動提交會造成重複消費和消息丟失的情況。重複消費很容易理解,因為自動提交實際是延遲提交,因此很容易造成重複消費,然後消息丟失是怎麼產生的?
(2)手動位移提交
開始手動提交的需要配置enable.auto.commit=false。手動提交消費者偏移量,又可分為同步提交和異步提交。
同步提交:
同步提交很簡單,調用commitSync() 方法:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//consume message
consumer.commitSync();
}
}
這樣,每消費一條消息,提交一個偏移量。當然可用過緩存消息的方式,實現批量處理+批量提交:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBaches) {
for (ConsumerRecord<String, String> record : records) {
//consume message
}
consumer.commitSync();
buffer.clear();
}
}
還可以通過public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)這個方法實現按照分區粒度進行同步提交。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for (ConsumerRecord record : partitionRecords) {
//consume message
}
long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(tp,new OffsetAndMetadata(lastConsumerOffset+1)));
}
}
異步提交:
commitAsync異步提交的時候消費者線程不會被阻塞,即可能在提交偏移量的結果還未返回之前,就開始了新一次的拉取數據操作。異步提交可以提升消費者的性能。commitAsync有三個重載:
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback )
對照同步提交的方法參數,多了一個Callback回調參數,它提供了一個異步提交的回調方法,當消費者位移提交完成后回調OffsetCommitCallback的onComplement方法。以第二個方法為例:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//consume message
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e == null) {
System.out.println(offsets);
}else {
e.printStackTrace();
}
}
});
1.2.5 控制和關閉消費
kafkaConsumer提供了pause()和resume() 方法分別實現暫停某些分區在拉取操作時返回數據給客戶端和恢復某些分區向客戶端返回數據的操作:
public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)
優雅停止KafkaConsumer退出消費者循環的方式:
(1)不要使用while(true),而是使用while(isRunning.get()),isRunning是一個AtomicBoolean類型,可以在其他地方調用isRunning.set(false)方法退出循環。
(2)調用consumer.wakup()方法,wakeup方法是KafkaConsumer中唯一一個可以從其他線程里安全調用的方法,會拋出WakeupException,我們不需要處理這個異常。
跳出循環后一定要显示的執行關閉動作和釋放資源。
1.2.6 指定位移消費
KafkaConsumer可通過兩種方式實現實現不同粒度的指定位移消費。第一種是通過auto.offset.reset參數,另一種通過一個重要的方法seek。
(1)auto.offset.reset
auto.offset.reset這個參數總共有三種可配置的值:latest、earliest、none。如果配置不在這三個值當中,就會拋出ConfigException。
latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,消費新產生的該分區下的數據
earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,從頭開始消費
none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset或位移越界,則拋出NoOffsetForPartitionException異常
消息的消費是通過poll方法進行的,poll方法對於開發者來說就是一個黑盒,無法精確的掌控消費的起始位置。即使通過auto.offsets.reset參數也只能在找不到位移或者位移越界的情況下粗粒度的從頭開始或者從末尾開始。因此,Kafka提供了另一種更細粒度的消費掌控:seek。
(2)seek
seek可以實現追前消費和回溯消費:
public void seek(TopicPartition partition, long offset)
可以通過seek方法實現指定分區的消費位移的控制。需要注意的一點是,seek方法只能重置消費者分配到的分區的偏移量,而分區的分配是在poll方法中實現的。因此在執行seek方法之前需要先執行一次poll方法獲取消費者分配到的分區,但是並不是每次poll方法都能獲得數據,所以可以採用如下的方法。
consumer.subscribe(topicList);
Set<TopicPartition> assignment = new HashSet<>();
while(assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();//獲取消費者分配到的分區,沒有獲取返回一個空集合
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); //重置指定分區的位移
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume record
}
如果對未分配到的分區執行了seek方法,那麼會報出IllegalStateException異常。
在前面我們已經提到,使用auto.offsets.reset參數時,只有當消費者分配到的分區沒有提交的位移或者位移越界時,才能從earliest消費或者從latest消費。seek方法可以彌補這一中情況,實現任意情況的從頭或從尾部消費。
Set<TopicPartition> assignment = new HashSet<>();
while(assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);//獲取指定分區的末尾位置
for (TopicPartition tp : assignment) {
consumer.seek;
}
與endOffset對應的方法是beginningOffset方法,可以獲取指定分區的起始位置。其實kafka已經提供了一個從頭和從尾消費的方法。
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
還有一種場景是這樣的,我們並不知道特定的消費位置,卻知道一個相關的時間點。為解決這種場景遇到的問題,kafka提供了一個offsetsForTimes()方法,通過時間戳來查詢分區消費的位移。
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
}
//獲得指定分區指定時間點的消費位移
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
由於seek方法的存在,使得消費者的消費位移可以存儲在任意的存儲介質中,包括DB、文件系統等。
1.2.7 消費者的再均衡
再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為,它為消費者組具備高可用和伸縮性提高保障。不過需要注意的地方有兩點,第一是消費者發生再均衡期間,消費者組中的消費者是無法讀取消息的。第二點就是消費者發生再均衡可能會引起重複消費問題,所以一般情況下要盡量避免不必要的再均衡。
KafkaConsumer的subscribe方法中有一個參數為ConsumerRebalanceListener,我們稱之為再均衡監聽器,它可以用來在設置發生再均衡動作前後的一些準備和收尾動作。
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
onPartitionsRevoked方法會在再均衡之前和消費者停止讀取消息之後被調用。可以通過這個回調函數來處理消費位移的提交,以避免重複消費。參數partitions表示再均衡前分配到的分區。
onPartitionsAssigned方法會在再均衡之後和消費者消費之間進行調用。參數partitons表示再均衡之後所分配到的分區。
consumer.subscribe(topicList);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(topicList, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);//提交偏移量
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do something
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//process records
//記錄當前的偏移量
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata( record.offset() + 1));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
1.2.8 消費者攔截器
消費者攔截器主要是在消費到消息或者提交消費位移時進行一些定製化的操作。消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口。
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
onConsume方法是在poll()方法返回之前被調用,比如修改消息的內容、過濾消息等。如果onConsume方法發生異常,異常會被捕獲並記錄到日誌中,但是不會向上傳遞。
Kafka會在提交位移之後調用攔截器的onCommit方法,可以使用這個方法來記錄和跟蹤消費的位移信息。
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
private static final long EXPIRE_INTERVAL = 10 * 1000; //10秒過期
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {//判斷是否超時
newTpRecords.add(record);
}
}
if (!newRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp,offset) -> {
System.out.println(tp + ":" + offset.offset());
});
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
使用這種TTL需要注意的是如果採用帶參數的位移提交方式,有可能提交了錯誤的位移,可能poll拉取的最大位移已經被攔截器過濾掉。
1.2.9 消費者的多線程實現
KafkaProducer是線程安全的,然而KafkaConsumer是非線程安全的。KafkaConsumer中的acquire方法用於檢測當前是否只有一個線程在操作,如果有就會拋出ConcurrentModifiedException。acuqire方法和我們通常所說的鎖是不同的,它不會阻塞線程,我們可以把它看做是一個輕量級的鎖,它通過線程操作計數標記的方式來檢測是否發生了併發操作。acquire方法和release方法成對出現,分表表示加鎖和解鎖。
//標記當前正在操作consumer的線程
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
//refcount is used to allow reentrant access by the thread who has acquired currentThread,
//大概可以理解我加鎖的次數
private final AtomicInteger refcount = new AtomicInteger(0);
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get()&&!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
kafkaConsumer中的每個共有方法在調用之前都會執行aquire方法,只有wakeup方法是個意外。
KafkaConsumer的非線程安全並不意味着消費消息的時候只能以單線程的方式執行。可以通過多種方式實現多線程消費。
(1)Kafka多線程消費第一種實現方式——–線程封鎖
所謂線程封鎖,就是為每個線程實例化一個KafkaConsumer對象。這種方式一個線程對應一個KafkaConsumer,一個線程(可就是一個consumer)可以消費一個或多個分區的消息。這種消費方式的併發度受限於分區的實際數量。當線程數量超過分分區數量時,就會出現線程限制額的情況。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class FirstMutiConsumerDemo {
public static final String brokerList="node112:9092,node113:9092,node114:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static Properties initConfig() {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return prop;
}
public static void main(String[] args) {
Properties prop = initConfig();
int consumerThreadNum = 4;
for (int i = 0; i < 4; i++) {
new KafkaCoosumerThread(prop, topic).run();
}
}
public static class KafkaCoosumerThread extends Thread {
//每個消費者線程包含一個KakfaConsumer對象。
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaCoosumerThread(Properties prop, String topic) {
this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//處理消息模塊
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
}
這種實現方式和開啟多個消費進程的方式沒有本質的區別,優點是每個線程可以按照順序消費消費各個分區的消息。缺點是每個消費線程都要維護一個獨立的TCP連接,如果分區數和線程數都很多,那麼會造成不小的系統開銷。
(2)Kafka多線程消費第二種實現方式——–多個消費線程同時消費同一分區
多個線程同時消費同一分區,通過assign方法和seek方法實現。這樣就可以打破原有消費線程個數不能超過分區數的限制,進一步提高了消費的能力,但是這種方式對於位移提交和順序控制的處理就會變得非常複雜。實際生產中很少使用。
(3)第三種實現方式——-創建一個消費者,records的處理使用多線程實現
一般而言,消費者通過poll拉取數據的速度相當快,而整體消費能力的瓶頸也正式在消息處理這一塊。基於此
考慮第三種實現方式。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThirdMutiConsumerThreadDemo {
public static final String brokerList="node112:9092,node113:9092,node114:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static Properties initConfig() {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return prop;
}
public static void main(String[] args) {
Properties prop = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(prop, topic, Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNum;
public KafkaConsumerThread(Properties prop, String topic, int threadNum) {
this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
kafkaConsumer.subscribe(Arrays.asList(topic));
this.threadNum = threadNum;
executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordHandler(records));
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
public static class RecordHandler implements Runnable {
public final ConsumerRecords<String,String> records;
public RecordHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
//處理records
}
}
}
KafkaConsumerThread類對應一個消費者線程,裏面通過線程池的方式調用RecordHandler處理一批批的消息。其中線程池採用的拒絕策略為CallerRunsPolicy,當阻塞隊列填滿時,由調用線程處理該任務,以防止總體的消費能力跟不上poll拉取的速度。這種方式還可以進行橫向擴展,通過創建多個KafkaConsumerThread實例來進一步提升整體的消費能力。
這種方式還可以減少TCP連接的數量,但是對於消息的順序處理就變得困難了。這種方式需要引入一個共享變量Map<TopicPartition,OffsetAndMetadata> offsets參與消費者的偏移量提交。每一個RecordHandler類在處理完消息后都將對應的消費位移保存到共享變量offsets中,KafkaConsumerThread在每一次poll()方法之後都要進讀取offsets中的內容並對其進行提交。對於offsets的讀寫要採用加鎖處理,防止出現併發問題。並且在寫入offsets的時候需要注意位移覆蓋的問題。針對這個問題,可以將RecordHandler的run方法做如下改變:
public void run() {
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = this.records.records(tp);
//處理tpRecords
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
synchronized (offsets) {
if (offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}else {
long positioin = offsets.get(tp).offset();
if(positioin < lastConsumedOffset + 1) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}
}
}
}
}
對應的位移提交代碼也應該在KafkaConsumerThread的run方法中進行體現
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordHandler(records));
synchronized (offsets) {
if (!offsets.isEmpty()) {
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
其實這種方式並不完美,可能造成數據丟失。可以通過更為複雜的滑動窗口的方式進行改進。
1.2.10 消費者重要參數
-
fetch.min.bytes
kafkaConsumer一次拉拉取請求的最小數據量。適當增加,會提高吞吐量,但會造成額外延遲。
-
fetch.max.bytes
kafkaConsumer一次拉拉取請求的最大數據量,如果kafka一條消息的大小超過這個值,仍然是可以拉取的。
-
fetch.max.wait.ms
一次拉取的最長等待時間,配合fetch.min.bytes使用
-
max.partiton.fetch.bytes
每個分區里返回consumer的最大數據量。
-
max.poll.records
一次拉取的最大消息數
-
connection.max.idle.ms
多久之後關閉限制的連接
-
exclude.internal.topics
這個參數用於設置kafka中的兩個內部主題能否被公開:consumer_offsets和transaction_state。如果設為true,可以使用Pattren訂閱內部主題,如果是false,則沒有這種限制。
-
receive.buffer.bytes
socket接收緩衝區的大小
-
send.buffer.bytes
socket發送緩衝區的大小
-
request.timeout.ms
consumer等待請求響應的最長時間。
-
reconnect.backoff.ms
重試連接指定主機的等待時間。
-
max.poll.interval.ms
配置消費者等待拉取時間的最大值,如果超過這個期限,消費者組將剔除該消費者,進行再平衡。
-
auto.offset.reset
自動偏移量重置
-
enable.auto.commit
是否允許偏移量的自動提交
-
auto.commit.interval.ms
自動偏移量提交的時間間隔
【精選推薦文章】
自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象
網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!
評比前十大台北網頁設計、台北網站設計公司知名案例作品心得分享
台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"