腾讯云消息队列Kafka代码接入指南

身份认证:依照公有云安全标准,客户端与服务端的通信,需要通过OAC的ak/sk进行身份认证,kafka使用SASL+JAAS插件认证机制,传递ak和加签信息,进行身份认证。
 
环境要求
 
JAVA 1.7 U51以上版本,推荐使用JAVA 1.8
KAFKA服务端目前是1.1.0版本
KAFKA客户端推荐1.1.0版本,至少需要大于等于0.11以上版本,并支持1.1.0的服务端版本

集成Kafka
 
1.  引入kafka-clients.jar、kafka-clients-security-1.1.0.jar到项目工程,并将依赖包也一起引入项目工程,KAFKA客户端可以使用从官网下载的jar包,版本需要大于等于0.11,并支持kafka1.1.0服务端。
 
注:获取服务接入点请参考操作指南。
 
2.  创建kafka_jaas.conf文件。
 
KafkaClient {
    org.apache.kafka.common.security.oac.OacLoginModule required
    AccessKey=""
    SecretKey="";
};
 
 注:AccessKey & SecretKey请登录公有云控制台获取。
 
3. 设置security.protocol和sasl.mechanism属性。
 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
 
依赖包
 
<dependencies>
  <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients-security</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
 
发送消息
 
示例代码
 
public void testProducer() {
    Properties props = new Properties();
    String topic = "test2";
    props.put("bootstrap.servers", "10.42.8.222:9092");// 此处填写接入点服务地址,接入点获取请点击操作指南
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 50; i++) {
        producer.send(new ProducerRecord<String, String>(topic, topic + "-K-" + Integer.toString(i), topic + "-V-" + Integer.toString(i)));
    }
    producer.close();
}
 
消费消息
 
示例代码
 
public static void testConsumer() {
    Properties props = new Properties();
    String topic = "test2";
    props.put("bootstrap.servers", "localhost:9092");//// 此处填写接入点服务地址,接入点获取请点击操作指南
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("group.id", "szx_test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
        }
    }
}

标签