SparkConf conf = new SparkConf()
JavaSparkContext sc = new JavaSparkContext(conf);
Map<String, String> kafkaParams = new HashMap<>();
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));