哈喽,大家好,我是了不起。
创新互联-专业网站定制、快速模板网站建设、高性价比徽县网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式徽县网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖徽县地区。费用合理售后完善,十载实体公司更值得信赖。
之前写过关于 Apache Pulsar 的简单示例,用来了解如何使用 Pulsar 这个新生代的消息队列中间件,但是如果想要在项目中使用,还会欠缺很多,最明显的就是 集成复杂,如果你用过其他消息中间件,比如 Kafka、RabbitMq,只需要简单的引入 jar,就可以通过注解+配置快速集成到项目中。
既然已经了解了 Apache Pulsar,又认识了 spring-boot-starter,今天不妨来看下如何写一个 pulsar-spring-boot-starter 模块。
写一个完整的类似 kafka-spring-boot-starter(springboot 项目已经集成到 spring-boot-starter 中),需要考虑到很多 kafka 的特性, 今天我们主要实现下面几个模板
└── pulsar-starter
├── pulsar-spring-boot-starter
├── pulsar-spring-boot-autoconfigure
├── spring-pulsar
├── spring-pulsar-xx
├── spring-pulsar-sample
└── README.md
整个模块的结构如上其中pulsar-starter作为一个根模块,主要控制子模块依赖的其他 jar 的版本以及使用到的插件版本。类似于 Spring-Bom,这样我们在后续升级 时,就可以解决各个第三方 jar 的可能存在版本冲突导致的问题。
该模块作为外部项目集成的直接引用 jar,可以认为是 pulsar-spring-boot-starter 组件的入口,里面不需要写任何代码,只需要引入需要的依赖(也就是下面的子模块)即可
该模块主要定义了 spring.factories 以及 AutoConfigure、Properties。也就是自动配置的核心(配置项+Bean 配置)
该模块是核心模块,主要的实现都在这里
扩展模块,可以对 spring-pulsar 做更细化的划分
starter 的使用示例项目
上面我们说到实现目标,现在看下各个模块应该包含什么内容,以及怎么实现我们的目标
上面说到 starter 主要是引入整个模块基础的依赖即可,里面不用写代码。
com.sucl
spring-pulsar
${project.version}
com.sucl
pulsar-spring-boot-autoconfigure
${project.version}
org.springframework.boot
spring-boot
org.springframework.boot
spring-boot-starter-logging
org.springframework.boot
spring-boot-configuration-processor
true
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
@Import({PulsarAnnotationDrivenConfiguration.class})
public class PulsarAutoConfiguration {
private final PulsarProperties properties;
public PulsarAutoConfiguration(PulsarProperties properties) {
this.properties = properties;
}
@Bean(destroyMethod = "close")
public PulsarClient pulsarClient() {
ClientBuilder clientBuilder = new ClientBuilderImpl(properties);
return clientBuilder.build();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory pulsarConsumerFactory() {
return new DefaultPulsarConsumerFactory(pulsarClient(), properties.getConsumer().buildProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory pulsarProducerFactory() {
return new DefaultPulsarProducerFactory(pulsarClient(), properties.getProducer().buildProperties());
}
}
在目录src/main/resources/META-INF下创建spring.factories,内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.sucl.pulsar.autoconfigure.PulsarAutoConfiguration
org.apache.pulsar
pulsar-client
org.springframework.boot
spring-boot-autoconfigure
org.springframework
spring-messaging
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({PulsarListenerConfigurationSelector.class})
public @interface EnablePulsar {
}
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarListener {
/**
*
* @return TOPIC 支持SPEL
*/
String[] topics() default {};
/**
*
* @return TAGS 支持SPEL
*/
String[] tags() default {};
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarHandler {
}
flow
按照下面的流程,你会发现通过简单的几行代码就能够实现消息的生产与消费,并集成到项目中去。
com.sucl
pulsar-spring-boot-starter
${project.version}
org.springframework.boot
spring-boot-starter-web
2.添加配置
cycads:
pulsar:
service-url: pulsar://localhost:6650
listener-topics: TOPIC_TEST
3.编写对应消费代码
@Slf4j
@Component
@PulsarListener(topics = "#{'${cycads.listener-topics}'.split(',')}")
public class PulsarDemoListener {
@PulsarHandler
public void onConsumer(Message message){
log.info(">>> 接收到消息:{}", message.getPayload());
}
}
@Slf4j
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ContextConfig.class})
@Import({PulsarAutoConfiguration.class})
public class ProducerTests {
@Autowired
private ProducerFactory producerFactory;
@Test
public void sendMessage() {
Producer producer = producerFactory.createProducer("TOPIC_TEST");
MessageId messageId = producer.send("this is a test message");
log.info(">>>>>>> 消息发送完成:{}", messageId);
}
@Configuration
@PropertySource(value = "classpath:application-test.properties")
static class ContextConfig {
//
}
}
2023-02-26 19:57:15.572 INFO 26520 --- [pulsar-01] c.s.p.s.listener.PulsarDemoListener : >>> 接收到消息:GenericMessage [payload=this is a test message, headers={id=f861488c-2afb-b2e7-21a1-f15e9759eec5, timestamp=1677412635571}]
基于 pulsar-client 提供的 ConfigurationData 扩展 Properties;了解 Pulsar Client 如何连接 Broker 并进行消息消费,包括同步消费、异步消费等等
实现 starter 自动配置的关键,基于 SPI 完成配置的自动加载
通过 Bean 生命周期相关扩展实现注解的解析与容器的启动,比如 BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton, InitializingBean, DisposableBean 等
基于回调与 MethodHandler 实现消息体的封装、参数解析以及方法调用;
https://github.com/sucls/pulsar-starter.git
如果你看过 spring-kafka 的源代码,那么你会发现所有代码基本都是仿造其实现。一方面能够阅读 kafka client 在 spring 具体如何实现;同时通过编写自己的 spring starter 模块,学习 整个 starter 的实现过程。
当前文章:手把手教你写SpringBootStarter
文章起源:http://www.csdahua.cn/qtweb/news32/400932.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网