腾讯云消息队列 CKafka购买和使用
腾讯云消息队列CKafka是一个拥有分布式的、高吞吐量、可扩展性,支持日志收集、监控数据聚合、流式数据处理、在线和离线分析的消息队列服务。
腾讯云消息队列 CKafka开通购买地址 https://cloud.tencent.com/product/ckafka
腾讯云消息队列 CKafka最新官方入门文档 https://cloud.tencent.com/product/ckafka/getting-started
腾讯云消息队列CKafka针对开源社区的 CKafka 提供全托管服务,可选的存储和带宽资源,使得腾讯云消息队列CKafka根据需求申请购买,只需专注于业务开发,不用考虑部署运维,低成本、更弹性、更可靠。无论您的业务部署在云上还是云下,用户在控制台创建队列和消费组,即可通过客户端生产与消费消息
本文旨在以最简单明了的方式引导您快速上手消息队列CKafka,为进一步熟悉和使用消息队列 CKafka 的功能提供入门。快速接入消息队列 CKafka 服务的步骤如下:
腾讯云消息队列CKafka购买
登录腾讯云,选择消息队列CKafka,进入消息队列CKafka平台,点击立即购买
创建腾讯云消息队列CKafka实例
1、用户点击立即购买,页面跳转至购买CKafka实例的页面,选择不同规格资源后点击确认完成支付,用户需要依次选择流量峰值、存储容量。如果想清除已选配置,请点击清除配置
2、选择完规格后,点击确认订单,显示确认订单界面界面,查看已选配置是否正确,确认后,点击去支付按钮。在支付界面选择支付类型,支付成功后,跳转到支付成功界面。如果余额不足,请点击去充值
参数 | 说明 |
---|---|
地域 |
实例所在的地理位置。购买后无法更换地域。 不同地域的实例之间内网互不相通;选择靠近您的地域,可降低网络时延、提高您的访问速度 。 |
实例类型 | 即Kafka类型:VPC实例。 |
流量峰值 |
支持20MB/s、30MB/s 买时请按照读流量与写流量之和为购买规格。 举例:实际写流量峰值 10MB/s,读流量峰值 20MB/s,10MB/s+20MB/s 则以 30MB/s 规格购买。 为了业务的稳定性,建议购买大于实际流量 30% 左右的余量作为 buffer。 |
数据盘类型 | 高性能HDD云硬盘 |
存储容量 | 300G、600G |
网络配置 |
专有网络(推荐):也称为VPC(Virtual Private Cloud)。VPC是一种隔离的网络环境 安全性和性能均高于传统的经典网络。 |
购买量 | 根据时间选择购买量:1月,2月,3月,6月,1年,2年,3年 |
腾讯云消息队列界面包括集群ID/名称,运行状态,实例类型,付费类型,创建时间,操作。可以按照集群ID/名称进行搜索,创建失败的实例用户可自由选择删除该列表
续费
续费入口:只有创建成功且运行状态为“服务中”的实例,才有允许续费,点击对应实例的续费功能
Topic管理
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理列表,进入Topic管理界面,显示信息包括:Topic名称、分区数、副本数、消息保留时长(小时)、同步落盘、同步复制、操作。Topic可以按照Topic名称进行搜索
创建 Topic
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理。单击新建,进入Topic创建界面
参数 | 说明 |
---|---|
Topic说明 |
1. Topic 名称只能包含字母,数字,下划线(_)和短横线(-) 2名称长度限制在 3-64 字节之间,长于 64 字节将被自动截取 3. 一旦创建后不能再修改 Topic 名称 |
分区数 |
您可以设置Topic的分区数,分区数越大消费的并发度越大。该参数设置为1时,消费消息时会按照先入先出的顺序进行消费 取值范围:1-20,默认值:3 |
副本数 |
您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker节点故障时数据依然是可用的 副本数越大可靠性越高。该参数设置为1时,表示只有一份数据。取值范围:1-3,默认值:3 |
消息保留时长 | Topic中的消息超过老化时间后,消息将会被删除,老化的消息无法被消费。取值范围:1-168,默认值:72 |
同步复制 | 开启同步复制后,需要在客户端配置acks=-1,否则无效 |
同步落盘 |
同步落盘是指生产的每条消息都会立即写入磁盘。开启:生产的每条消息都会立即写入磁盘,可靠性更高。 关闭:生产的消息存在内存中,不会立即写入磁盘 |
点击确定按钮,提示创建Topic成功,查看Topic创建情况
删除 Topic
在实例列表界面选择一个实例,单击名称,进入实例详情界面,选择Topic管理。选择对应Topic名称行的删除功能,删除单个Topic。勾选多个Topic、或全选按钮,删除多个Topic
连接CKafka集群
创建主机连接CKafka
登录腾讯云系统,选择计算-弹性云主机,点击新建。选择与创建消息队列CKafka一致的区域创建主机,选择对应规格,镜像系统与CKafka系统一致,目前为centos 7.3即可,可选挂载存储。点击确认订单,返回后查看主机状态,选择控制台,进入主机,登录命令窗口,下载CKafka客户端,执行CKafka命令,命令中IP对应为CKafka集群中实例详情界面的IP
生产消费Demo演示
通过maven引入jar包依赖
maven依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.CKafka/CKafka -->
<dependency>
<groupId>org.apache.CKafka</groupId>
<artifactId>CKafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
写公共配置类
BigDataConfig
public class BigDataConfig {
/**
* CKafka 的 topic
*/
public final static String TOPIC="string112";
/**
* CKafka broker list
* 此处用来修改CKafka 的连接地址
*/
public final static String BROKER_URL = "10.0.44.65:9092";
/**
* 消费组Id
*/
public final static String GROUP_ID="tj-group22";
/**
* 客户连接 Id
* 对数据不产生影响,对CKafka而言,只关注 groupId
*/
public final static String CLIENT_ID="TJclient2";
/**
* CKafka message->key
* 对生产消费消息,没有影响,可暂时忽略
*/
public final static String KEY="generator";
}
生产者逻辑
Producer
public class Producer extends Thread {
private final CKafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BigDataConfig.BROKER_URL);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new CKafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else {
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message to topic [ "+ topic+ " ] -----> (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* 消息发送成功后的回调方法
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
String topic = metadata.topic();
System.out.println(
"Producer send to topic 【 "+topic+" 】 message ------>(key: " + key + ", value: " + message + " ) sent to partition(" + metadata.partition() +
"), " +
"offset( " + metadata.offset() + " ) 耗时 " + elapsedTime + " 毫秒");
} else {
exception.printStackTrace();
}
}
}
消费者逻辑
Consumer
public class Consumer extends ShutdownableThread {
private final CKafkaConsumer<Integer, String> consumer;
private final String topic;
public Consumer(String topic) {
super("CKafkaConsumerExample", false);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BigDataConfig.BROKER_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, BigDataConfig.GROUP_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.CKafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.CKafka.common.serialization.StringDeserializer");
consumer = new CKafkaConsumer<>(props);
this.topic = topic;
}
@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("From topic 【 "+ topic+" 】 Received message------> (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
@Override
public String name() {
return null;
}
@Override
public boolean isInterruptible() {
return false;
}
}
测试类
ApplicationForMessage
public class ApplicationForMessage {
public static void main(String[] args) {
/**
* 生产者生产消息
*/
boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
Producer producerThread = new Producer(BigDataConfig.TOPIC, isAsync);
producerThread.start();
/**
* 消费者消费消息
*/
Consumer consumerThread = new Consumer(BigDataConfig.TOPIC);
consumerThread.start();
}
}