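<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ht</groupId>
    <artifactId>kafkatest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- NOTE: this plugin had a <configuration> section whose contents were lost
                     when the listing was extracted. Presumably it set <source>/<target>;
                     the Java code uses the diamond operator, so 1.7 or later is required.
                     Confirm against the original project. -->
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
    </dependencies>
</project>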
java 代码
1 import org.apache.kafka.clients.CommonClientConfigs; 2 import org.apache.kafka.clients.consumer.ConsumerRecord; 3 import org.apache.kafka.clients.consumer.ConsumerRecords; 4 import org.apache.kafka.clients.consumer.KafkaConsumer; 5 6 import java.util.Collections; 7 import java.util.Properties; 8 9 import static org.apache.kafka.clients.consumer.ConsumerConfig.*;10 11 /**12 * @author sunzq13 * @since 2017/8/2914 */15 public class Application {16 public static void main(String[] args) {17 18 Properties props = new Properties();19 props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");20 props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");21 props.put(GROUP_ID_CONFIG, "test08291103");22 // props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");23 props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);24 props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");25 props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");26 props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");27 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");28 29 KafkaConsumerconsumer = new KafkaConsumer<>(props);30 // topic name: test931 consumer.subscribe(Collections.singleton("test9"));32 while (true) {33 ConsumerRecords records = consumer.poll(100);34 for (ConsumerRecord record : records)35 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());36 }37 }38 }
启动参数
-Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf
注意:在 Windows 下,上面启动参数里的路径分隔符记得写成 \\。