Kafka producer defaults
Since Kafka 3.0, producers are “safe” by default, i.e. acks = all (acks = -1) and enable.idempotence = true
If you are on Kafka 2.8 or lower, the following are the defaults, which need to be changed for a “safe” producer
acks = 1 and enable.idempotence = false
Change to acks = all to ensure data is properly replicated before an ack is received
Set min.insync.replicas = 2 (broker/topic level); with a replication factor of 3, this ensures at least 2 brokers in the in-sync replica set have the data before an ack is sent
Set enable.idempotence = true to ensure duplicates are not introduced due to network issues
Set retries = MAX_INT (producer level) to ensure the producer keeps retrying until delivery.timeout.ms is reached
Set delivery.timeout.ms = 120000 so the producer keeps retrying for 2 minutes before failing
Set max.in.flight.requests.per.connection = 5 to get maximum performance while keeping message ordering (ordering is still guaranteed because idempotence is enabled)
For high confidence that the leader and replicas have the data, use acks=all, a replication factor of 3 and min.insync.replicas=2, which means at least 2 replicas (including the leader) must have the data, otherwise the producer will not receive an ack
To prevent duplicates introduced by network retries, use enable.idempotence = true
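A minimal sketch of what this looks like in Java (the bootstrap server and topic name are placeholders; on Kafka 3.0+ most of these values are already the defaults):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SafeProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // "safe" producer settings (defaults since Kafka 3.0)
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        props.put("delivery.timeout.ms", "120000");
        props.put("max.in.flight.requests.per.connection", "5");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "some-key", "some-value")); // hypothetical topic
            producer.flush();
        }
    }
}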
Kafka Producer Compression
Producers usually send data that is text based, for example JSON, so it is important to apply compression. Compression can be applied at the producer level and does not require any config change at the broker or consumer level.
Set compression.type = none (default), gzip, lz4, snappy, or zstd
Compression is more effective the bigger the batch of messages being sent to Kafka.
Advantages of compression – smaller producer request size (records 1 to 100 compressed into one batch), faster transfer of data (reduced latency), better throughput, better disk utilization (messages stored on disk are smaller)
Disadvantages – CPU cycles spent on compression and decompression
Start with Snappy or lz4 compression for optimal speed/compression trade-off and test others. Consider tweaking linger.ms and batch.size to have bigger batches for more compression and higher throughput.
Compression can also be set at the broker/topic level, but the default is producer (compression.type = producer), meaning the broker stores the batch exactly as the producer compressed it. If compression.type differs between producer and broker, batches will be decompressed at the broker and then re-compressed.
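Producer-side compression is a single extra property; building on the producer Properties in the sketch above, and assuming snappy as the starting choice:

// producer-level compression: no broker or consumer change needed
props.put("compression.type", "snappy"); // or lz4; gzip/zstd compress harder but cost more CPU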
Kafka – improve batching mechanism
By default, Kafka tries to send records as soon as possible after .send() is called. With max.in.flight.requests.per.connection = 5, there will be at most 5 message batches in flight (being sent between producer and broker) at a time, which provides parallelism. If more messages arrive while requests are in flight, Kafka starts batching them before the next send, which helps throughput.
Two settings influence batching –
linger.ms – how long to wait before sending a batch at the cost of latency
batch.size – if the batch fills up before the linger.ms period elapses, the batch is sent immediately; in that case consider increasing batch.size. The batch size is 16 KB by default. If a single message is bigger than batch.size, it will not be batched and will be sent immediately. You can monitor the average batch size using Kafka producer metrics.
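To get bigger (and better-compressed) batches, linger.ms and batch.size can be raised on the same producer Properties; the values below are illustrative, not recommendations:

props.put("linger.ms", "20");      // wait up to 20 ms so more records can join a batch
props.put("batch.size", "65536");  // 64 KB batches instead of the 16 KB default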
Producer Sticky Partitioner
Key hashes are used to determine the mapping of a key to a partition. By default, the murmur2 algorithm is used for hashing. The formula for determining the partition is
target_partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
which means the same key will always go to the same partition, but if we add a new partition the mapping breaks. You can override the partitioner behavior with a custom partitioner.class, but this is not recommended except for advanced usage (see the sketch below).
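If you do fall into that advanced case, a minimal sketch of a custom partitioner might look like this (the class name is made up, and it simply reimplements the default key hashing for non-null keys):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical example class that mirrors the default key hashing shown above
public class MyKeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // number of partitions for the topic (assumes the topic exists)
        int numPartitions = cluster.partitionCountForTopic(topic);
        // assumes a non-null key: murmur2 hash of the key bytes, modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Register it on the producer with props.put("partitioner.class", MyKeyPartitioner.class.getName());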
When key = null, Kafka 2.3 and below use a round-robin partitioner by default; 2.4 and above use the sticky partitioner, which improves performance at high throughput.
Round robin keeps cycling through partitions, which means more batches (one batch per partition) and smaller batches when you have, say, 100 partitions. Not optimal.
With the sticky partitioner, we “stick” to a partition until linger.ms has elapsed or the batch is full. After the batch is sent, we switch to a new sticky partition. This provides larger batches (batch.size is more likely to be reached).
Delivery Semantics
At Most Once – offsets are committed as soon as the message batch is received. If anything goes wrong while processing a message, it is lost (it will not be read again). Messages are processed at most once, sometimes zero times.
At Least Once (preferred) – offsets are committed after the messages are processed. If processing goes wrong, messages will be read again. This can result in duplicate processing, so the processing must be idempotent.
Exactly Once (the dream goal) – for Kafka-to-Kafka workflows, messages are read from Kafka and written back to Kafka using the Transactional API (or the Kafka Streams API), as sketched below.
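A minimal read-process-write sketch of the transactional pattern, assuming hypothetical topic names (input-topic, output-topic), a hypothetical group id and transactional id, and a local broker:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceSketch {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        cProps.put("group.id", "eos-demo-group");            // hypothetical group id
        cProps.put("enable.auto.commit", "false");           // offsets are committed via the transaction
        cProps.put("isolation.level", "read_committed");     // only read committed transactional data
        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        pProps.put("transactional.id", "eos-demo-tx");       // enables the transactional API (and idempotence)
        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(List.of("input-topic"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        // "process" and write the result back to Kafka
                        producer.send(new ProducerRecord<>("output-topic", r.key(), r.value().toUpperCase()));
                        offsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1));
                    }
                    // commit the consumed offsets as part of the same transaction
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // note: fatal errors such as ProducerFencedException require closing the producer instead
                    producer.abortTransaction();
                }
            }
        }
    }
}

With Kafka Streams, the same guarantee is enabled by setting processing.guarantee = exactly_once_v2.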
Consumer Offset Commit Strategies
Three common patterns for committing offsets in a consumer –
(Easy) enable.auto.commit = true along with synchronous processing of messages. This is the default when using the Java consumer API and gives at-least-once behavior. Offsets are committed when poll() is called and auto.commit.interval.ms has elapsed, so we need to make sure all messages are successfully processed before invoking poll() again. If not, the next poll() will commit offsets for messages that were not processed and you can lose messages.
(Medium) enable.auto.commit = false and manual commit of offsets. You accumulate records from every poll() call, and when the batch is big enough or enough time has elapsed, you do something synchronous with the batch and then commit your offsets (see the sketch after this list).
For example, accumulate records in a buffer, flush the buffer to a database, and then commit the offsets asynchronously.
(Hard) enable.auto.commit = false and store offsets externally – assign partitions to consumers manually with assign() and use the seek() API to start reading from the stored position; you need to model and store offsets in a database, and handle partition rebalances yourself using the ConsumerRebalanceListener interface.
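A minimal sketch of the second (manual commit) pattern, with the topic, group id and bootstrap server as placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "manual-commit-group");     // hypothetical group id
        props.put("enable.auto.commit", "false");         // we commit offsets ourselves
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // e.g. buffer and flush to a database
                }
                if (!records.isEmpty()) {
                    consumer.commitAsync(); // commit only after processing succeeded => at-least-once
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
    }
}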
Consumer Offset Reset behavior
A consumer is expected to read from the log continuously, but due to an application bug your consumer can be down for a while. Kafka retains committed offsets for 7 days by default, which means the offsets become “invalid” after that period and the consumer needs an offset reset policy.
Offset reset policies –
auto.offset.reset = latest will read from the end of the log
auto.offset.reset = earliest will read from the beginning of the log
auto.offset.reset = none will throw an exception if no previous offset is found; use this when you do not want to blindly keep processing and would rather find a way to recover the data before you start processing again.
The retention of committed offsets is controlled by the broker setting offsets.retention.minutes – often increased to a month.
Replaying data for consumers
To replay data for a consumer group –
Take all the consumers of that group down, then use the kafka-consumer-groups command to reset the offsets to the position you want to start from, and then restart your consumers.
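For example, resetting a group to the beginning of a topic (group, topic and bootstrap server are placeholders) might look like:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic demo-topic \
  --reset-offsets --to-earliest --execute

Run with --dry-run first to preview the new offsets; other reset options include --to-datetime and --shift-by.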
Controlling Consumer Liveliness
Consumers in a group talk to the consumer group coordinator from a separate “heartbeat” thread, and they also talk to the broker from the “poll” thread. These two mechanisms are used to detect consumers that are dead.
heartbeat.interval.ms (default 3 seconds) usually set to 1/3rd of session.timeout.ms
session.timeout.ms (default 45 seconds) if you want faster consumer rebalances, set this to a lower number so that dead consumers are removed faster.
max.poll.interval.ms (default 5 minutes) max amount of time allowed between 2 poll() calls before declaring consumer dead
max.poll.records (default 500) maximum number of records returned per poll() call
fetch.min.bytes (default 1) at least how much data you want to pull on each request
fetch.max.wait.ms (default 500) – max amount of time the Kafka broker will block before answering the fetch request if there isn’t enough data available to satisfy fetch.min.bytes
max.partition.fetch.bytes (default 1 MB) max amount of data per partition server will return
fetch.max.bytes (default 50 MB) – max amount of data returned per fetch request
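These map one-to-one onto consumer properties; a sketch with the defaults listed above spelled out explicitly (added to a consumer Properties object like the one in the manual-commit sketch earlier, tune as needed):

props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "45000");
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "500");
props.put("fetch.min.bytes", "1");
props.put("fetch.max.wait.ms", "500");
props.put("max.partition.fetch.bytes", "1048576"); // 1 MB
props.put("fetch.max.bytes", "52428800");          // 50 MB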
Consumer Replica Fetching – Rack Awareness
Consumers by default read from the leader broker for a partition. If the leader is in a different data center, this causes higher latency and higher network charges (data center = AZ in AWS, where you pay cross-AZ network charges).
It is possible to configure consumers to read from the closest replica. You still pay network charges for replication between the leader and the ISR replicas.
Specify broker.rack for each broker – it is the id of the data center the broker runs in, for example an AZ id like usw2-az1
Then the broker setting replica.selector.class must be set to org.apache.kafka.common.replica.RackAwareReplicaSelector
On the consumer side, set client.rack to the id of the data center the consumer is launched in
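Putting the three settings together, assuming an example rack/AZ id of usw2-az1:

# broker side (server.properties)
broker.rack=usw2-az1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# consumer side
client.rack=usw2-az1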