腾讯云消息队列 CKafka购买和使用

腾讯云消息队列CKafka是一个拥有分布式的、高吞吐量、可扩展性,支持日志收集、监控数据聚合、流式数据处理、在线和离线分析的消息队列服务。
腾讯云消息队列 CKafka开通购买地址 https://cloud.tencent.com/product/ckafka
腾讯云消息队列 CKafka最新官方入门文档 https://cloud.tencent.com/product/ckafka/getting-started
 
腾讯云消息队列CKafka针对开源社区的 CKafka 提供全托管服务,可选的存储和带宽资源,使得腾讯云消息队列CKafka根据需求申请购买,只需专注于业务开发,不用考虑部署运维,低成本、更弹性、更可靠。无论您的业务部署在云上还是云下,用户在控制台创建队列和消费组,即可通过客户端生产与消费消息

本文旨在以最简单明了的方式引导您快速上手消息队列CKafka,为进一步熟悉和使用消息队列 CKafka 的功能提供入门。快速接入消息队列 CKafka 服务的步骤如下:

腾讯云消息队列CKafka购买
登录腾讯云,选择消息队列CKafka,进入消息队列CKafka平台,点击立即购买

创建腾讯云消息队列CKafka实例
1、用户点击立即购买,页面跳转至购买CKafka实例的页面,选择不同规格资源后点击确认完成支付,用户需要依次选择流量峰值、存储容量。如果想清除已选配置,请点击清除配置
2、选择完规格后,点击确认订单,显示确认订单界面界面,查看已选配置是否正确,确认后,点击去支付按钮。在支付界面选择支付类型,支付成功后,跳转到支付成功界面。如果余额不足,请点击去充值
 
参数 说明
地域 实例所在的地理位置。购买后无法更换地域。
不同地域的实例之间内网互不相通;选择靠近您的地域,可降低网络时延、提高您的访问速度 。
实例类型 即Kafka类型:VPC实例。
流量峰值 支持20MB/s、30MB/s
买时请按照读流量与写流量之和为购买规格。
举例:实际写流量峰值 10MB/s,读流量峰值 20MB/s,10MB/s+20MB/s 则以 30MB/s 规格购买。
为了业务的稳定性,建议购买大于实际流量 30% 左右的余量作为 buffer。
数据盘类型 高性能HDD云硬盘
存储容量 300G、600G
网络配置 专有网络(推荐):也称为VPC(Virtual Private Cloud)。VPC是一种隔离的网络环境
安全性和性能均高于传统的经典网络。
购买量 根据时间选择购买量:1月,2月,3月,6月,1年,2年,3年
 
腾讯云消息队列界面包括集群ID/名称,运行状态,实例类型,付费类型,创建时间,操作。可以按照集群ID/名称进行搜索,创建失败的实例用户可自由选择删除该列表
续费
续费入口:只有创建成功且运行状态为“服务中”的实例,才有允许续费,点击对应实例的续费功能

Topic管理
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理列表,进入Topic管理界面,显示信息包括:Topic名称、分区数、副本数、消息保留时长(小时)、同步落盘、同步复制、操作。Topic可以按照Topic名称进行搜索

创建 Topic
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理。单击新建,进入Topic创建界面
 
 
 
参数 说明
Topic说明 1. Topic 名称只能包含字母,数字,下划线(_)和短横线(-) 2名称长度限制在 3-64 字节之间,长于 64 字节将被自动截取
3. 一旦创建后不能再修改 Topic 名称
分区数 您可以设置Topic的分区数,分区数越大消费的并发度越大。该参数设置为1时,消费消息时会按照先入先出的顺序进行消费
取值范围:1-20,默认值:3
副本数 您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker节点故障时数据依然是可用的
副本数越大可靠性越高。该参数设置为1时,表示只有一份数据。取值范围:1-3,默认值:3
消息保留时长 Topic中的消息超过老化时间后,消息将会被删除,老化的消息无法被消费。取值范围:1-168,默认值:72
同步复制 开启同步复制后,需要在客户端配置acks=-1,否则无效
同步落盘 同步落盘是指生产的每条消息都会立即写入磁盘。开启:生产的每条消息都会立即写入磁盘,可靠性更高。
关闭:生产的消息存在内存中,不会立即写入磁盘
 
点击确定按钮,提示创建Topic成功,查看Topic创建情况
 
删除 Topic
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理。选择对应Topic名称行的删除功能,删除单个Topic。勾选多个Topic、或全选按钮,删除多个Topic
 
连接CKafka集群
创建主机连接CKafka
登录腾讯云系统,选择计算-弹性云主机,点击新建。选择与创建消息队列CKafka一致的区域创建主机,选择对应规格,镜像系统与CKafka系统一致,目前为centos 7.3即可,可选挂载存储。点击确认订单,返回后查看主机状态,选择控制台,进入主机,登录命令窗口,下载CKafka客户端,执行CKafka命令,命令中IP对应为CKafka集群中实例详情界面的IP
 
生产消费Demo演示
通过maven引入jar包依赖
 
maven依赖
 
<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.CKafka/CKafka -->
  <dependency>
      <groupId>org.apache.CKafka</groupId>
      <artifactId>CKafka_2.11</artifactId>
      <version>0.10.0.0</version>
  </dependency>
</dependencies>
写公共配置类
 
BigDataConfig
 
public class BigDataConfig {
    /**
    * CKafka 的 topic
    */
    public final static String TOPIC="string112";
 
    /**
    * CKafka broker list
    * 此处用来修改CKafka 的连接地址
    */
 
    public final static String BROKER_URL = "10.0.44.65:9092";
    /**
    * 消费组Id
    */
    public final static String GROUP_ID="tj-group22";
    /**
    * 客户连接 Id
    * 对数据不产生影响,对CKafka而言,只关注 groupId
    */
    public final static String CLIENT_ID="TJclient2";
    /**
    *  CKafka message->key
    *  对生产消费消息,没有影响,可暂时忽略
    */
 
    public final static String KEY="generator";
}
生产者逻辑
 
Producer
 
public class Producer extends Thread {
    private final CKafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;
 
    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BigDataConfig.BROKER_URL);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new CKafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }
public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else {
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message to topic [ "+ topic+ " ] -----> (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}
 
class DemoCallBack implements Callback {
 
    private final long startTime;
    private final int key;
    private final String message;
 
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }
 
    /**
    * 消息发送成功后的回调方法
    */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            String topic = metadata.topic();
            System.out.println(
                    "Producer send to topic 【 "+topic+" 】  message ------>(key: " + key + ",  value: " + message + " ) sent to partition(" + metadata.partition() +
                            "), " +
                            "offset( " + metadata.offset() + " ) 耗时  " + elapsedTime + " 毫秒");
        } else {
            exception.printStackTrace();
        }
    }
}
消费者逻辑
 
Consumer
 
public class Consumer extends ShutdownableThread {
    private final CKafkaConsumer<Integer, String> consumer;
    private final String topic;
 
    public Consumer(String topic) {
        super("CKafkaConsumerExample", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BigDataConfig.BROKER_URL);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, BigDataConfig.GROUP_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.CKafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.CKafka.common.serialization.StringDeserializer");
 
        consumer = new CKafkaConsumer<>(props);
        this.topic = topic;
    }
 
    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("From topic 【 "+ topic+" 】 Received message------> (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }
 
    @Override
    public String name() {
        return null;
    }
 
    @Override
    public boolean isInterruptible() {
        return false;
    }
}
测试类
 
ApplicationForMessage
 
public class ApplicationForMessage {
    public static void main(String[] args) {
        /**
        * 生产者生产消息
        */
        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
        Producer producerThread = new Producer(BigDataConfig.TOPIC, isAsync);
        producerThread.start();
 
        /**
        * 消费者消费消息
        */
        Consumer consumerThread = new Consumer(BigDataConfig.TOPIC);
        consumerThread.start();
    }
}