Configuration

Kafka uses key-value pairs in the property file format for configuration. These values can be supplied either from a file or programmatically.

The essential configurations are the following:

  • broker.id

  • log.dirs

  • zookeeper.connect
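
Putting these together, a minimal server.properties sketch might look like the following (the id, path, and connection string are illustrative placeholders, not recommendations):

    # Unique integer id for this broker within the cluster
    broker.id=0
    # Directory (or comma-separated list of directories) for the commit log
    log.dirs=/tmp/kafka-logs
    # ZooKeeper connection string as host:port
    zookeeper.connect=localhost:2181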

Topic-level configurations and defaults are discussed in more detail below.

More details about broker configuration can be found in the Scala class kafka.server.KafkaConfig.

Topic-level configuration

Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. Overrides can be set at topic creation time by giving one or more --config options. This example creates a topic named my-topic with a custom max message size and flush rate:

 > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

Overrides can also be changed or set later using the alter command. This example updates the max message size for my-topic:

 > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000

To remove an override you can do:

 > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --delete-config max.message.bytes

Below are the topic-level configurations. The Server Default Property column gives the server-level default for each property; changing that server-level setting changes the default applied to topics.

Below is the configuration of the Java producer:
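
As a minimal sketch of supplying these configs programmatically (the broker address, topic name, and String serializers here are assumptions for illustration, not the only valid choices), a producer could be constructed like this:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerConfigExample {
        public static void main(String[] args) {
            // Producer configs are ordinary key-value pairs, here set in code
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("acks", "all");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Send a single record to the my-topic topic created above
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            }
        }
    }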

For those interested in the legacy Scala producer configs, information can be found here.

We describe the old 0.8 consumer configs and the new consumer configs in turn below.

The essential old consumer configurations are the following:

  • group.id

  • zookeeper.connect

| Property | Default | Description |
| --- | --- | --- |
| auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest: automatically reset the offset to the smallest offset; largest: automatically reset the offset to the largest offset; anything else: throw an exception to the consumer |
| consumer.timeout.ms | -1 | Throw a timeout exception to the consumer if no message is available for consumption after the specified interval |
| exclude.internal.topics | true | Whether messages from internal topics (such as offsets) should be exposed to the consumer. |
| client.id | group id value | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. |
| zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time, it is considered dead and a rebalance will occur. |
| zookeeper.connection.timeout.ms | 6000 | The max time that the client waits while establishing a connection to ZooKeeper. |
| zookeeper.sync.time.ms | 2000 | How far a ZK follower can be behind a ZK leader |
| offsets.storage | zookeeper | Select where offsets should be stored (zookeeper or kafka). |
| offsets.channel.backoff.ms | 1000 | The backoff period when reconnecting the offsets channel or retrying failed offset fetch/commit requests. |
| offsets.channel.socket.timeout.ms | 10000 | Socket timeout when reading responses for offset fetch/commit requests. This timeout is also used for ConsumerMetadata requests that are used to query for the offset manager. |
| offsets.commit.max.retries | 5 | Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during shut-down. It does not apply to commits originating from the auto-commit thread. It also does not apply to attempts to query for the offset coordinator before committing offsets: if a consumer metadata request fails for any reason, it will be retried, and that retry does not count toward this limit. |
| dual.commit.enabled | true | If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This is required during migration from ZooKeeper-based offset storage to Kafka-based offset storage. With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated to the new version that commits offsets to the broker (instead of directly to ZooKeeper). |
| partition.assignment.strategy | range | Select between the "range" and "roundrobin" strategies for assigning partitions to consumer streams. The round-robin partition assignor lays out all the available partitions and all the available consumer threads, then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, the partitions will be uniformly distributed (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads). Round-robin assignment is permitted only if: (a) every topic has the same number of streams within a consumer instance and (b) the set of subscribed topics is identical for every consumer instance within the group. Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, the first few consumers will have one extra partition. |
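
As one concrete (hypothetical) illustration of the offset-storage settings above, a consumer.properties file for a group mid-migration from ZooKeeper-based to Kafka-based offset storage might look like:

    group.id=my-group
    zookeeper.connect=localhost:2181
    # Commit offsets to Kafka, but keep dual-committing to ZooKeeper until
    # every instance in the group has been upgraded
    offsets.storage=kafka
    dual.commit.enabled=true
    # Start from the smallest offset when no initial offset exists
    auto.offset.reset=smallest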

More details about consumer configuration can be found in the Scala class kafka.consumer.ConsumerConfig.

Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code is considered beta quality. Below is the configuration for the new consumer:
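
As a minimal sketch using the new consumer's Java API (the broker address, group id, and topic name are assumptions for illustration), note that it bootstraps from the brokers rather than from ZooKeeper:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // brokers, not ZooKeeper
            props.put("group.id", "my-group");                // assumed group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("my-topic"));
                // poll(long) is the signature in the 0.9-era API
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }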

Below is the configuration of the Kafka Connect framework.
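
As a hedged sketch of a standalone Connect worker file (the converter choices and paths are illustrative; depending on the version, additional settings such as internal converters may also be required):

    # Brokers the Connect worker talks to
    bootstrap.servers=localhost:9092
    # Converters applied to record keys and values
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Standalone mode stores source connector offsets in a local file
    offset.storage.file.filename=/tmp/connect.offsets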

Below is the configuration of the Kafka Streams client library.
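
The two required Streams configs are application.id and bootstrap.servers; a minimal sketch (values are placeholders):

    # Unique identifier for this Streams application; also used as the
    # default client-id prefix and in the names of internal topics
    application.id=my-streams-app
    # Brokers to bootstrap from
    bootstrap.servers=localhost:9092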
