`
QING____
  • 浏览: 2234129 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka Broker配置(1.1.0)

 
阅读更多

    kafka目前已升级到1.1.0版本,Broker的在线upgrade比较麻烦,所以我建议大家新项目直接基于最新版本(1.1.0),旧版本不同程度存在一些bug,比较难以解决。

 

    1、broker配置样例(未声明的配置项,建议保持默认):server.properties

 

############################# Server Basics #############################

# brokerId,同一集群id必须唯一,Integer类型
broker.id=0
auto.create.topics.enable=true
#允许客户端直接删除,admin工具可以。
delete.topic.enable=true
auto.leader.rebalance.enable=true
# 单条消息最大尺寸,10M
message.max.bytes=10000120
replica.fetch.max.bytes=10485760
# ISR,当Producer Ack模式为“all”时必须等待ISR列表中replicas全部写入成功(否则抛出异常,无法继续write);
# 此值适用于repicas 可靠性担保。
# 如果replicas为3,此值可以设置为2 + ACK="all"可以极高的确保消息可靠性。
min.insync.replicas=2

# 默认replication个数,broker=3,replication=2
default.replication.factor=2
############################# Socket Server Settings #############################
listeners=PLAINTEXT://10.0.34.121:9092

# 接收客户端请求的IO线程数,建议保持默认
num.network.threads=3

# 处理请求包括磁盘IO的线程数,建议保持默认,调优方式受制于磁盘并发能力、CPU个数等。
num.io.threads=8

# SO_SNDBUF
socket.send.buffer.bytes=102400
# SO_RCVBUF
socket.receive.buffer.bytes=102400

# 单次请求,Server允许接收的最大数据量,避免OOM。默认值为100M(104857600)
socket.request.max.bytes=104857600

# 亟待处理的请求个数,超过阈值将会阻塞网络IO线程(类似于backlog)
queued.max.requests=500

# 客户端等待broker响应最大等待时间(包括重试),Client可配置。默认30000秒。
request.timeout.ms=15000

# 单个IP允许建立的连接数,默认为Integer最大值,此处修正为256
max.connections.per.ip=256

############################# Log Basics #############################
# 底层日志文件保存的文件路径,多个磁盘路径则以“,”分割,多磁盘可以提高IO并发能力。
log.dirs=/data/kafka

# Topic默认的partition个数,此值越大理论上可以提高并发消费能力,不同的partitions将会分布在多个broker上。
# 可以通过AdminClient修改指定Topic的partition数量。
# partition个数,通常不建议大于Broker节点的个数。
num.partitions=1

# 在logs回复或者关闭刷盘时,每个dir所使用的IO线程个数。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# 目前已知kafka内部状态数据将保存在特定的topic中,
# 比如offsets(由以前的kafka迁移到内部topic中)保存在“__consumer_offsets,
# 每个producer的事务,保存在“__transaction_state”。
# 对于内部topic,其replicas个数,建议为“大多数派”,且最多不要超过3。
# 比如:
# broker<3,replicas=1;
# brokers = 3,replicas=2;
# broker>=5,replicas=3;
# 线上配置,broker=3;事务功能暂时关闭
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# 日志刷盘策略,我们有两个重要参数可以可以权衡,分别适用在“吞吐量优先”、“数据可靠性优先”。
# 需要明确表达,如果你对数据可靠性有一定的要求,那么尚需要开启replication来保障,仅仅依赖刷盘尚且无法完全意义上确保数据的安全性。
# 如果你对系统确实是“吞吐量优先”,则可以关闭replicaiton(=1)、同时启用“按消息量”刷怕策略。

# 如下两个参数满足其一,即可Flush
# 吞吐量优先,默认为关闭(Long最大值)
log.flush.interval.messages=10000

# 可靠性 + 延迟优先,每个1秒刷盘,通用高优策略。
log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 日志保留策略,存储日志可以根据“保留时间长”、“存量数据大小”两个维度,对于超出阈值的日志将会被清理,以确保存储层面更加可控。
log.cleanup.policy=delete
log.cleaner.enable=true
# 满足retention条件之外的“文件”,保留多久则会被cleanup。默认值为12小时。
log.cleaner.delete.retention.ms=43200000

# 日志保留 7 天:保留时长,取决于每天数据量、消费者回溯需求等。
log.retention.hours=168
## 350G
log.retention.bytes=375809638400

# 底层为LSM树,每个逻辑topic、partition都会有多个segments文件序列构成,每个segments大小,默认为1G。
# 此值建议保持默认,过小、过大都有一定的影响。
log.segment.bytes=1073741824

# 检测周期,5分钟
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# 可以包含chroot,比如:host:ip/kafka
zookeeper.connect=10.0.11.107:2181,10.0.12.107:2181,10.0.11.108:2181/kafka
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################
# 分组协调 (broker端)
# rebalance通常对于“单组 + 多个消费者”场景,此值适用于“消费者入组后,首次relance之前等待的时长”
# (消费者启动可能逐个进行,等待足够多的consumers入组后进行rebalance)
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=300000
group.min.session.timeout.ms=6000

 

    1)broker.id:如果你手动声明,则整个集群不能重复,数字类型,不能为负数。

    2)min.insync.replicas:即ISR的个数,这个是数据可靠性的保障策略之一,配合Producer端“acks”参数。

        如果你的数据可靠性要求低于producer性能要求,可以设置为1。

        如果可靠性要求较高,建议设置为>=2的值,此值应该应该大于broker节点个数的“一半”;通常我们权衡的策略为,如果broker个数N == 3,此值设置为2;N >= 5,设置为3即可。过大的值,将极其影响producer效率,增加producer响应延迟,而且并不能获得更高的可靠性收益。

    3)日志刷盘策略:kafka的数据保存在本地一序列segement文件中(包括用于提高查询效率的index文件),消息提交给broker之后,最终写入磁盘才能确保数据不丢失;kafka提供了“基于时间间隔”、“基于累计消息条数”两种同步刷盘策略。

     基于时间间隔:每隔X毫秒,执行一次文件的fsync操作。基于累计消息条数:每X条消息后,同步执行一次fsync操作。

     如果你关注“吞吐量优先”,比如一些普通的日志采集、监控消息等业务类型,你完全关闭“基于时间间隔”策略(即注释点log.flush.interval.ms),此时将采用文件系统默认刷盘机制;此外你还需要适度调整IRS个数(包括replication的个数)。

     如果你关注“数据可靠性优先”,除了合适的IRS个数、以及producer端合理的“acks”时机,我们还应该权衡这两种刷盘策略的合理阈值,建议使用“每秒刷盘”,同时评估一个broker正常的TPS,并将其作为“刷盘条数”的阈值(此值不能太小,比如TPS为20000,你设置“log.flush.interval.messages=1000”,这以为这broker仍然在不停的刷盘,性能极大降级并最终影响SLA)。

 

    4)num.partitions:每个Topic的partition个数,默认值为1,尽管admin工具可以修改此值,但是仍然建议大家在部署时就设置合理。

   如果你的集群是存储一些时序性有严格要求的消息,建议设置为1,或者设置为其他值、但是需要producer端使用合理的Partitioner确保消息一致性,以及consumer消费时不要混合消费多个partitions。

  如果你的集群对发送、消费的效率要求很高,那么partition的个数,可以设置的较大,具体partitions的个数应该设置为多少,尚没有"完美"的参考值,通常需要考虑broker集群的个数、每个broker磁盘并发的能力、broker上Topic的个数(以及它们各自partitions的个数总和)等;一个比较有粗略的参考值为,建议初步设置为broker节点的个数。

 

    5)日志保留策略:通常我们需要开启,设定kafka保存消息(数据文件)的时长,也限定consumer允许数据回溯的最大时间。查出时间阈值的日志文件,可以“压缩备份”,也可以删除。

    6)分组协调:主要适用于消费者,通常production环境,消费者时分组的,每组多个消费者使用相同的groupId;虽然消费者处理消息的效率基本差异不大,但是仍然存在消费者失效(或者block)等问题,此时Broker则可以决策哪些是“慢消费者”、“失效消费者”,以决定适时平衡(由其他在线的消费者接管消费服务)。

 

 

    2、Broker JVM参数修改(kafka-server-start.sh)

base_dir=$(dirname $0)
## 在base_dir参数之后增加
export KAFKA_HEAP_OPTS="-Xms12G -Xmx12G -XX:NewRatio=2 -XX:SurvivorRatio=8 -XX:MaxMetaspaceSize=512M -XX:CompressedClassSpaceSize=512M"
## 开启JMX监控,默认端口为5760
export JMX_PORT=${JMX_PORT:-5760}
## 如果你希望基于jolokia + HTTP方式输出监控数据,以被采集器获取,可以增加jolokia配置
## export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/opt/jolokia/jolokia-jvm-1.5-agent.jar"

    只需要增加有关内存的相关参数即可。

 

    3、kafka-server-stop.sh

#将原来PIDS的获取方式,修改为“kafka\.kafka”,否则无法获取实际进程的ID

PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

 

    主要是解决,此脚本无法正确找到kafka进程的问题。

 

    4、kafka broker与client兼容关系描述

    参考:https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

    简述:0.9以上版本的client和broker,低版本client可以访问高版本的broker,但是高版本的client访问低版本的broker时可能存在兼容性问题。

 

    5、kafka consumer group的几种状态

    1)Empty:此group中没有任何消费者在线(曾经提交过offset),可能是新建的group。当一个“Empty”的group过期之后,其状态会迁移为“Dead”。

    2)PrepareingRebalance:准备进行rebalance,通常是此group中有消费者加入或者离开时(探测周期)触发,这意味着topic/partition在组内多个消费者之间重新分配。

    3)AwaitingSync:等待Group Leader重新分配topic/partitions。

    4)Stable:正常状态。

    5)Dead:Group内已经没有任何消费者(members),且其offset记录等meta信息即将被删除。

 

 

    

  • 大小: 347 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics