Kafka Basics

Kafka producer defaults

Since Kafka 3.0, producers are “safe” by default, i.e. acks = all (acks = -1) and enable.idempotence = true.

If you are on Kafka 2.8 or lower, the defaults are the following and must be changed for a “safe” producer:

acks = 1 and enable.idempotence = false

Change to acks = all to ensure data is properly replicated before an ack is received

Set min.insync.replicas = 2 (broker/topic level) with replication factor = 3 to ensure that at least 2 in-sync replicas (including the leader) have the data before an ack is sent

Set enable.idempotence = true to ensure that producer retries after network errors do not introduce duplicates

Set retries = MAX_INT (Integer.MAX_VALUE, producer level) so the producer keeps retrying until delivery.timeout.ms is reached

Set delivery.timeout.ms = 120000 so the producer keeps trying for up to 2 minutes before failing

Set max.in.flight.requests.per.connection = 5 for maximum performance while keeping message ordering (5 is the highest value allowed with idempotence enabled)

For high confidence that the leader and replicas have the data, use “acks = all, replication factor = 3, min.insync.replicas = 2”, which means 2 replicas (including the leader) must have the data, otherwise the producer gets an error instead of an ack

To prevent duplicates in Kafka introduced by the network, use “enable.idempotence = true”
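For a client on Kafka 2.8 or lower, the producer-side settings above could be collected as shown below. This is a minimal sketch that uses plain string keys in java.util.Properties so it compiles without the kafka-clients jar; the class and method names are hypothetical, the bootstrap address is a placeholder, and min.insync.replicas is deliberately absent because it is a broker/topic setting, not a producer one.

```java
import java.util.Properties;

// Safe-producer settings for a Kafka 2.8-or-lower client (3.0+ clients already default to
// acks=all and enable.idempotence=true). Plain string keys, so no kafka-clients dependency.
class SafeProducerConfig {
    static Properties safeProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Leader waits for all in-sync replicas before acknowledging.
        props.setProperty("acks", "all");
        // Broker discards duplicates caused by producer retries.
        props.setProperty("enable.idempotence", "true");
        // Keep retrying until delivery.timeout.ms expires.
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        props.setProperty("delivery.timeout.ms", "120000");
        // 5 is the maximum that still preserves ordering with idempotence enabled.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        // min.insync.replicas=2 is NOT set here: it is a broker/topic-level setting.
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder bootstrap address.
        safeProducerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With kafka-clients on the classpath, these Properties (plus key and value serializers) would be passed to the KafkaProducer constructor.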

Kafka Producer Compression

Producers usually send text-based data, for example JSON, so it is important to apply compression. Compression can be applied at the producer level and does not require any configuration change on the broker or consumer side.

Set compression.type = none (default), gzip, lz4, snappy or zstd

Compression is more effective the bigger the batch of messages sent to Kafka.
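The effect of batch size on compression can be seen with the JDK's own gzip codec (gzip is one of the codecs Kafka supports). This illustration compresses one small JSON record on its own and then 100 similar records together; the record format and class name are made up, and real Kafka batches also carry record headers and framing that this ignores.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustration only: compares gzip output for one record vs. a batch of 100 similar records.
class BatchCompressionDemo {
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size(); // valid after close(), which writes the gzip trailer
    }

    // Hypothetical JSON event; repeated field names are what compression exploits.
    static String record(int i) {
        return "{\"user_id\":" + i + ",\"event\":\"page_view\",\"page\":\"/home\"}";
    }

    static String batch(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) sb.append(record(i)).append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] single = record(0).getBytes(StandardCharsets.UTF_8);
        byte[] hundred = batch(100).getBytes(StandardCharsets.UTF_8);
        System.out.printf("1 record:    %d -> %d bytes%n", single.length, gzipSize(single));
        System.out.printf("100 records: %d -> %d bytes (%.1f bytes/record)%n",
                hundred.length, gzipSize(hundred), gzipSize(hundred) / 100.0);
    }
}
```

The single record typically grows after compression because of the fixed gzip header and trailer, while the 100-record batch shrinks to a few bytes per record.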

Advantages of compression – smaller producer request size (records 1 to 100 compressed into one batch), faster transfer of data (reduced latency), better throughput, better disk utilization (messages stored on disk are smaller)

Disadvantages – CPU cycles spent on compression (producer) and decompression (consumer)

Start with snappy or lz4 compression for an optimal speed/compression trade-off, then test the others. Consider tweaking linger.ms and batch.size to get bigger batches for more compression and higher throughput.

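Compression can also be set at the broker/topic level, but the default is compression.type = producer, meaning the broker stores batches exactly as the producer compressed them. If the broker/topic compression.type differs from the producer's, batches are decompressed by the broker and then re-compressed with the configured codec.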
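Following the advice above, a throughput-oriented producer config might layer compression and batching settings on top of an existing one. This is a sketch: the specific values (snappy, 20 ms, 32 KB) are assumed starting points to benchmark, not tuned recommendations, and the class name is hypothetical.

```java
import java.util.Properties;

// Throughput-oriented producer settings layered on top of an existing config.
// The values are example starting points to benchmark, not tuned recommendations.
class ThroughputProducerConfig {
    static Properties withHighThroughput(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Compress each batch on the producer; with the topic's compression.type left at
        // "producer", the broker stores the batch without re-compressing it.
        props.setProperty("compression.type", "snappy");
        // Wait up to 20 ms for more records so batches fill up.
        props.setProperty("linger.ms", "20");
        // 32 KB per-partition batches instead of the 16 KB default.
        props.setProperty("batch.size", Integer.toString(32 * 1024));
        return props;
    }
}
```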

Kafka – improve batching mechanism

By default, Kafka sends records as soon as possible when .send() is called. With max.in.flight.requests.per.connection = 5, at most 5 produce requests (each carrying message batches) can be in flight between the producer and a broker at once, which provides parallelism. If more messages need to be sent while those requests are in flight, Kafka starts batching them smartly before the next send, which helps throughput.

Two settings influence batching:

linger.ms – how long to wait for more records before sending a batch, at the cost of some latency

batch.size – if the batch fills up before linger.ms has elapsed, it is sent right away, so increase batch.size to allow bigger batches. batch.size is 16 KB by default and applies per partition. If a single message is bigger than batch.size, it is not batched and is sent on its own immediately. You can monitor the average batch size using the Kafka producer metrics (batch-size-avg).
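The interplay of the two settings can be modeled as a per-partition trigger: send when the batch holds batch.size bytes, or when the oldest record in it has waited linger.ms. This is a toy model written for illustration, not Kafka's RecordAccumulator or Sender code; the class and method names are hypothetical.

```java
// Toy model (not Kafka's RecordAccumulator) of when a per-partition batch becomes sendable.
class BatchTriggerSketch {
    private final int batchSizeBytes;
    private final long lingerMs;
    private int bufferedBytes = 0;
    private long firstRecordAtMs = -1; // -1 means the batch is empty

    BatchTriggerSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Adds a record and reports whether the batch is now ready to send.
    boolean append(int recordBytes, long nowMs) {
        if (firstRecordAtMs < 0) firstRecordAtMs = nowMs;
        bufferedBytes += recordBytes;
        return isReady(nowMs);
    }

    // Ready when full (which also covers a single record larger than batch.size)
    // or when the oldest buffered record has waited linger.ms.
    boolean isReady(long nowMs) {
        if (firstRecordAtMs < 0) return false;
        return bufferedBytes >= batchSizeBytes || nowMs - firstRecordAtMs >= lingerMs;
    }

    // Called after the batch has been handed off for sending.
    void markSent() {
        bufferedBytes = 0;
        firstRecordAtMs = -1;
    }
}
```

With lingerMs = 0 every record is ready immediately, which matches the "send as soon as possible" behavior; raising lingerMs lets small records accumulate until either limit is hit.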

Producer Sticky Partitioner

Key hashing determines the mapping of a key to a partition. By default, the murmur2 algorithm is used for hashing. The formula for determining the partition is

target_partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

which means the same key always goes to the same partition, but if partitions are added to the topic, the key-to-partition mapping changes. You could override the partitioner behavior with a custom partitioner.class, but this is not recommended except for advanced use cases.
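Key-based partition selection can be sketched as follows. murmur2() here is a transcription of the murmur2 variant in Kafka's org.apache.kafka.common.utils.Utils, and partition selection follows Kafka's toPositive(murmur2(keyBytes)) % numPartitions; check it against your client version's source before relying on exact partition numbers. The class, helper names, and keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Sketch of keyed partition selection using a transcription of Kafka's murmur2 variant.
class KeyPartitionerSketch {
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;

        // Mix 4 bytes (little-endian) at a time into the hash.
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the last 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit; unlike Math.abs it never returns a negative number
    // (Math.abs(Integer.MIN_VALUE) is still negative).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // as a String key serializes by default
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    // How many of the keys "key-0".."key-(n-1)" land on a different partition after resizing.
    static int movedKeys(int n, int partitionsBefore, int partitionsAfter) {
        int moved = 0;
        for (int i = 0; i < n; i++) {
            String key = "key-" + i;
            if (partitionFor(key, partitionsBefore) != partitionFor(key, partitionsAfter)) moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println("customer-42 -> partition " + partitionFor("customer-42", 3));
        System.out.println("keys moved going from 3 to 4 partitions: " + movedKeys(100, 3, 4) + "/100");
    }
}
```

Running main shows that growing a topic from 3 to 4 partitions remaps most keys, which is why adding partitions breaks per-key ordering guarantees for existing keys.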

When key = null, Kafka 2.3 and below use the Round Robin partitioner by default; Kafka 2.4 and above use the Sticky Partitioner, which improves performance, especially at high throughput.

Round robin keeps sending to partitions in turn, which means more batches (one batch per partition) and smaller batch sizes when there are, say, 100 partitions. Not optimal.

With the sticky partitioner, we “stick” to a partition until linger.ms has elapsed or batch.size is full. After the batch is sent, we switch to a new sticky partition. This produces larger batches (batch.size is more likely to be reached).
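The difference in batch count can be shown with a toy calculation, assuming all keyless records arrive within one linger.ms window and a batch holds a fixed number of records. This is an illustrative model, not Kafka's partitioner code, and the names are hypothetical.

```java
// Toy comparison (not Kafka's partitioner code) of batches produced for keyless records
// that all arrive within one linger.ms window, with `recordsPerBatch` records per batch.
class PartitionerBatchCount {
    // Round robin: record i goes to partition i % partitions, so each partition
    // starts its own batch.
    static int roundRobinBatches(int records, int partitions, int recordsPerBatch) {
        int[] perPartition = new int[partitions];
        for (int i = 0; i < records; i++) perPartition[i % partitions]++;
        int batches = 0;
        for (int n : perPartition) batches += (n + recordsPerBatch - 1) / recordsPerBatch; // ceil
        return batches;
    }

    // Sticky: fill one partition's batch, then move on to another partition.
    static int stickyBatches(int records, int recordsPerBatch) {
        return (records + recordsPerBatch - 1) / recordsPerBatch; // ceil
    }

    public static void main(String[] args) {
        System.out.println("round robin: " + roundRobinBatches(100, 100, 10) + " batches");
        System.out.println("sticky:      " + stickyBatches(100, 10) + " batches");
    }
}
```

With 100 records and 100 partitions, round robin yields 100 one-record batches, while sticky partitioning yields 10 full batches of 10 records.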

Delivery Semantics

At Most Once – offsets are committed as soon as a message batch is received. If anything goes wrong while processing, the messages are lost (they will not be read again). Messages are processed at most once, sometimes zero times.

At Least Once (preferred) – offsets are committed after the messages are processed. If processing goes wrong, the messages will be read again. This can result in duplicate processing, so the processing must be idempotent.

Exactly Once (dream goal) – for workflows that read from Kafka and write back to Kafka, use the Transactional API (easiest through the Kafka Streams API).
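The difference between the first two semantics comes down to when the offset is committed relative to processing. The toy simulation below assumes a single poll() returning n records, a crash after processing crashAt of them, and a restart from the last committed offset; it counts how often each record is processed. It is an illustration with hypothetical names, not consumer API code.

```java
import java.util.Arrays;

// Toy simulation: one poll() returns n records, the consumer crashes after processing
// crashAt of them, then restarts from the committed offset. Returns per-record process counts.
class DeliverySemanticsSim {
    static int[] atMostOnce(int n, int crashAt) {
        int[] processed = new int[n];
        int committed = n;                            // commit as soon as the batch is received
        for (int i = 0; i < crashAt; i++) processed[i]++;
        // crash, then restart from the committed offset
        for (int i = committed; i < n; i++) processed[i]++;
        return processed;
    }

    static int[] atLeastOnce(int n, int crashAt) {
        int[] processed = new int[n];
        int committed = 0;                            // nothing committed before processing
        for (int i = 0; i < crashAt; i++) processed[i]++;
        // crash before committing, then restart from the committed offset
        for (int i = committed; i < n; i++) processed[i]++;
        // ...only now would offset n be committed
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(atMostOnce(5, 2)));  // [1, 1, 0, 0, 0]
        System.out.println(Arrays.toString(atLeastOnce(5, 2))); // [2, 2, 1, 1, 1]
    }
}
```

At most once loses records 2 to 4; at least once processes every record, but records 0 and 1 twice, which is why the processing must be idempotent.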

Consumer Offset Commit Strategies

Two common patterns for committing offsets in a consumer, plus a harder third one –

(Easy) enable.auto.commit = true with synchronous processing of messages. This is the default in the Java consumer API and gives at-least-once behavior by default. Offsets are committed when poll() is called and auto.commit.interval.ms (default 5 seconds) has elapsed. Make sure all messages from the previous poll() are successfully processed before calling poll() again; otherwise poll() can commit offsets for unprocessed messages and you will lose them.

(Medium) enable.auto.commit = false with manual commit of offsets. Accumulate records across poll() calls; once the batch is big enough or enough time has elapsed, process the batch synchronously, then commit your offsets asynchronously.

For example, accumulate records in a buffer, flush the buffer to a database (e.g. as a single transaction), then commit the offsets asynchronously.

(Hard) enable.auto.commit = false and store offsets externally – assign partitions to consumers manually at startup and use the seek() API to start reading from the stored offsets. You need to model and store offsets in a database, and handle partition rebalances using the ConsumerRebalanceListener interface.
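The medium pattern can be sketched without a broker as a buffer that flushes on size or age and only then reports an offset to commit. This sketch assumes a single partition; flushToDatabase and the in-memory "database" list are hypothetical stand-ins for your synchronous sink. The returned value is last offset + 1 because Kafka's committed offset is the next offset to read. With kafka-clients, that value would be wrapped in an OffsetAndMetadata and passed to KafkaConsumer.commitAsync.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Toy "medium" commit pattern for one partition: buffer records, flush when the buffer is
// big enough or old enough, and only then return the offset to commit.
class BufferedCommitSketch {
    private final int maxRecords;
    private final long maxWaitMs;
    private final List<String> buffer = new ArrayList<>();
    private long bufferStartMs = -1;
    final List<List<String>> flushed = new ArrayList<>(); // hypothetical stand-in for a database

    BufferedCommitSketch(int maxRecords, long maxWaitMs) {
        this.maxRecords = maxRecords;
        this.maxWaitMs = maxWaitMs;
    }

    // Called for each record returned by poll(); returns an offset to commit if a flush happened.
    OptionalLong onRecord(long offset, String value, long nowMs) {
        if (buffer.isEmpty()) bufferStartMs = nowMs;
        buffer.add(value);
        if (buffer.size() >= maxRecords || nowMs - bufferStartMs >= maxWaitMs) {
            flushToDatabase(new ArrayList<>(buffer)); // synchronous write first...
            buffer.clear();
            return OptionalLong.of(offset + 1);       // ...then commit the next offset to read
        }
        return OptionalLong.empty();
    }

    // Hypothetical synchronous sink; a real one might write the batch in one DB transaction.
    private void flushToDatabase(List<String> batch) {
        flushed.add(batch);
    }
}
```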

Consumer Offset Reset behavior

A consumer is expected to read from the log continuously, but an application bug can take it down. By default (since Kafka 2.0), Kafka keeps a consumer group's committed offsets for 7 days, so if the group's consumers stay down longer than that, the offsets are deleted and become “invalid”.

Offset reset policies –

auto.offset.reset = latest will read from the end of the log

auto.offset.reset = earliest will read from the beginning of the log

auto.offset.reset = none will throw an exception if no offset is found. Use it when you do not want to keep processing blindly, e.g. you want to find a way to recover some data before you start processing again.

The retention period for committed offsets is controlled by the broker setting offsets.retention.minutes – it is often increased, for example to a month.
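The decision auto.offset.reset makes can be modeled as a small function. This toy sketch assumes committed is null when the group has no committed offset, and treats a committed offset outside the range [logStart, logEnd] (for example, data already deleted by retention) as invalid. The real consumer throws its own exception types rather than IllegalStateException, and the names here are hypothetical.

```java
// Toy model of how auto.offset.reset picks a starting offset for one partition.
// committed: the group's committed offset, or null if none exists.
// logStart: earliest offset still available; logEnd: offset of the next record to be written.
class OffsetResetSketch {
    static long startingOffset(Long committed, long logStart, long logEnd, String policy) {
        boolean valid = committed != null && committed >= logStart && committed <= logEnd;
        if (valid) return committed; // a valid committed offset always wins
        switch (policy) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            case "none":     throw new IllegalStateException(
                                     "no valid committed offset and auto.offset.reset=none");
            default:         throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }
}
```

Note that the policy only matters when there is no usable committed offset; a consumer group that resumes within the retention period simply continues from where it left off.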

Replaying data for consumers

To replay data for a consumer group –

Take all consumers of the group down, use the kafka-consumer-groups command (with --reset-offsets) to set the offset you want to start from, then restart your consumers.

Controlling Consumer Liveness

Consumers in a group talk to the Consumer Group Coordinator through a separate “heartbeat” thread, and they also talk to the brokers through the “poll” thread. Together, these two mechanisms detect consumers that are dead.

heartbeat.interval.ms (default 3 seconds) – how often heartbeats are sent; usually set to at most 1/3 of session.timeout.ms

session.timeout.ms (default 45 seconds) – if no heartbeat arrives within this time, the consumer is considered dead. For faster consumer rebalances, set it lower so that dead consumers are removed faster.

max.poll.interval.ms (default 5 minutes) – maximum time allowed between two poll() calls before the consumer is declared dead

max.poll.records (default 500) – how many records to return per poll() call

fetch.min.bytes (default 1) – minimum amount of data to pull on each fetch request

fetch.max.wait.ms (default 500 ms) – maximum time the broker will block before answering a fetch request if there isn't enough data available to satisfy fetch.min.bytes

max.partition.fetch.bytes (default 1 MB) – maximum amount of data per partition the server will return

fetch.max.bytes (default 50 MB) – maximum amount of data per fetch request
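For reference, the consumer liveness and fetch settings can be written out explicitly with their Java-client defaults (session.timeout.ms is 45 seconds on Kafka 3.0+ clients). This sketch uses plain string keys; the heartbeatWithinThird check encodes the 1/3 rule of thumb from the text, which is a guideline rather than a check the client itself performs, and the names are hypothetical.

```java
import java.util.Properties;

// Consumer liveness and fetch settings with Java-client defaults written out explicitly.
class ConsumerTuningSketch {
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("heartbeat.interval.ms", "3000");
        p.setProperty("session.timeout.ms", "45000");
        p.setProperty("max.poll.interval.ms", "300000");                        // 5 minutes
        p.setProperty("max.poll.records", "500");
        p.setProperty("fetch.min.bytes", "1");
        p.setProperty("fetch.max.wait.ms", "500");
        p.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024)); // 1 MB
        p.setProperty("fetch.max.bytes", Integer.toString(50 * 1024 * 1024));      // 50 MB
        return p;
    }

    // Rule of thumb: heartbeat.interval.ms should be at most about 1/3 of session.timeout.ms.
    static boolean heartbeatWithinThird(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }
}
```

Lowering session.timeout.ms for faster failure detection means heartbeat.interval.ms may need to come down with it to keep this ratio.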

Consumer Replica Fetching – Rack Awareness

By default, consumers read from the leader broker of a partition. If the leader is in a different data center, this causes higher latency and higher network charges. In AWS, a data center corresponds to an AZ, so you pay cross-AZ network charges.

Since Kafka 2.4, consumers can be configured to read from the closest replica instead. You still pay network charges for replication between the leader and the ISR replicas, but consumer reads can stay within the same AZ.

On each broker, set broker.rack to the ID of its data center, for example an AWS AZ ID like usw2-az1

Then set replica.selector.class on the brokers to org.apache.kafka.common.replica.RackAwareReplicaSelector

On the consumer side, set client.rack to the ID of the data center the consumer runs in
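The rack-awareness setup involves two broker properties (the broker property is named broker.rack) and one consumer property. The sketch below builds them as Properties for illustration; "usw2-az1" is an example AZ ID, the class name is hypothetical, and in practice the broker settings live in server.properties rather than in Java code.

```java
import java.util.Properties;

// Rack-aware (closest replica) fetching: broker-side and consumer-side settings.
class RackAwarenessConfig {
    static Properties brokerProps(String azId) {
        Properties p = new Properties();
        p.setProperty("broker.rack", azId); // the data center / AZ this broker runs in
        p.setProperty("replica.selector.class",
                "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        return p;
    }

    static Properties consumerProps(String azId) {
        Properties p = new Properties();
        // Should match the broker.rack value of brokers in the consumer's own AZ.
        p.setProperty("client.rack", azId);
        return p;
    }
}
```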