🎉 Apache Kafka 快速入门指南

1. 环境准备

✅ 步骤一:安装 Java 1.8+
✅ 步骤二:下载 Kafka 发行包 点击下载
✅ 步骤三:配置环境变量 KAFKA_HOME 并添加到 PATH

2. 基础操作

启动服务


bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
Kafka_JieGou

创建主题

bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

3. 生产者示例

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "Hello Kafka!");

        producer.send(record).get();
        producer.close();
    }
}
Producer_Example

4. 消费者实现

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到消息: key=%s, value=%s%n", record.key(), record.value());
            }
        }
    }
}
Consumer_Example

5. 扩展学习

🔗 深入理解 Kafka 配置参数
🔗 Kafka 高级生产者实践