Kafka 是一个分布式流处理平台,可以用来处理大量数据流。本教程将介绍 Kafka API 的基本使用方法。

快速开始

要开始使用 Kafka API,你需要先了解以下几个概念:

  • Producer:生产者,负责向 Kafka 主题发送消息。
  • Consumer:消费者,负责从 Kafka 主题读取消息。
  • Broker:代理,Kafka 集群中的服务器,负责存储和处理消息。
  • Topic:主题,Kafka 中消息的分类,类似于数据库中的表。

1. 创建 Kafka 主题

首先,你需要创建一个 Kafka 主题。以下是一个创建主题的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

AdminClient adminClient = AdminClient.create(props);
NewTopic newTopic = new NewTopic("test-topic", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();

2. 发送消息

接下来,你可以使用 Producer 向主题发送消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "key", "value"));
producer.close();

3. 读取消息

最后,你可以使用 Consumer 从主题读取消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecord<String, String> record = consumer.poll(Duration.ofMillis(100));
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.close();

扩展阅读

想要了解更多关于 Kafka 的信息,可以访问我们的 Kafka 教程 页面。

Kafka Architecture