前言
/**
 * Minimal Spring Boot application that uses Spring's built-in scheduling
 * support ({@code @EnableScheduling} + {@code @Scheduled}).
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * JVM entry point. The method must be named {@code main}; the previous
     * spelling ({@code mian}) is not recognised by the Java launcher.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * Sample scheduled task, triggered by the cron expression
     * {@code 0/3 * * * * *}.
     */
    @Scheduled(cron = "0/3 * * * * *")
    public void demoTask() {
        //...
    }
}
<dependency>
<groupId>org.itstack.middleware</groupId>
<artifactId>schedule-spring-boot-starter</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>
与SpringBoot的Scheduling非常像,它的注解是;@EnableScheduling,尽可能降低使用难度
/**
 * Demo application that enables the middleware through
 * {@code @EnableDcsScheduling} on a regular Spring Boot application class.
 */
@SpringBootApplication
@EnableDcsScheduling
public class HelloWorldApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = HelloWorldApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
/**
 * Sample component declaring two {@code @DcsScheduled} tasks. Both are
 * declared with {@code autoStartup = false}.
 */
@Component("demoTaskThree")
public class DemoTaskThree {

    /** Description (and console output) of {@link #taskMethod01()}. */
    private static final String TASK_01_DESC = "03定时任务执行测试:taskMethod01";

    /** Description (and console output) of {@link #taskMethod02()}. */
    private static final String TASK_02_DESC = "03定时任务执行测试:taskMethod02";

    /** Task registered with cron {@code 0 0 9,13 * * *}; prints its description. */
    @DcsScheduled(
            cron = "0 0 9,13 * * *",
            desc = TASK_01_DESC,
            autoStartup = false)
    public void taskMethod01() {
        System.out.println(TASK_01_DESC);
    }

    /** Task registered with cron {@code 0 0/30 8-10 * * *}; prints its description. */
    @DcsScheduled(
            cron = "0 0/30 8-10 * * *",
            desc = TASK_02_DESC,
            autoStartup = false)
    public void taskMethod02() {
        System.out.println(TASK_02_DESC);
    }
}
以SpringBoot为基础开发一款中间件我也是第一次,因为接触SpringBoot也刚刚1个月左右。虽然SpringBoot已经出来挺久的了,但由于我们项目开发并不使用SpringBoot的一套东西,所以一直以来没有接触。直到上个月开始考虑领域驱动设计才接触,嗯!真的不错,那么就开始了夯实技能、学习思想用到项目里。
按照我的产品需求,开发这么一款分布式任务的中间件,我脑袋中的模型已经存在了。另外就是需要开发过程中去探索我需要的知识工具,简单包括;
schedule-spring-boot-starter
└── src
├── main
│ ├── java
│ │ └── org.itstack.middleware.schedule
│ │ ├── annotation
│ │ │ ├── DcsScheduled.java
│ │ │ └── EnableDcsScheduling.java
│ │ ├── annotation
│ │ │ └── InstructStatus.java
│ │ ├── config
│ │ │ ├── DcsSchedulingConfiguration.java
│ │ │ ├── StarterAutoConfig.java
│ │ │ └── StarterServiceProperties.java
│ │ ├── domain
│ │ │ ├── DataCollect.java
│ │ │ ├── DcsScheduleInfo.java
│ │ │ ├── DcsServerNode.java
│ │ │ ├── ExecOrder.java
│ │ │ └── Instruct.java
│ │ ├── export
│ │ │ └── DcsScheduleResource.java
│ │ ├── service
│ │ │ ├── HeartbeatService.java
│ │ │ └── ZkCuratorServer.java
│ │ ├── task
│ │ │ ├── TaskScheduler.java
│ │ │ ├── ScheduledTask.java
│ │ │ ├── SchedulingConfig.java
│ │ │ └── SchedulingRunnable.java
│ │ ├── util
│ │ │ └── StrUtil.java
│ │ └── DoJoinPoint.java
│ └── resources
│ └── META-INF
│ └── spring.factories
└── test
└── java
└── org.itstack.demo.test
└── ApiTest.java
2.1 自定义注解
annotation/EnableDcsScheduling.java & 自定义注解
这个注解一堆的圈A,这些配置都是为了开始启动执行我们的中间件;
/**
 * Type-level switch for the scheduling middleware. Placing it on a class
 * imports {@code DcsSchedulingConfiguration}, pulls in
 * {@code SchedulingConfig}, {@code CronTaskRegister} and {@code DoJoinPoint}
 * via {@code @ImportAutoConfiguration}, and component-scans
 * {@code org.itstack.middleware.*}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DcsSchedulingConfiguration.class)
@ImportAutoConfiguration({
        SchedulingConfig.class,
        CronTaskRegister.class,
        DoJoinPoint.class
})
@ComponentScan("org.itstack.middleware.*")
public @interface EnableDcsScheduling {
}
2.2 扫描自定义注解、初始化配置/服务、启动任务、挂载节点
config/DcsSchedulingConfiguration.java & 初始化配置/服务、启动任务、挂载节点
/**
 * Collects the {@code @DcsScheduled} methods of an initialised bean and
 * records one {@code ExecOrder} per method in {@code Constants.execOrderMap},
 * keyed by bean name. The bean itself is always returned unchanged.
 *
 * @param bean     the bean instance that has just been initialised
 * @param beanName the name of the bean in the container
 * @return the same {@code bean} instance
 * @throws BeansException declared by the overridden contract; not thrown explicitly here
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // Despite its name, this set holds classes for which an annotated method
    // has already been recorded (see the add() below); such classes are skipped.
    if (this.nonAnnotatedClasses.contains(targetClass)) {
        return bean;
    }
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
    if (methods == null) {
        return bean;
    }
    for (Method method : methods) {
        DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
        // Only methods that carry the annotation and declare at least one
        // annotation directly are recorded.
        // NOTE(review): the second check presumably filters proxy overrides
        // that only inherit the annotation - confirm before simplifying.
        if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) {
            continue;
        }
        List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
        ExecOrder execOrder = new ExecOrder();
        execOrder.setBean(bean);
        execOrder.setBeanName(beanName);
        execOrder.setMethodName(method.getName());
        execOrder.setDesc(dcsScheduled.desc());
        execOrder.setCron(dcsScheduled.cron());
        execOrder.setAutoStartup(dcsScheduled.autoStartup());
        execOrderList.add(execOrder);
        this.nonAnnotatedClasses.add(targetClass);
    }
    return bean;
}
/**
 * Connects to ZooKeeper, prepares this server's nodes and attaches the
 * tree-cache listener on the exec root path.
 *
 * <p>Any failure is logged and rethrown wrapped in a {@link RuntimeException}.
 *
 * @param applicationContext context handed to the tree-cache listener
 */
private void init_server(ApplicationContext applicationContext) {
    try {
        // Obtain the ZooKeeper connection.
        final CuratorFramework zkClient = ZkCuratorServer.getClient(Constants.Global.zkAddress);

        // Assemble the node paths for this server and for this server's IP.
        path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
        path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);

        // Clear old content under this server's IP node, then (re)create it
        // and store the server name on the server node.
        ZkCuratorServer.deletingChildrenIfNeeded(zkClient, path_root_server_ip);
        ZkCuratorServer.createNode(zkClient, path_root_server_ip);
        ZkCuratorServer.setData(zkClient, path_root_server, schedulerServerName);

        // Create the exec root node and listen on it.
        final String execRootPath = Constants.Global.path_root_exec;
        ZkCuratorServer.createNodeSimple(zkClient, execRootPath);
        ZkCuratorServer.addTreeCacheListener(applicationContext, zkClient, execRootPath);
    } catch (Exception e) {
        logger.error("itstack middleware schedule init server error!", e);
        throw new RuntimeException(e);
    }
}
/**
 * Registers a cron task for every collected {@code ExecOrder} whose
 * {@code autoStartup} flag is set; the others are left unscheduled.
 *
 * @param applicationContext context used to look up the
 *        {@code CronTaskRegister} bean named
 *        {@code itstack-middlware-schedule-cronTaskRegister}
 */
private void init_task(ApplicationContext applicationContext) {
    // The bean name is a runtime identifier and must match its registration
    // elsewhere, so it is kept verbatim.
    CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class);
    for (List<ExecOrder> execOrderList : Constants.execOrderMap.values()) {
        for (ExecOrder execOrder : execOrderList) {
            if (!execOrder.getAutoStartup()) {
                continue;
            }
            SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
            cronTaskRegistrar.addCronTask(task, execOrder.getCron());
        }
    }
}
/**
 * Publishes every collected {@code ExecOrder} to ZooKeeper below
 * {@code path_root_server_ip}: creates the clazz, method and status nodes,
 * writes the JSON-serialised order under {@code <method>/value}, and sets
 * the status node to {@code "1"} or {@code "0"} according to
 * {@code autoStartup}.
 *
 * <p>NOTE(review): {@code client} is not declared in this method; it is
 * presumably a field or statically imported constant - confirm it is
 * initialised before this method runs.
 *
 * @throws Exception propagated from the {@code ZkCuratorServer} helpers
 */
private void init_node() throws Exception {
    for (List<ExecOrder> execOrderList : Constants.execOrderMap.values()) {
        for (ExecOrder execOrder : execOrderList) {
            String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
            String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
            String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
            // Create the nodes.
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
            // Write the serialised order under <method>/value.
            // NOTE(review): the original comment labelled this data "temporary"
            // although the helper is named appendPersistentData - confirm the node mode.
            ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
            // Write the status flag (labelled "permanent" in the original comment).
            ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
        }
    }
}
service/ZkCuratorServer.java & zk服务
/**
 * Starts a Curator {@code TreeCache} on {@code path} and registers a listener
 * that applies task instructions when a node is added or updated.
 *
 * <p>An instruction is executed only when its ip and schedulerServerId equal
 * {@code Constants.Global.ip} and {@code Constants.Global.schedulerServerId}.
 * Status 0 removes the cron task and writes "0" to the status node; status 1
 * adds the cron task and writes "1"; status 2 removes and re-adds the task
 * and writes "1". Node removal and all other event types are ignored.
 *
 * <p>NOTE(review): the listener is attached after {@code treeCache.start()};
 * confirm that events delivered before the listener is attached do not matter.
 *
 * @param applicationContext application context (used in the part elided from this excerpt, if at all)
 * @param client             Curator client the cache is built on and status data is written with
 * @param path               root path watched by the tree cache
 * @throws Exception propagated from starting the tree cache
 */
public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
TreeCache treeCache = new TreeCache(client, path);
treeCache.start();
treeCache.getListenable().addListener((curatorFramework, event) -> {
// ... elided in this excerpt; presumably where instruct, cronTaskRegistrar, scheduleBean and
// path_root_server_ip_clazz_method_status are obtained - confirm against the full source.
switch (event.getType()) {
case NODE_ADDED:
case NODE_UPDATED:
// Only react to instructions addressed to this server instance.
if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
// Execute the instruction.
Integer status = instruct.getStatus();
switch (status) {
case 0: // stop the task
cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
setData(client, path_root_server_ip_clazz_method_status, "0");
logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
case 1: // start the task
cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
setData(client, path_root_server_ip_clazz_method_status, "1");
logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
case 2: // refresh the task: remove it, then register it again with the instructed cron
cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
setData(client, path_root_server_ip_clazz_method_status, "1");
logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
break;
}
}
break;
case NODE_REMOVED:
break;
default:
break;
}
});
}
/**
 * Schedules {@code task} with the given cron expression and stores the
 * result in {@code Constants.scheduledTasks} under the task's id. A task
 * already stored under the same id is removed first.
 *
 * @param task           runnable to schedule; its {@code taskId()} is the registry key
 * @param cronExpression cron expression used to build the {@code CronTask}
 */
public void addCronTask(SchedulingRunnable task, String cronExpression) {
    final String taskId = task.taskId();
    final boolean alreadyRegistered = Constants.scheduledTasks.get(taskId) != null;
    if (alreadyRegistered) {
        removeCronTask(taskId);
    }
    Constants.scheduledTasks.put(taskId, scheduleCronTask(new CronTask(task, cronExpression)));
}
/**
 * Removes the task stored under {@code taskId} from
 * {@code Constants.scheduledTasks} and cancels it. Does nothing when no
 * task is stored under that id.
 *
 * @param taskId registry key of the task to remove
 */
public void removeCronTask(String taskId) {
    final ScheduledTask removed = Constants.scheduledTasks.remove(taskId);
    if (removed != null) {
        removed.cancel();
    }
}
/**
 * Pointcut matching methods annotated with
 * {@code org.itstack.middleware.schedule.annotation.DcsScheduled}.
 * The body is intentionally empty: the method only gives the pointcut a
 * name, which the {@code @Around("aopPoint()")} advice refers to.
 */
@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)")
public void aopPoint(){
}
/**
 * Around advice for the {@code aopPoint()} pointcut: runs the intercepted
 * method and logs how long it took, whether it returned or threw.
 *
 * @param jp the intercepted join point
 * @return whatever the intercepted method returns
 * @throws Throwable anything thrown by the intercepted method or by method resolution
 */
@Around("aopPoint()")
public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
    final long startedAt = System.currentTimeMillis();
    // Resolved before proceeding so the name is available in the finally block.
    final Method invoked = getMethod(jp);
    try {
        return jp.proceed();
    } finally {
        // Elapsed wall-clock time in milliseconds.
        final long elapsed = System.currentTimeMillis() - startedAt;
        final String ownerName = jp.getTarget().getClass().getSimpleName();
        logger.info("\nitstack middleware schedule method:{}.{} take time(m):{}", ownerName, invoked.getName(), elapsed);
    }
}
开发完成后还是需要将Jar包发布到Maven中央仓库的,这个过程较长单独写了博客;发布Jar包到Maven中央仓库(为开发开源中间件做准备)
标题名称:自己开发一个分布式的Xxl-Job任务调度组件
网址分享:http://www.csdahua.cn/qtweb/news26/358026.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网