class Consumer {
/**
* Create a ConsumerConnector
* 创建一个 ConsumerConnector
*
* @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
* connection string zookeeper.connect.
* 配置参数最少要设置此消费者的 groupid 和 Zookeeper 的连接字符串 Zookeeper.connect
*/
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}
/**
* V: type of the message 消息的类型
* K: type of the optional key associated with the message 消息可选的 key 的类型
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
* Create a list of message streams of type T for each topic.
* 为每个 topic 创建一个 T 类型的消息流
*
* @param topicCountMap a map of (topic, #streams) pair
* (topic, #streams) 对的 Map
* @param decoder a decoder that converts from Message to T
* 将消息转换为 T 类型的解码器
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
* 返回一个 (topic, KafkaStream 列表)对的 Map。list 的元素个数为#streams。每个 stream 都支持一个对 message/metadata 对的迭代器。
*/
public <K,V> Map<String, List<KafkaStream<K,V>>>
createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams of type T for each topic, using the default decoder.
* 使用默认的解码器为每个 topic 创建一个 T 类型的消息流
*/
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of message streams for topics matching a wildcard.
* 为符合通配符的 topics 创建一个消息流列表
*
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).
* 指明哪些 topic 被订阅的 topic 过滤器(封装一个白名单或者黑名单)
* @param numStreams the number of message streams to return.
* 将返回的消息流的数量
* @param keyDecoder a decoder that decodes the message key
* 用于解码消息键的解码器
* @param valueDecoder a decoder that decodes the message itself
* 解码消息的解码器
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements.
* KafkaStream 的列表。每个流支持一个遍历消息及元数据元素的迭代器
*/
public <K,V> List<KafkaStream<K,V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder.
* 使用默认的解码器为符合通配符的 topic 创建消息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
* 使用一个流和默认的解码器为符合通配符的 topic 创建消息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all topic/partitions connected by this connector.
* 提交连接到这个连接器的所有 topic/partition 的偏移量
*/
public void commitOffsets();
/**
* Shut down the connector
* 关闭这个连接器
*/
public void shutdown();
}
class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.
* 从一个 topic 上拉取抓取一堆消息
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* request 指定 topic 名称,topic 分区,起始的比特偏移量,最大的抓取的比特量
* @return a set of fetched messages
* 抓取的消息集合
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.
* 抓取一个 topic 序列的元数据
*
* @param request specifies the versionId, clientId, sequence of topics.
* request 指明 versionId,clientId,topic 序列
* @return metadata for each topic in the request.
* request 中的每个 topic 的元数据
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.
* 获取一个在指定时间前有效偏移量(到最大数值)的列表
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* 一个 [[kafka.javaapi.OffsetRequest]] 对象
* @return a [[kafka.javaapi.OffsetResponse]] object.
* 一个 [[kafka.javaapi.OffsetResponse]] 对象
*/
public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer.
* 关闭 SimpleConsumer
*/
public void close();
}
对于大多数应用,高层的消费者 Api 已经足够优秀了。一些应用需求的特性还没有暴露给高层消费者(比如在重启消费者时设置初始的 offset)。它们可以取代我们的底层 SimpleConsumer Api。这个逻辑可能更复杂一点,你可以参照这个示例。