`
QING____
  • 浏览: 2230894 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Apache kafka原理与特性(0.8V)

 
阅读更多

前言: kafka是一个轻量级的/分布式的/具备replication能力的日志采集组件,通常被集成到应用系统中,收集"用户行为日志"等,并可以使用各种消费终端(consumer)将消息转存到HDFS等其他结构化数据存储系统中.因为日志消息通常为文本数据,尺寸较小,且对实时性以及数据可靠性要求不严格,但是需要日志存储端具备较高的数据吞吐能力,这种"宽松"的设计要求,非常适合使用kafka. 

一.入门

    1.1 简介

    Kafka是一个"分布式的"/"可分区的(partitioned)"/"基于备份的(replicated)"/"基于commit-log存储"的服务. 它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.

    kafka消息是根据Topic进行归类,发送消息者成为Producer,消息接收者成为Consumer;此外kafka集群有多个kafka实例组成,每个实例(server)称为broker.

    无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性以及保存一些meta信息.

 

(摘自官网) 

    其中client与server的通讯,都是基于TCP,而且消息协议非常轻量级.

    Topics/logs

    一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件.任何发布到此partition的消息都会直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它唯一的标记一条消息.kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行"随机读-写",一旦消息写入log日志之后,将不能被修改.



(摘自官网) 

    kafka和JMS实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间.此外,kafka的性能并不会因为日志文件的太多而低下,所以即使保留较多的log文件,也不不会有问题.

    对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)

    kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

    partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.(具体原理参见下文).

    Distribution

    一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置每个partition需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.[replicas特性在0.8V才支持]

    基于replicated方案,那么就意味着需要对多个备份进行调度;一个partition可以在多个server上备份,那么其中一个server作为此partiton的leader;leader负责此partition所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.[备注:kafka中将leader角色权限下放到partition这个层级]

 

kafka-cluster 

    Producers

    Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息发送到哪个partition;如果一个Topic有多个partitions时,你需要选择partition的算法(由producer决定),比如基于"round-robin"方式或者通过其他的一些算法等.无论如何选择partition路由算法,我们最直接的目的就是希望消息能够均匀的发送给每个partition,这样可以让consumer消费的消息量也能"均衡".

    Consumers

    本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic.

    如果所有的consumer都具有相同的group,这种情况和JMS queue模式很像;消息将会在consumers之间负载均衡.

    如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.



(摘自官网) 

    在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时是顺序的.事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的.

    通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效,那么其消费的partitions将会有其他consumer自动接管.

    kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

    Guarantees

    1) 发送到partitions中的消息将会按照它接收的顺序追加到日志中,无论一个partition由多少个log文件构成,那么它发送给consumer的顺序是一定的.

    2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.

    3) 如果Topic的"replication factor"为N,那么允许N-1个kafka实例失效.只要有一个replication存活,那么此partition的读写操作都不会中断.

 

    1.2 Use cases

    Messaging

    和一些常规的消息系统相比,kafka仍然是个不错的选择;它具备partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

    Websit activity tracking

    kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等.

    Log Aggregation

    kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.

 

二. 设计原理

    kafka的设计初衷是希望做为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.

    1.Persistence

    kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.

    2.Efficiency

    需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

 

    其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.

    3. Producer

    Load balancing

    kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.在producer端的配置文件中,开发者可以指定partition路由的方式.

 

    Asynchronous send

    将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失.

    4.Consumer

    consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.[备注:offset,消息偏移量,integer值,broker可以根据offset来决定消息的起始位置]

    在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

    其他JMS实现,消息消费的位置是有provider保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.

    这就意味着,kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息时批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

 

    5.Message Delivery Semantics

    对于JMS实现,消息传输担保非常直接:有且只有一次(exactly once).在kafka中稍有不同,对于consumer而言:

    1) at most once: 最多一次,这个和JMS中"非持久化"消息类似.发送一次,无论成败,将不会重发.

    2) at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功.

    3) exactly once: 消息只会发送一次.

    at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".

    at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".

    exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.

    因为"消息消费"和"保存offset"这两个操作的先后时机不同,导致了上述3种情况,通常情况下"at-least-once"是我们搜选.(相比at most once而言,重复接收数据总比丢失数据要好).



 

    6. Replication

    kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(备注:不同于其他分布式存储,比如hbase需要"多数派"存活才行)

    kafka判定一个follower存活与否的条件有2个:1) follower需要和zookeeper保持良好的链接 2) 它必须能够及时的跟进leader,不能落后太多.如果同时满足上述2个条件,那么leader就认为此follower是"活跃的".如果一个follower失效(server失效)或者落后太多,leader将会把它从同步列表中移除[备注:如果此replicas落后太多,它将会继续从leader中fetch数据,直到足够up-to-date,然后再次加入到同步列表中;kafka不会更换replicas宿主!因为"同步列表"中replicas需要足够快,这样才能保证producer发布消息时接受到ACK的延迟较小].

    当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.kafka中leader选举并没有采用"投票多数派"的算法,因为这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,而且kafka集群的设计,还需要容忍N-1个replicas失效.对于kafka而言,每个partition中所有的replicas信息都可以在zookeeper中获得,那么选举leader将是一件非常简单的事情.选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

    在整几个集群中,只要有一个replicas存活,那么此partition都可以继续接受读写操作.

    7.Log

    如果一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一序列"log entries"(日志条目),每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.



(摘自官网) 

    其中每个partiton中所持有的segments列表信息会存储在zookeeper中.

    当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时如果"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.如果broker失效,极有可能会丢失那些尚未flush到文件的消息.因为server意外失效,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启东是需要检测最后一个segment的文件结构是否合法并进行必要的修复.

    获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.

    日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式.

    8.Distribution

    kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

    1) Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

    格式: /broker/ids/[0...N]   -->host:port;其中[0..N]表示broker id,每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.

    2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

    格式: /broker/topics/[topic]/[0...N]  其中[0..N]表示partition索引号.

    3) Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".

    一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

    4) Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

    格式: /consumers/[group_id]/ids/[consumer_id]

    仍然是一个临时的znode,此节点的值为{"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.

    5) Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.

    格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]   -->offset_value

    此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

    6) Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode

    格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]   -->consumer_node_id

    此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

 

    当consumer启动时,所触发的操作:

    A) 首先进行"Consumer id Registry";

    B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

    C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

 

    Consumer均衡算法

    当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力.

     1) 假如topic1,具有如下partitions: P0,P1,P2,P3

     2) 假如group中,有如下consumer: C0,C1

     3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3

     4) 根据consumer.id排序: C0,C1

     5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

     6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]



    总结: 

    1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.

    2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.

    3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

 

三.主要配置 

    1.Broker主要配置

##broker标识,cluster中,此ID必须唯一
broker.id=0
##接受consumer/producer的链接端口
port=9092
##用来维护集群状态,以及consumer消费记录
##consumer和broker必须接入到同一个zk环境中.
##zookeeper.connect指定zookeeper的地址,默认情况下将会在zk的“/”目录下
##创建meta信息和路径,为了对znode进行归类,我们可以在connect之后追加路径,比如
##zookeeper.connect=127.0.0.1:2181/kafka
##不过需要注意,此后的producer、consumer都需要带上此根路径
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=30000
##broker所能接受的消息的最大尺寸
##producer不能发布更大尺寸的message
messages.max.bytes=1000000
##broker在处理client请求时,允许开启的线程个数.默认为3.
num.network.threads=3
##用于磁盘IO操作的线程的个数,默认为8,建议和磁盘的个数保持一致
num.io.threads=8
##允许入队的最大请求数,"数据操作请求"首先加入队列,等待IO线程
##进行磁盘操作获取数据,数据操作结束后,请求被移除队列并由network
##线程响应给client端.此参数用于控制"等待IO处理的请求数".
queued.max.requests=500
#socket调优参数: sendBuffer (SO_SNDBUF)
socket.send.buffer.bytes=1048576
##socket调优参数:receiveBuffer (SO_RCVBUFF)
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
#################Log##########
log.dirs=/tmp/kafka-logs
##每个topic的分区数.
##kafka的特点就在于"分区",每个Topic被拆分成多个partitions
##partitions可以被sharding到多个broker上,以提高并发能力和"可用性",
##此值建议根据broker的个数、consumer的个数合理设定。
num.partitions=2
##log文件片段的最大尺寸,每个partition(逻辑上)的数据都会被写入到磁盘的
##log文件中(append only),此参数用于控制单个文件的大小.
## 1024*1024*1024,1G
##log.segment.bytes=

##log文件"sync"到磁盘之前累积的消息条数
##因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
##所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
##如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
##如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
##物理server故障,将会导致没有fsync的消息丢失.
##默认值为10000
log.flush.interval.messages=10000
##仅仅通过interval来控制消息的磁盘写入时机,是不足的.
##此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
##达到阀值,也将触发.
log.flush.interval.ms=1000
#对某些特定的topic而言,重写log.flush.interval.messages属性
##log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

######################
##是否自动创建topic
##如果broker中没有topic的信息,当producer/consumer操作topic时,是否自动创建.
##如果为false,则只能通过API或者command创建topic
auto.create.topics.enable=true
##partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms=30000
##partition leader与replicas数据同步时,消息的队列尺寸.
controller.message.queue.size=10
##partitions的"replicas"个数,不得大于集群中broker的个数
default.replication.factor=1
##partition Leader和follower通讯时,如果在此时间内,没有收到follower的"fetch请求"
##leader将会认为follower"失效",将不会与其同步消息.[follower主动跟随leader,并请求同步消息]
replica.lag.time.max.ms=10000
##如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
##到其他follower中.
##在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages=4000
##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30000
##1024*1024,follower每次fetch数据的最大尺寸
##没有意义的参数
replica.fetch.max.bytes=1048576
##当follower的fetch请求发出后,等待leader发送数据的时间.
##超时后,将会重新fetch.
replica.fetch.wait.max.ms=500
##fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes=1
##follower中开启的fetcher线程数,增加此值可以提高数据同步到速度,但也额外的增加了leader的IO负荷.
num.replica.fetchers=1
###########################
##检测log文件的时间间隔
log.cleanup.interval.mins=1
##log文件被保留的时长,如果超过此时长,将会被清除,无论log中的消息是否被消费过.
log.retention.hours=168

 

    2.Consumer主要配置

 

##当前消费者的group名称,需要指定
##消息的消费进度,是根据group来划定的
group.id=
##consumer作为zookeeper client,需要通过zk保存一些meta信息,
##比如consumer消费的消息offset等.
##必须和broker使用同样的zk配置
zookeeper.connect=hostname1:port,hostname2:port2
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
##当前consumer的标识,可以设定,也可以有系统生成.
##主要用来跟踪消息消费情况,便于观察
conusmer.id=
##获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk
##每次feth将得到多条消息,此值为总大小
##提升此值,将会消耗更多的consumer端内存
fetch.messages.max.bytes=1048576
##broker发送给consumer的最小数据尺寸,如果消息尺寸不足,将会等待,直到满足.
fetch.min.bytes=1
##当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer.
fetch.wait.max.ms=100
queued.max.message.chunks=10
##当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
##的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
##此值用于控制,注册节点的重试次数.
rebalance.max.retries=4
##当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
##注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交
auto.commit.enable=true
##自动提交的时间间隔,默认为1分钟.
auto.commit.interval.ms=60*1000

 

   3.Producer主要配置

 

##对于开发者而言,需要通过broker.list指定当前producer需要关注的broker列表
##producer通过和每个broker链接,并获取partitions,
##如果某个broker链接失败,将导致此上的partitons无法继续发布消息
##格式:host1:port,host2:port2,其中host:port需要参考broker配置文件.
##对于producer而言没有使用zookeeper自动发现broker列表,非常奇怪。(0.8V和0.7有区别)
metadata.broker.list=
##producer接收消息ack的时机.默认为0.
##0: producer不会等待broker发送ack
##1: 当leader接收到消息之后发送ack
##2: 当所有的follower都同步消息成功后发送ack.
request.required.acks=0
##在向producer发送ack之前,broker允许等待的最大时间
##如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种
##原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
##producer消息发送的模式,同步或异步.
##异步意味着消息将会在本地buffer,并适时批量发送
##默认为sync,建议async
producer.type=sync
##消息序列化类,将消息实体转换成byte[]
serializer.class=kafka.serializer.DefaultEncoder
key.serializer.class=${serializer.class}
##partitions路由类,消息在发送时将根据此实例的方法获得partition索引号.
##默认为消息的hashcode % partitions个数
partitioner.class=kafka.producer.DefaultPartitioner

##消息压缩算法,none,gzip,snappy
compression.codec=none
##消息在producer端buffer的条数.仅在producer.type=async下有效
batch.num.messages=200
##在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker
##此值和batch.num.messages协同工作.
queue.buffering.max.ms=5000
##在async模式下,producer端允许buffer的最大消息量
##无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
##此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃.
queue.buffering.max.messages=10000
##当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
##阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
##此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
##-1: 无阻塞超时限制,消息不会被抛弃
##0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
##当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
##因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
##有可能导致broker接收到重复的消息.
message.send.max.retries=3
##producer刷新topic metada的时间间隔
##producer需要知道partition leader的位置,以及当前topic的情况
##因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
##(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制
topic.metadata.refresh.interval.ms=600000

 

 

    broker配置文件请参考: kafka.server.KafkaConfig

    consumer配置文件请参考: kafka.consumer.ConsumerConfig

    producer配置文件请参考: kafka.producer.ProducerConfig

 

    【kafka部署与实践】

---END--

  • 大小: 8.5 KB
  • 大小: 19.1 KB
  • 大小: 26.2 KB
  • 大小: 131.2 KB
  • 大小: 8.2 KB
  • 大小: 13.2 KB
  • 大小: 12.9 KB
分享到:
评论
5 楼 hellojei 2016-10-24  
太棒了,看了两遍,顺便还写了个markdown笔记,附带原文链接了已经哈哈
4 楼 tivan 2014-07-26  
不错
3 楼 learned 2014-05-29  
分析的很专业...
2 楼 hongjn 2014-04-08  
非常赞啊。十分感谢。
1 楼 w926494698 2014-03-28  
nb太详细了

相关推荐

    kafka-client-0.8.2.1-jdk-1.5

    org.apache.kafka kafka-clients-jdk15 0.8.2.1 cd example; mvn package This will build a simple console producer. 已知问题 不支持SNAPPY和LZ4压缩方式 不支持jdk1.6之后添加的TimeUnit.MINUETS, TimeUnit....

    apache-kafka-book-examples, 固定和更新的代码示例"Apache Kafka".zip

    apache-kafka-book-examples, 固定和更新的代码示例"Apache Kafka" 固定和更新的代码示例 book apache"更新到 Apache Kafka 0.8.1.1配置优化以在 Windows 机器上使用Windows 批处理脚本固定( 由 ...

    Confluent的Apache Kafka Python客户端-python

    = v0.8、Confluent Cloud 和 Confluent 平台兼容的高级生产者、消费者和 AdminClient。 客户端是: 可靠的 - 它是 librdkafka 的包装器(通过二进制轮自动提供),广泛部署在各种生产场景中。 它使用与 Java ...

    libkafka:适用于Apache Kafka v0.8 +的C ++客户端库。 还包括C API

    适用于Apache Kafka v0.8 +的C ++客户端库。 还包括C API。 带有完整的单元测试套件。 支持以下Kafka API请求/响应调用: 元数据 生产 拿来 抵消 包括对使用GZIP或Snappy压缩的MessageSet压缩的支持。 由于线路...

    Sarama是Apache Kafka 0.8及更高版本的Go库。-Golang开发

    sarama Sarama是MIT许可的Apache Kafka 0.8版(及更高版本)的Go客户端库。 可通过godoc获得入门API文档和示例。 模拟sarama中提供了用于测试的模拟程序。Sarama是MIT许可的Apache Kafka版本0.8(及更高版本)的Go...

    Apache Kafka 版本演进及特性介绍

    虽然仍有一部分Kafka的老用户在使用 0.8.x 版本,但 Kafka 0.8.x 确实是比较老的版本了。如果不是对Kafka非常熟悉,很容易忽略各个版本之间的差异,也不会清楚某个版本的特点及使用方式。本文我们就一起学习下Kafka...

    kafkapython教程-Kafka快速入门(十二)-Python客户端.pdf

    Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka 0.8以上版本。...

    sarama:Sarama 是 Apache Kafka 0.8 及更高版本的 Go 库-开源

    Sarama 是 MIT 许可的 Go 客户端库,适用于 Apache Kafka 0.8 版(及更高版本)。 API 文档和示例可通过 godoc 获得。 Sarama 提供“2 个版本 + 2 个月”的兼容性保证:我们支持 Kafka 和 Go 两个最新的稳定版本,...

    kafka-storm-starter:代码示例显示了如何将Apache Kafka 0.8+与Apache Storm 0.9+和Apache Spark Streaming 1.1+集成在一起,同时使用Apache Avro作为数据序列化格式

    kafka-storm-starter:代码示例显示了如何将Apache Kafka 0.8+与Apache Storm 0.9+和Apache Spark Streaming 1.1+集成在一起,同时使用Apache Avro作为数据序列化格式

    Apache Kafka的Python客户端-Python开发

    kafka-python的设计功能与官方的Java客户端非常相似,并且散布了pythonic接口(例如,用于Apache Kafka分布式流处理系统的co Kafka Python客户端Python客户端。客户端,并带有大量的pythonic接口(例如,消费者迭代...

    flume-ng-kafka-sink:将数据发布到 Apache Kafka 的 Apache Flume Sink 实现

    Flume-NG-Kafka-Sink 这是一个 Sink 实现,可以将数据发布到主题。 目标是将 Flume 与 Kafka 集成,以便基于拉式的处理系统... Apache Kafka - 0.8.1.1 先决条件 Java 1.6 或更高版本 安装(请参阅上面的依赖版本)

    Packt.Learning.Apache.Kafka.2nd.Edition

    how to install and build Kafka 0.8.x using different versions of Scala. Chapter 2, Setting Up a Kafka Cluster, describes the steps required to set up a single- or multi-broker Kafka cluster and shares...

    kafka-node:适用于Apache Kafka 0.8及更高版本的Node.js客户端

    Kafka-node是Apache Kafka 0.9及更高版本的Node.js客户端。 目录 抵消 行政 故障排除/常见问题解答 首次发送时出现KeyedPartitioner错误的HighLevelProducer 如何调试问题? 对于新使用者,如何从分区中的最新...

    kafka-test-app

    curl -O http://apache.arvixe.com/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz tar xzf kafka_2.10-0.8.2.1.tgz cd kafka_2.10-0.8.2.1 sbt update sbt package ## Start zookeeper and kafka server 启动zookeepker,...

    kafkatcl:Apache Kafka 分布式消息系统的 Tcl 接口

    KafkaTcl,Apache Kafka 分布式消息系统的 Tcl 接口KafkaTcl 为 Kafka C 语言 API“librdkafka”提供了一个 Tcl 接口。功能提供自然的 Tcl 接口快速地异步线程安全自由!执照在许可的伯克利版权下开源,请参阅文件 ...

    logkafka:收集日志并将行发送到Apache Kafka

    logkafka-收集日志并将行发送到Apache Kafka 0.8+简介logkafka逐行将日志文件内容发送到kafka 0.8。 它将一行文件视为一条kafka消息。 如果您想将其部署在生产环境中,请参阅。特征Zookeeper进行日志收集配置管理[带...

    kafka-template-examples:kafka生产者消费者模板scala、java版例子

    cubeanalyse kafka examples 可以用来作为编写Kafka producer和consumer的模板。 1.如果是mvn项目pom依赖: org.apache.kafka kafka-clients 0.8.2.1 ..."org.apache.kafka" %% "kafka" % "0.8.2.1")

    phpkafka:Apache Kafka 的 PHP 扩展

    Apache Kafka 0.8 的PHP 扩展。 它建立在 kafka C 驱动程序 ( )。 它通过非阻塞调用与 kafka 代理建立持久连接,因此它应该非常快。 重要提示:库正在大量开发中,某些功能尚未实现。要求: 下载并安装 。 运行sudo...

    brod:适用于ErlangElixir的Apache Kafka客户端库

    特征支持Apache Kafka v0.8 + 可靠的生产者实现,支持运行中的请求和异步确认消费者和生产者都可以在内部处理领导者连任和其他集群干扰每个brod_client最多打开1个TCP连接到一个代理,如果需要,一个代理可以创建更...

    kafka连接池_python版本

    kafka连接池_python版本...采用连接池思想,抽取一种连接池的方式,连接池是采用Apache pool作为池管理,然后将生产者的连接点放到池中,在编译时需注意kafka版本问题以及所对应的scala,kafka版本是kafka_2.10-0.8.2.1

Global site tag (gtag.js) - Google Analytics