1.2 Broker 1.3 Producer 1.4 Consumer 1.5 ZooKeeper

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal producer example: sends ten string records to the topic {@code my_topic}
 * through the broker at {@code localhost:9092}.
 */
public class KafkaProducerDemo {

    /**
     * Configures a string-keyed, string-valued producer and sends the records
     * {@code "0"} through {@code "9"}, each using the same text for key and value.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when send() throws,
        // instead of only on the success path.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String text = Integer.toString(i);
                ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", text, text);
                producer.send(record);
            }
        }
    }
}

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal consumer example: joins the group {@code my_consumer_group}, subscribes to
 * {@code my_topic} and prints every record it receives.
 */
public class KafkaConsumerDemo {

    /**
     * Configures a string-keyed, string-valued consumer and polls in an endless loop,
     * printing the key and value of each record to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my_consumer_group");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The loop below never exits normally; try-with-resources ensures the consumer
        // is still closed if poll() or the processing code throws.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
                }
            }
        }
    }
}


上一篇:
下一篇:
切换中文