Pulsar3.0 是 Pulsar 社区推出的第一个 LTS 长期支持版本。
成都创新互联是一家集网站建设,新化企业网站建设,新化品牌网站建设,网站定制,新化网站建设报价,网络营销,网络优化,新化网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
图片
如图所示,LTS 版本会最长支持到 36 个月,而 Feature 版本最多只有六个月;类似于我们使用的 JDK11,17,21 都是可以长期使用的;所以也推荐大家都升级到 LTS 版本。
作为首个 LTS 版本,3.0 自然也是自带了许多新特性,这个会在后续介绍。
先来看看升级指南:
图片
在官方的兼容表中会发现:不推荐跨版本升级。
也就是说如果你现在还在使用的是 2.10.x,那么推荐是先升级到 2.11.x 然后再升级到 3.0.x.
而且根据我们的使用经验来看,首个版本是不保险的,即便是 LTS 版本;所以不推荐直接升级到 3.0.0,而是更推荐 3.0.1+,这个小版本会修复 3.0 所带来的一些 bug。
先讲一下我们的升级流程,大家可以用做参考。
根据我们的使用场景,为了以防万一,首先需要将我们的插件依赖升级到对应的版本。
图片
其实简单来说就是更新下依赖,然后再重新打包,在后续的流程进行测试。
之后是预热镜像,我们使用 harbor 搭建了自己的 docker 镜像仓库,这样在升级重启镜像的时候可以更快的从内网拉取镜像。
毕竟一个 pulsar-all 的镜像也不小,尽量的缩短启动时间。
预热的过程也很简单:
docker pull apachepulsar/pulsar-all:3.0.1
docker tag apachepulsar/pulsar-all:3.0.1 harbor-private.xx.com/pulsar/pulsar-all:3.0.1
docker image push harbor-private.xx.com/pulsar/pulsar-all:3.0.1
之后升级的时候就可以使用私服的镜像了。
我这边有写了一个 cli 可以帮我快速创建或升级一个集群,然后触发我所编写的功能测试。
./pulsar-upgrade-cli upgrade pulsar-test ./charts/pulsar --version x.x.x -f charts/pulsar/values.yaml -n pulsar-test
这个 cli 很简单,一共就做三件事:
之后的效果如下:
图片
主要就是覆盖了我们的使用场景,都跑通过之后才会走后续的流程。
图片
之后会启动一个 200 左右的并发生产和消费数据,模拟线上的使用情况,会一直让这个任务跑着,大概一晚上就可以了,第二天通过监控查看:
组件的升级步骤这里参考了官方指南:https://pulsar.apache.org/docs/3.1.x/administration-upgrade/#upgrade-zookeeper-optional
图片
只要一步步按照这个流程走,问题不大,哪一步出现问题后需要及时回滚,回滚流程参考下面的回滚部分。
同时在升级过程中需要一直查看 broker 的 error 日志,如果有明显的不符合预期的日志一定要注意。
在升级 bookkeeper 的时候,broker 可能会出现 bk 连接失败的异常,这个可以不用在意。
都升级完后就是线上业务验证环节了:
当出现异常的时候需要立即回滚,这里的异常一般就是消息收发异常,客户端掉线等。
经过我的测试 3.0.x 的存储和之前的版本是兼容的,所以 bookkeeper 都能降级其他的组件就没啥可担心的了。
需要降级时直接将所有组件降级为上一个版本即可。
因为是从 2.x 升级到 3.x 也是涉及到了跨大版本,所以也准备了灾难恢复的方案。
比如极端情况下升级失败,所有数据丢失的情况。
整个灾难恢复的主要目的就是恢复后的集群对外提供的域名不发生变化,同时所有的客户端可以自动重连上来,也就是最坏的情况下所有的数据丢了可以接受,但不能影响业务正常使用。
所以我们的流程如下:
@SneakyThrows
@Test
void backup(){
List topicList = pulsarAdmin.topics().getPartitionedTopicList("tenant/namespace");
log.info("topic size={}",topicList.size());
// create a custom thread pool
CopyOnWriteArrayList dataList = new CopyOnWriteArrayList<>();
ExecutorService customThreadPool = Executors.newFixedThreadPool(10);
for (String topicName : topicList) {
customThreadPool.execute(()-> {
PartitionedTopicMetadata metadata;
try {
metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName);
TopicMeta topicMeta = new TopicMeta();
// backup topic
topicMeta.setName(topicName);
topicMeta.setPartition(metadata.partitions);
// backup permission
Map> permissions = pulsarAdmin.topics().getPermissions(topicName);
topicMeta.setPermissions(permissions);
// back sub
List subscriptions = new ArrayList<>();
PartitionedTopicStats topicStats = pulsarAdmin.topics().getPartitionedStats(topicName, true);
topicStats.getSubscriptions().forEach((k,v)-> subscriptions.add(k));
topicMeta.setSubscriptions(subscriptions);
dataList.add(topicMeta);
} catch (PulsarAdminException e) {
throw new RuntimeException(e);
} }); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
}
log.info("{}",dataList.size());
log.info("{}",JSONUtil.toJsonStr(dataList));
}
// TopicMetaData
@Data
public class TopicMeta {
private String name;
private int partition;
Map> permissions;
List subscriptions = new ArrayList<>();
}
第一步是备份 topic:
因为我们客户端使用了 JWT 验证,所有为了使得恢复的 Pulsar 集群可以让客户端无缝切换到新集群,因此必须得使用相同的公私钥。
这个其实比较简单,我们使用的是 helm 安装的集群,所以只需要备份好 Secret 即可。
apiVersion: v1
data:
PRIVATEKEY: XXX
PUBLICKEY: XXX
kind: Secret
metadata:
name: pulsar-token-asymmetric-key
namespace: pulsar
type: Opaque
# 还有几个 superUser 的 Secret
首先使用 helm 重新创建一个新集群:
./scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsar
helm install \ --values charts/pulsar/values.yaml \ --set namespace=pulsar\
--set initialize=true \
pulsar ./charts/pulsar -n pulsar
直接使用刚才备份的公私钥覆盖到新集群即可。
进入 toolset pod 创建需要使用的 tenant/namespace
k exec -it pulsar-toolset-0 -n pulsar bash
bin/pulsar-admin tenants create tenant
bin/pulsar-admin namespaces create tenant/namespace
之后便是最重要的元数据恢复了:
@SneakyThrows
@Test
void restore() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://url:8080")
.authentication(AuthenticationFactory.token(token))
.build();
Path filePath = Path.of("restore-ns.json");
String fileContent = Files.readString(filePath);
List topicMetaList = JSON.parseArray(fileContent, TopicMeta.class);
ExecutorService customThreadPool = Executors.newFixedThreadPool(50);
for (TopicMeta topicMeta : topicMetaList) {
customThreadPool.execute(() -> {
// Create topic
try {
pulsarAdmin.topics().createPartitionedTopic(topicMeta.getName(), topicMeta.getPartition());
} catch (PulsarAdminException e) {
log.error("Create topic error");
}
// Create sub
for (String subscription : topicMeta.getSubscriptions()) {
try {
pulsarAdmin.topics().createSubscription(topicMeta.getName(), subscription, MessageId.latest);
} catch (PulsarAdminException e) {
log.error("createSubscription error");
} }
// Grant permission
topicMeta.getPermissions().forEach((role, authActions) -> {
permission(pulsarAdmin, topicMeta.getName(), role, authActions);
});
log.info("topic:{} restore success", topicMeta.getName());
}); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
} log.info("restore success");
}
private synchronized void permission(PulsarAdmin pulsarAdmin, String topic, String role, Set authActions) {
try {
pulsarAdmin.topics().grantPermission(topic, role, authActions);
} catch (PulsarAdminException e) {
log.error("grantPermission error", e);
}
}
流程和备份类似:
因为授权接口限制了并发调用,所有需要加锁,导致整个恢复的流程就会比较慢。
8000 topic 的 namespace 大概恢复时间为 40min 左右。
之后依次恢复其他 namespace 即可。
admin.namespaces().setNamespaceMessageTTL("tenant/namespace", 3600 * 6);
admin.namespaces().setBacklogQuota("tenant/namespace", BacklogQuota)
如果之前的集群有设置 TTL 或者是 backlogQuota 时都需要手动恢复。
以上就是整个升级和灾难恢复的流程,当然灾难恢复希望大家不要碰到。
我会在下一篇详细介绍 Pulsar 3.0 的新功能以及所碰到的一些坑。
标题名称:Pulsar3.0升级指北,你学会些什么?
网站链接:http://www.csdahua.cn/qtweb/news49/307149.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网