SparkConf conf = new SparkConf() JavaSparkContext sc = new JavaSparkContext(conf); Map<String, String> kafkaParams = new HashMap<>(); JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));


上一篇:
下一篇:
切换中文