造个轮子-任务调度执行小框架-任务清单执行恢复实现

前言

okey,通过前面的两篇文章,关于这个任务执行这一块,我想应该是明白了。但是这里的话,还是不够的。我们希望对于任务还可以做到执行失败的重试执行,关于这个意外宕机的一个状态恢复。

当然这里要说的是,由于完整的实现,就是按照那种牛逼的标准来实现的话,这个实现确实很复杂,要考虑的情况也非常多,这个是没有办法的事情。所以这里只能实现一个简单的。

这里的话又不得不提到我们一开始,对于这个任务清单和任务清单项的一个类型的处理了:

我们主要有这几个类型:

package com.huterox.todoscheduler.core.enumType;

import java.io.Serializable;

/**
 * TodoItem类型,是必须要坚持执行,还是非必须
 * */
public enum TodoItemElementType implements Serializable {
    /**
     * 是不是必须要执行,也就是说,如果当前这个item执行失败,整个
     * 清单任务执行失败
     * */
    MUSTITEM("MUSTITEM"),
    /**
     * 执行失败可以跳过,注意,这个时候,重启的时候是不会管这个的
     * */
    CONTINUTEITEM("CONTINUTEITEM");
    private String elementCode;
    TodoItemElementType(String s) {
        this.elementCode = s;
    }
    public String getElementCode() {
        return elementCode;
    }
    public void setElementCode(String elementCode) {
        this.elementCode = elementCode;
    }

}

package com.huterox.todoscheduler.core.enumType;

import java.io.Serializable;

/**
 * 任务清单对应的类型
 * */
public enum TodoListElementType implements Serializable {

    /**
     * 宕机之后重启先执行修复函数然后继续执行没有完成的任务的模式
     * */
    StrongConsistency("StrongConsistency"),
    /**
     * 宕机之后重启执行修复,但是不继续执行任务
     * */
    WeakConsistency("WeakConsistency"),
    /**
     * 宕机之后重启不执行任何操作
     * */
    NothingConsistency("NothingConsistency");

    private String elementCode;

    TodoListElementType(String s) {
        this.elementCode = s;
    }

    public String getElementCode() {
        return elementCode;
    }

    public void setElementCode(String elementCode) {
        this.elementCode = elementCode;
    }
}

所以我们此时还涉及到这个。

恢复执行流程

那么在这里的话,我们就需要先聊到这个恢复的执行流程。

这里的话主要先看到这个接口: 在这里插入图片描述

package com.huterox.todoscheduler.core.scheduler;

/**
 * 定制任务执行器,在这块的话,主要是扫描失败任务
 * 让后尝试恢复任务,以及服务器宕机之后对任务的恢复
 * */
public interface SchedulerEx {

    //恢复任务
    void repairTask();

    //重新执行失败的任务
    void againTask();

    //定时扫描失败的进程你
    void DaemonScheduling();
}

所以看到这个我估计你应该是明白了它的一个操作:

失败任务执行

首先,在这里我们先来看到对于失败任务的执行。在这里的话,注意看到这个东西:

我们的话在这些方法执行的时候的话,会保存状态,这个状态的保存的话,还是通过这个JDK的一个Serilizable接口来做。这里的话,不存在传输的问题,直接存,然后解析即可。

package com.huterox.todoscheduler.common;

import java.io.File;

import java.io.*;

public class SerializationUtils implements Serializable {

    private static final String DEFAULT_DIRECTORY = "src/main/resources";

    public static void serializeObject(Object object, String fileName) {
        serializeObject(object, DEFAULT_DIRECTORY, fileName);
    }

    public static Object deserializeObject(String fileName) {
        return deserializeObject(DEFAULT_DIRECTORY, fileName);
    }

    public static void serializeObject(Object object, String directory, String fileName) {
        try {
            // 创建目录
            File dir = new File(directory);
            if (!dir.exists()) {
                if (dir.mkdirs()) {
                    System.out.println("目录已创建: " + directory);
                } else {
                    System.out.println("无法创建目录: " + directory);
                    return;
                }
            }

            // 序列化对象
            FileOutputStream fileOut = new FileOutputStream(directory + "/" + fileName);
            ObjectOutputStream out = new ObjectOutputStream(fileOut);
            out.writeObject(object);
            out.close();
            fileOut.close();
            System.out.println("对象已序列化并保存到文件: " + directory + "/" + fileName);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static Object deserializeObject(String directory, String fileName) {
        Object object = null;
        try {
            // 反序列化对象
            FileInputStream fileIn = new FileInputStream(directory + "/" + fileName);
            ObjectInputStream in = new ObjectInputStream(fileIn);
            object = in.readObject();
            in.close();
            fileIn.close();
            System.out.println("从文件中反序列化出对象: " + directory + "/" + fileName);

        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }

    public static void deleteSerializedObject(String directory, String fileName) {
        try {
            File file = new File(directory + "/" + fileName);
            if (file.exists()) {
                if (file.delete()) {
                    System.out.println("文件删除成功: " + directory + "/" + fileName);
                } else {
                    System.out.println("无法删除文件: " + directory + "/" + fileName);
                }
            } else {
                System.out.println("文件不存在: " + directory + "/" + fileName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

然后在执行的时候,我们把失败的任务放在这里:

在这里插入图片描述

package com.huterox.todoscheduler.core.global;

import com.huterox.todoscheduler.common.SerializationUtils;
import com.huterox.todoscheduler.core.execute.proxy.TodoListExBean;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 执行失败的清单对象列表
 * 冷知识:CopyOnWriteArrayList 支持直接迭代删除
 * */
public class TodoListFailedList implements Serializable {

    private static volatile List<TodoListExBean> list;

    private TodoListFailedList() {}

    public static List<TodoListExBean> getInstance() {
        if (list == null) {
            synchronized (TodoListFailedList.class) {
                if (list == null) {
                    list = new CopyOnWriteArrayList<>();
                }
            }
        }
        return list;
    }

    public static List<TodoListExBean> addFailedWithSerializable(TodoListExBean todoListExBean) {
        if (list == null) {
            synchronized (TodoListFailedList.class) {
                if (list == null) {
                    list = new CopyOnWriteArrayList<>();
                }
            }
        }
        list.add(todoListExBean);
        //把这个list进行备份,这里面存放了失败的执行清单,所以这个时候要注意恢复
        SerializationUtils.serializeObject(list,"failedCatalogue","catalogue.ser");
        return list;
    }

    public static List<TodoListExBean> addFailed(TodoListExBean todoListExBean) {
        if (list == null) {
            synchronized (TodoListFailedList.class) {
                if (list == null) {
                    list = new CopyOnWriteArrayList<>();
                }
            }
        }
        list.add(todoListExBean);
        return list;
    }

}

所以的话,你可以直接找到这个东西,然后在对应目录去找到,然后实例化,然后去执行:

   public void againTask() {
        //重新执行失败任务

        Object deserializeObject = SerializationUtils
                .deserializeObject("failedCatalogue", "catalogue.ser");
        if (deserializeObject!=null){
            CopyOnWriteArrayList<TodoListExBean> failedCatalogue = (CopyOnWriteArrayList<TodoListExBean>) deserializeObject;

            for(TodoListExBean todoListExBean:failedCatalogue){
                if(todoListExBean.getExTimes()<Configuration.ExTimes){
                    DefaultExecuteCore defaultExecuteCore = new DefaultExecuteCore();
                    defaultExecuteCore.setTodoListExBean(todoListExBean);
                    TaskWrapper taskWrapper = new TaskWrapper();
                    taskWrapper.setExecuteCore(defaultExecuteCore);
                    //重新提交
                    TaskManager o = (TaskManager) ConfigEngine.corePart.get(TaskManager.class);
                    o.submitTask(taskWrapper,defaultExecuteCore.getClsId());
                }

            }
        }

    }

重启执行中任务恢复

之后的话就是这个执行当中的任务的恢复。 这个的话主要在这里:

    public void repairTask() {
        //两步走战略
        //1. 先恢复执行线程当中的任务
        //2. 查看失败队列有没有任务,有的话运行

        try {
            //先等一会儿再启动
            Thread.sleep(Configuration.WaitingRepairTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Object deserializeObject = SerializationUtils
                .deserializeObject("runningTask", "task.ser");
        if(deserializeObject!=null){
            DefaultTaskManager runningTask = (DefaultTaskManager) deserializeObject;

            //拿到当时的任务队列
            ThreadPoolExecutor executor = runningTask.getExecutor();
            ArrayList<Runnable> runnableWrappers = new ArrayList<>(executor.getQueue());
            for(Runnable runnable:runnableWrappers){
                TaskWrapper taskWrapper = (TaskWrapper) runnable;
                taskWrapper.setExecuteType(ExecuteType.Repair);

                //重新提交
                TaskManager o = (TaskManager) ConfigEngine.corePart.get(TaskManager.class);
                o.submitTask(taskWrapper,taskWrapper.getExecuteCore().getClsId());
            }

        }

        againTask();
    }

这里的话,你发现,我这里只是重新设置了一下类型为修复类型就执行代码了,主要是因为这个: 在TaskWrapper里面,是这样的:

package com.huterox.todoscheduler.core.wapper;

import com.huterox.todoscheduler.core.enumType.ExecuteType;
import com.huterox.todoscheduler.core.execute.ExecuteCore;

import java.io.Serializable;


public class TaskWrapper implements Runnable, Serializable {

    private ExecuteCore executeCore;

    private ExecuteType executeType = ExecuteType.Run;

    public TaskWrapper() {
    }

    public ExecuteCore getExecuteCore() {
        return executeCore;
    }

    public ExecuteType getExecuteType() {
        return executeType;
    }

    public void setExecuteType(ExecuteType executeType) {
        this.executeType = executeType;
    }

    public void setExecuteCore(ExecuteCore executeCore) {
        this.executeCore = executeCore;
    }

    public TaskWrapper(ExecuteCore executeCore) {
        this.executeCore = executeCore;
    }

    @Override
    public void run() {
        if(executeType==ExecuteType.Run){
            executeCore.run();
        }else if(executeType==ExecuteType.Repair){
            executeCore.repair();
        }

    }
}

执行修复

之后的话,跳转回到这里: 在这里插入图片描述

这个方法的实现的完整代码在这里:

 @Override
    public void repair() {
        //这里的话明确一点,那就是在我们的任务清单有好几种状态,一个是没有状态,还没有执行完毕
        //还有一个是运行状态,对应意外宕机,我们唯一能够做的就是执行修复方法。此时在调用这个方法的
        //时候只有两种情况:1. 任务执行完毕,还没有来得及移除任务队列,2. 在执行复原状态
        //所以现在我们这边主要就是对这个情况进行操作,因为完整地进行跟进复原确实有点难受,先简单这样
        //进行实现,后面再进行优化。这里的话,其实按照道理,我们都要做一套日志系统的,但是这里没有只是简单
        //输出,所以的话,后面可以考虑植入更多的组件,当然更主要的原因是,一开始没有考虑那么多,只是做一个简单
        //的dome出来先看看,后面再把这个部分给拆了,就可以在这里插入更多组件了

        TodoListStateType todoListStateType = todoListExBean.getTodoListStateType();
        if(todoListStateType!=null){
            if(todoListExBean.getTodoListStateType()==TodoListStateType.Repairing){
                //重新继续执行
                if(todoListExBean.getTodoListAfterHandler()!=null){
                    TodoListAfterHandler todoListAfterHandler = todoListExBean.getTodoListAfterHandler();
                    try {
                        todoListAfterHandler.repair(todoListExBean.getStateWrapper());
                    } catch (Exception e) {
                        e.printStackTrace();
                        //这个要是继续执行失败的话,调用它的失败处理器,重复先前的操作
                        if(todoListExBean.getTodoListErrorHandler()!=null){
                            try {
                                TodoListErrorHandler todoListErrorHandler = todoListExBean.getTodoListErrorHandler();
                                todoListErrorHandler.concierge(
                                        todoListExBean.getStateWrapper()
                                );
                            }catch (Exception e1){
                                e1.printStackTrace();
                                if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                                    todoListExBean.setTodoListStateType(TodoListStateType.Error);
                                    TodoListFailedList.addFailedWithSerializable(todoListExBean);
                                }else {
                                    System.err.println("任务强制终止");
                                }
                            }
                        }

                        //占个坑位,后面做日志组件,配合数据库,其实后面在这个上面做一套Web都是可以的
                        //不过实现Web的话,可能考虑内嵌一个打包好的软件来做了,非要我做,那我就自定义日志
                        //格式,然后用Python快速做一个可视化web应用,解析日志格式,然后显示即可,Python写起来
                        //比较快,主要还是,你也不想,看日志还要打开tomcat吧,并且展示的需求不复杂,对于这种日志查看
                        //还要vue+SB那实在是太SB了,即便是Vue+flask我都嫌弃麻烦,一个Python Gradio 都能搞定的事情
                        //还要老一套太慢了。

                    }
                }
            }else if(todoListExBean.getTodoListStateType()==TodoListStateType.Running){
                //此时还在运行,那么就要看看有哪些方法了
                Map<Integer, TodoItemExBean> sortedMap = todoListExBean.getSortedMap();

                //开始遍历执行清单项
                for(Map.Entry<Integer, TodoItemExBean> entry:sortedMap.entrySet()){
                    //注意我们这里只进行修复,万一重复执行了,扣了两次钱咋办
                    TodoItemExBean value = entry.getValue();
                    if(value.getTodoItemStateType()!=null){
                        TodoItemStateType todoItemStateType = value.getTodoItemStateType();
                        if(todoItemStateType==TodoItemStateType.Repairing){
                            //执行修复,这里的话,我们执行的是方法的后置处理的修复方法
                            if(value.getTodoItemAfterRunningHandler()!=null){
                                TodoItemAfterRunningHandler todoItemAfterRunningHandler = value.getTodoItemAfterRunningHandler();
                                try {
                                    todoItemAfterRunningHandler.repair(
                                            value.getStateWrapper(),todoListExBean.getStateWrapper()
                                    );
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    //同样的,如果这里G了,那么一样大哥也就救不了,Sorry
                                }

                            }

                        }
                    }

                }
            }
        }

这样一来的话,就实现了一套简单的恢复方案。

组件整合

组件整合容器

之后的话,我们现在各个组件就是基本开发完毕了,只是有些东西还没有做,就像这个注释里面说的一样: 在这里插入图片描述

所以现在对组件进行整合就好了。 这个时候的话就要看到这个了,这个的话其实就是一个存放组件的容器: 在这里插入图片描述 我们的很多东西放在这里,后面整合上面配置的时候,要设置拿到上面组件,都可以通过这个来拿到。

启动类

那么之后的话,就是我们的启动了,启动的话顾名思义,就是要把我们这些组件都实例化好,然后才能愉快玩耍,所以的化,这样处理就好了:

package com.huterox.todoscheduler;

import com.huterox.todoscheduler.common.ConfigurationParse;
import com.huterox.todoscheduler.common.TaskManager;
import com.huterox.todoscheduler.common.impl.DefaultTaskManager;
import com.huterox.todoscheduler.common.impl.PropertiesConfigurationParse;
import com.huterox.todoscheduler.config.ConfigEngine;
import com.huterox.todoscheduler.core.ExecuteManager;
import com.huterox.todoscheduler.core.scheduler.SchedulerEx;
import com.huterox.todoscheduler.core.scheduler.impl.DefaultSchedulerEx;
import com.huterox.todoscheduler.core.suports.TodoListTemplateContext;

import java.io.Serializable;

/**
 * 任务执行器入口
 * */
public class HTodoSchedulerApplication implements Serializable {


    public HTodoSchedulerApplication() {
        run();
    }

    private void run(){
        
            //这个是我们处理配置的组件,这个是我们默认的
            ConfigEngine.corePart.put(ConfigurationParse.class,new PropertiesConfigurationParse());
            //这个是清单模板组件,这里面包括创建IOC容器,然后扫描创建得到模板
            ConfigEngine.corePart.put(TodoListTemplateContext.class,new TodoListTemplateContext());

            /*
             *   这是一条分水岭,到这里,初始化完成,可以正常执行
             * */

            //线程池,完成这个任务清单的执行,通过已经得到的模板,执行模板代理对象
            ConfigEngine.corePart.put(TaskManager.class,new DefaultTaskManager());
            //任务提交执行器具,后面启动一个任务清单就用这个,创建模板代理对象,给到TaskManger
            ConfigEngine.corePart.put(ExecuteManager.class, new ExecuteManager());
            //意外的一些补偿,还有失败任务重新执行的定时器,这个的话也是,需要做到这种处理
            ConfigEngine.corePart.put(SchedulerEx.class,new DefaultSchedulerEx());
    }

    public static void main(String[] args) {

    }
}

总结

okey,现在我们代码写完了,之后的化进行测试,以及针对第三方框架做适配就好了,那么接下来的话,就是测试,然后做适配给到SpringBoot,如何在SB项目当中愉快使用到这个。不过这方面的内容的话,我目前是不会写的,因为这个项目其实一开始只是因为我的毕设刚好用得上,但是我还没有写道那里,只是把要用到的写好而已。当然你也可以帮助我完成测试整合,项目地址如下(第三次暗示):gitee.com/Huterox/hto…。 okey,那么接下来继续考研去了,所以博文接下来将继续进入 周更,月更的情况了。至于毕设,难搞的都差不多了,安安心心考研就好了,前期打比赛还浪费了10来天呢!不过,每天干个10小时+对我来说完全没问题,只是这段时间是没有问题的,这段时间当牛马,总比未来继续当牛马好,当然此外,对应考研相关的笔记我也可能会以这种博文的形式进行发布。这个目的主要是为了重复,书读百遍,其意自现。自知自己不是天才,只能不断刷遍数,不断重复。脑子没有,小肝肝还是有的。

全部评论

相关推荐

点赞 评论 收藏
转发
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务