classConsumer { /** * Create a ConsumerConnector * 创建一个 ConsumerConnector * * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper * connection string zookeeper.connect. * 配置参数最少要设置此消费者的 groupid 和 Zookeeper 的连接字符串 Zookeeper.connect */publicstatickafka.javaapi.consumer.ConsumerConnectorcreateJavaConsumerConnector(ConsumerConfig config);}/** * V: type of the message 消息的类型 * K: type of the optional key associated with the message 消息可选的 key 的类型 */publicinterfacekafka.javaapi.consumer.ConsumerConnector { /** * Create a list of message streams of type T for each topic. * 为每个 topic 创建一个 T 类型的消息流 * * @param topicCountMap a map of (topic, #streams) pair * (topic, #streams) 对的 Map * @param decoder a decoder that converts from Message to T * 将消息转换为 T 类型的解码器 * @return a map of (topic, list of KafkaStream) pairs. * The number of items in the list is #streams. Each stream supports * an iterator over message/metadata pairs. * 返回一个 (topic, KafkaStream 列表)对的 Map。list 的元素个数为#streams。每个 stream 都支持一个对 message/metadata 对的迭代器。 */public <K,V> Map<String,List<KafkaStream<K,V>>>createMessageStreams(Map<String,Integer> topicCountMap,Decoder<K> keyDecoder,Decoder<V> valueDecoder); /** * Create a list of message streams of type T for each topic, using the default decoder. * 使用默认的解码器为每个 topic 创建一个 T 类型的消息流 */publicMap<String,List<KafkaStream<byte[],byte[]>>> createMessageStreams(Map<String,Integer> topicCountMap); /** * Create a list of message streams for topics matching a wildcard. * 为符合通配符的 topics 创建一个消息流列表 * * @param topicFilter a TopicFilter that specifies which topics to * subscribe to (encapsulates a whitelist or a blacklist). * 指明哪些 topic 被订阅的 topic 过滤器(封装一个白名单或者黑名单) * @param numStreams the number of message streams to return. * 将返回的消息流的数量 * @param keyDecoder a decoder that decodes the message key * 用于解码消息键的解码器 * @param valueDecoder a decoder that decodes the message itself * 解码消息的解码器 * @return a list of KafkaStream. Each stream supports an * iterator over its MessageAndMetadata elements. * KafkaStream 的列表。每个流支持一个遍历消息及元数据元素的迭代器 */public <K,V> List<KafkaStream<K,V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/** * Create a list of message streams for topics matching a wildcard, using the default decoder. * 使用默认的解码器为符合通配符的 topic 创建消息流列表 */publicList<KafkaStream<byte[],byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter,int numStreams); /** * Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream. * 使用一个流和默认的解码器为符合通配符的 topic 创建消息流列表 */publicList<KafkaStream<byte[],byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter); /** * Commit the offsets of all topic/partitions connected by this connector. * 提交连接到这个连接器的所有 topic/partition 的偏移量 */publicvoidcommitOffsets(); /** * Shut down the connector * 关闭这个连接器 */publicvoidshutdown();}
class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.
* 从一个 topic 上拉取抓取一堆消息
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* request 指定 topic 名称,topic 分区,起始的比特偏移量,最大的抓取的比特量
* @return a set of fetched messages
* 抓取的消息集合
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.
* 抓取一个 topic 序列的元数据
*
* @param request specifies the versionId, clientId, sequence of topics.
* request 指明 versionId,clientId,topic 序列
* @return metadata for each topic in the request.
* request 中的每个 topic 的元数据
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.
* 获取一个在指定时间前有效偏移量(到最大数值)的列表
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* 一个 [[kafka.javaapi.OffsetRequest]] 对象
* @return a [[kafka.javaapi.OffsetResponse]] object.
* 一个 [[kafka.javaapi.OffsetResponse]] 对象
*/
public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer.
* 关闭 SimpleConsumer
*/
public void close();
}
对于大多数应用,高层的消费者 Api 已经足够优秀了。一些应用需求的特性还没有暴露给高层消费者(比如在重启消费者时设置初始的 offset)。它们可以取代我们的底层 SimpleConsumer Api。这个逻辑可能更复杂一点,你可以参照这个示例。