> bin/zookeeper-server-start.sh config/zookeeper.properties[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...
然后开始启动 Kafka 服务器:
> bin/kafka-server-start.sh config/server.properties[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)...
KTable wordCounts = textLines
// 按照空格将每个文本行拆分成单词
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// 确保每个单词作为记录的 key 值以便于下一步的聚合
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> new KeyValue<>(value, value))
// 计算每个单词的出现频率并将他们保存到“Counts”的表中
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt