Kafka - 生产者

文章目录结构

  1. 生产者工作流程
    • 初始化生产者配置
    • 创建kafka producer client
    • 创建消息
    • 发送消息
  2. Kafka Java 生产者是如何管理TCP连接的?

基于 Kafka 2.2.0 版本

1. 初始化生产者配置

用户自定义配置:

1
2
3
4
5
6
Properties props = new Properties();
//设置broker地址。 因为是本地测试,所以只填kafka集群中的一个可用broker地址即可
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置key和value的序列化形式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

2. 创建kafka producer client
1
Producer<String, String> producer = new KafkaProducer<>(props);
  • client.id: 客户端id
    如果没有传入,则通过自增方式创建唯一的clientId

    1
    2
    3
    4
    String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
    if (clientId.length() <= 0)
    clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
    this.clientId = clientId;
  • Metrics: Metric配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
    .tags(metricTags);
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
    MetricsReporter.class,
    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
    reporters.add(new JmxReporter(JMX_PREFIX));
    this.metrics = new Metrics(metricConfig, reporters, time);

可以看到默认添加了JmxReporter。

  • 设置分区规则
    1
    this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

通过partitioner.class 参数指定,如果不传入该参数则使用默认的分区规则。我们可以实现接口Partitioner 来自定义分区规则。

  • 设置key 和 value的序列化形式
    通过key.serializervalue.serializer参数指定。需要实现接口Serializer
    下面是常用的StringSerializer实现:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
    Object encodingValue = configs.get(propertyName);
    if (encodingValue == null)
    encodingValue = configs.get("serializer.encoding");
    if (encodingValue instanceof String)
    encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
    try {
    if (data == null)
    return null;
    else
    return data.getBytes(encoding);
    } catch (UnsupportedEncodingException e) {
    throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
    }
    }

    @Override
    public void close() {
    // nothing to do
    }
    }

使用时,只需要像这样指定即可,我们可以参照着实现自己的序列化类。

1
2
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  • 设置 Producer的拦截器(interceptor)
    1
    2
    3
    4
    5
    6
    List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
    if (interceptors != null)
    this.interceptors = interceptors;
    else
    this.interceptors = new ProducerInterceptors<>(interceptorList);

Producer的拦截器只需要实现接口ProducerInterceptor即可。可以指定多个分区实现类,中间用”,”分隔; 按照先后顺序分别执行。
如下面的例子,InterceptorB会在InterceptorA已经执行的基础上执行:

1
props.put("interceptor.classes", "com.dmx.kafka.interceptor.InterceptorA,com.dmx.kafka.interceptor.InterceptorB");

还有一些较重要的参数说明:

  • max.request.size: 该参数用来限制生产者客户端能发送的消息的最大值,默认值 1048576B,即 1MB。注意该参数的值应该小于或等于broker端的参数message.max.bytes的值。否则broker会无法接收生产者发送的大于message.max.bytes值的消息。

  • buffer.memory: 生产者客户端用于缓存消息到缓冲区大小。如果生产者发送消息到速度超过发送到服务器的速度,则会导致生产者空间不足。需要大于等于batch.size

  • batch.size: 用于指定ProducerBatch可以复用内存区域的大小。默认值(16384, 即16kb)

  • linger.ms: 该参数用来指定生产者发送 ProducerBatch 之前等待更多的消息(ProducerRecord)加入 ProducerBatch的时间,默认值(0)。生产者客户端会在ProducerBatch被填满或等待时间超过 linger.ms值时发送出去。增大这个参数会增加消息的延迟,但是同时能提升一定的吞吐量。

  • max.block.ms: 用来控制KafkaProducer中的send()方法和partitionFor()方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。默认值(60000)

  • compression.type: 用来指定消息到压缩方式,默认值为”none”,即默认情况下,消息不会被压缩。该参数还可以是 “gzip”、”snappy” 和 “lz4”等。对消息进行压缩可以极大地减少网络传输量,降低网络I/O,从而提高整体性能。消息压缩是一种以时间换空间的优化方式,如果对时延有一定要求,则不推荐对消息进行压缩。

3. 创建消息
1
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "kafka msg!");

我们在这儿只指定 topic名字和消息内容。

4. 发送消息
1
producer.send(record);

KafkaProducer中有2个消息发送的方法(如下)。我们使用的是不带回调函数的。

1
2
3
4
5
6
7
8
9
10
11
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

  • 1.执行拦截器

    1
    2
    3
    4
    5
    6
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
    }
  • 2.producer 的实例检查

    1
    2
    3
    4
    private void throwIfProducerClosed() {
    if (ioThread == null || !ioThread.isAlive())
    throw new IllegalStateException("Cannot perform operation after producer has been closed");
    }
  • 3.检查topic的metadata是否可用

    1
    2
    3
    4
    5
    6
    7
    8
    ClusterAndWaitTime clusterAndWaitTime;
    try {
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    } catch (KafkaException e) {
    if (metadata.isClosed())
    throw new KafkaException("Producer closed while send in progress", e);
    throw e;
    }
  • 4.序列化消息的key和value

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    byte[] serializedKey;
    try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    } catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
    " specified in key.serializer", cce);
    }
    byte[] serializedValue;
    try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    } catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
    " specified in value.serializer", cce);
    }

将String转为byte[]。

  • 5.获取partition编号

    1
    2
    3
    4
    5
    6
    7
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
    partition :
    partitioner.partition(
    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
  • 5.1 如果已指定分区,则使用指定的分区号。

  • 5.2 如果未指定分区,且未指定自定义的分区规则实现类,则使用默认的分区规则

我们来看下默认的分区规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

  • 5.2.1 如果消息的keyBytes不为空,则对keyBytes进行hash方式获取分区编号
  • 5.2.2 如果消息的keyBytes为空,可用分区数大于0,则使用轮询方式从可用分区中获取分区编号
  • 5.2.3 如果消息的keyBytes为空,可用分区数不大于0,则使用轮询方式从所有分区中获取分区编号

  • 6.将消息放进消息累加器的双端队列(Deque, 可以参考这篇文章Deque)中,如果双端队列中不止一个 ProducerBatch,或者最后一个 ProducerBatch 满了,或者有创建新的 ProducerBatch,都会唤醒 Sender 线程发送消息。

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
          serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      this.sender.wakeup();
    }
    

Sender 线程主要是有两个作用:
1是将生产者的请求发送到kafka集群
2是更新kafka集群的视图以便确定将生产者的请求发送到集群中的具体哪一个节点

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后依次通过可能的拦截器、key/value序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在 RecordAccumulator 内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部; Sender读取消息时,从双端队列的头部读取。注意这里的 ProducerBatch 不是 ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。这样不仅可以使字节的使用更为紧凑, 而且可以减少请求的次数以提升整体的吞吐量。

Kafka Java 生产者是如何管理TCP连接的?

  • 为什么不选择HTTP作为请求通信的底层协议?
    因为在开发客户端时,我们能够利用TCP本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力;并且目前已知的HTTP库在很多编程语言中都略显简陋。

  • TCP连接是何时创建的?
    TCP连接除了在创建KafkaProducer实例会创建之外,还可能在两个地方被创建,一个是在更新元数据后,另一个是在消息发送时。为什么说可能呢?因为这两个地方并非总是创建TCP连接。当Producer更新了集群的元数据信息后,如果发现与某些Broker当前没有连接,那么它就会创建一个TCP连接。同样地,当要发送消息时,Producer发现尚不存在与目标Broker的连接,也会创建一个。

  • 何时关闭TCP连接?
    Producer端关闭TCP连接的方式有两种:一种是用户主动关闭; 一种是Kafka自动关闭。
    用户主动关闭: a. 调用 producer.close() b. kill -9 主动”杀掉”Producer应用。
    Kafka自动关闭: 这与Producer端的参数 connections.max.idle.ms的值有关。默认参数为9分钟,即9分钟内没有任何请求”流过”某个TCP连接,那么Kafka会主动帮你把该TCP连接关闭。如果将改参数设置为 -1,那么这种回收机制将被禁用。TCP连接将成为永久长连接。当然这只是软件层面的”长连接”机制,由于Kafka创建的这些Socket连接都开启了keepalive,因此keepalive探活机制还是会遵守的。值得注意的是,在第二种方式中,TCP连接是在Broker端被关闭的,而这个TCP连接的发起方是客户端,因此在TCP看来,这属于被动关闭的场景, 即 passive close。被动关闭的后果就是会产生大量的CLOSE_WAIT连接,因此Producer端没有机会显示地观测到此连接已被中断。

参考资料

  • 《深入理解Kafka》
  • 《极客时间-Kafka核心技术与实战》