Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my_topic", "key", "value")); producer.close(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "my_group"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my_topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } consumer.close(); props.put("bootstrap.servers", "kafka-server1:9092,kafka-server2:9092,kafka-server3:9092"); props.put("num.partitions", 5);


上一篇:
下一篇:
切换中文