
Scheduled Task Management System (Quartz integrated with Spring): Open Source Project and Source Code Overview (Part 5)


Using some of my study time I put together a web project that combines Spring and Quartz. It is a pure back-end project with RESTful interfaces that supports creating, deleting, updating, and querying scheduled tasks, as well as stopping, starting, changing the schedule rule, and triggering immediate execution. GitHub: holly-quartz-web. It started purely as a way to study the source code; later came a few changes, and eventually I wanted to make some business-oriented modifications, so I cloned a quartz-core project to work on, and I plan to rework its clustering later as well. GitHub: quartz-core. Anyone interested is welcome to join in. The project is still fairly simple, so it can serve both as a learning/starter project and as the early skeleton of a job management system to iterate on.

In Part 4 we went through the overall run method and the core idea behind the cluster implementation. To restate that rule more precisely: before a scheduler instance performs any database operation that involves distributed coordination, it must first acquire the row-level lock in the QRTZ_LOCKS table corresponding to that scheduler; once it holds the lock it may operate on the other tables, and when the transaction commits, the row lock is released for other scheduler instances to acquire.
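As a rough illustration of that pattern, here is a simplified sketch (this is not the actual StdRowLockSemaphore code; the table and lock names assume the default QRTZ_ prefix): select the lock row FOR UPDATE, do the protected work on the same connection, and commit, which releases the row lock for the next instance.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class RowLockSketch {

    /**
     * Acquire the cluster-wide row lock, run the protected work, then commit.
     * The commit releases the row lock so the next scheduler instance can proceed.
     */
    static void doInLock(Connection conn, String schedName, String lockName,
                         Runnable protectedWork) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = ? AND LOCK_NAME = ? FOR UPDATE")) {
            ps.setString(1, schedName);
            ps.setString(2, lockName); // e.g. "TRIGGER_ACCESS" or "STATE_ACCESS"
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) {
                    throw new SQLException("No lock row found for " + lockName);
                }
            }
            protectedWork.run(); // e.g. updates to the trigger/job tables
            conn.commit();       // committing releases the row lock
        } catch (SQLException | RuntimeException e) {
            conn.rollback();
            throw e;
        }
    }
}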

Since every scheduler instance in the cluster follows this strict protocol, instances of the same (same-named) scheduler can only operate on the database serially, while differently named schedulers can run in parallel. In this part we look at the details: some of the trade-offs Quartz made in its design, and how jobs are recovered when a node goes down (that is, how the other nodes take over its work).

JobStoreSupport contains an inner class named ClusterManager. ClusterManager is itself a Thread; its run method looks like this:

@Override
public void run() {
    while (!shutdown) {

        if (!shutdown) {
            long timeToSleep = getClusterCheckinInterval();
            long transpiredTime = (System.currentTimeMillis() - lastCheckin);
            timeToSleep = timeToSleep - transpiredTime;
            if (timeToSleep <= 0) {
                timeToSleep = 100L;
            }

            if (numFails > 0) {
                timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
            }

            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        }

        if (!shutdown && this.manage()) {
            signalSchedulingChangeImmediately(0L);
        }

    } // while !shutdown
}

As run() shows, the thread spends most of its time sleeping, waking up roughly once per cluster check-in interval; the actual work is done in manage(). The check-in interval comes from configuration (see the sketch below), after which we look at manage() itself.
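For reference, a minimal sketch of a clustered JDBC-JobStore configuration, built programmatically here. The property names are the standard Quartz ones; the data source and driver-delegate settings are elided and would need to be filled in for a real setup.

import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

public class ClusteredSchedulerFactory {

    public static Scheduler build() throws Exception {
        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceName", "MyClusteredScheduler");
        // AUTO lets each node generate a unique instance id for QRTZ_SCHEDULER_STATE
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.setProperty("org.quartz.jobStore.isClustered", "true");
        // how often (ms) each node updates its check-in row; also used when detecting failed nodes
        props.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000");
        // ... org.quartz.jobStore.dataSource / driverDelegateClass etc. omitted ...
        return new StdSchedulerFactory(props).getScheduler();
    }
}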

private boolean manage() {
    boolean res = false;
    try {

        res = doCheckin();

        numFails = 0;
        getLog().debug("ClusterManager: Check-in complete.");
    } catch (Exception e) {
        if (numFails % 4 == 0) {
            getLog().error(
                "ClusterManager: Error managing cluster: "
                        + e.getMessage(), e);
        }
        numFails++;
    }
    return res;
}

The real work, in turn, happens in doCheckin:

protected boolean doCheckin() throws JobPersistenceException {
    boolean transOwner = false;
    boolean transStateOwner = false;
    boolean recovered = false;

    Connection conn = getNonManagedTXConnection();
    try {
        // Other than the first time, always checkin first to make sure there is
        // work to be done before we acquire the lock (since that is expensive,
        // and is almost never necessary). This must be done in a separate
        // transaction to prevent a deadlock under recovery conditions.
        List<SchedulerStateRecord> failedRecords = null;
        if (!firstCheckIn) {
            failedRecords = clusterCheckIn(conn);
            commitConnection(conn);
        }

        if (firstCheckIn || (failedRecords.size() > 0)) {
            getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
            transStateOwner = true;

            // Now that we own the lock, make sure we still have work to do.
            // The first time through, we also need to make sure we update/create our state record
            failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);

            if (failedRecords.size() > 0) {
                getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
                transOwner = true;

                clusterRecover(conn, failedRecords);
                recovered = true;
            }
        }

        commitConnection(conn);
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } finally {
        try {
            releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
        } finally {
            try {
                releaseLock(LOCK_STATE_ACCESS, transStateOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }

    firstCheckIn = false;

    return recovered;
}

doCheckin checks in against the database first (clusterCheckIn updates this instance's own state record and returns the instances whose check-in looks stale), and only when failed instances are found does it take the STATE_ACCESS and, if needed, TRIGGER_ACCESS locks and recover them. clusterRecover is the method that performs the recovery; a sketch of clusterCheckIn follows, and then the recovery code itself.
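For context, clusterCheckIn does roughly two things: find the instances whose last check-in is too old, then update (or insert) this instance's own row in QRTZ_SCHEDULER_STATE. The following is a paraphrased sketch of that method of JobStoreSupport, not a verbatim copy of the source:

// Paraphrased sketch of JobStoreSupport.clusterCheckIn
protected List<SchedulerStateRecord> clusterCheckIn(Connection conn)
        throws JobPersistenceException {
    List<SchedulerStateRecord> failedInstances = findFailedInstances(conn);
    try {
        lastCheckin = System.currentTimeMillis();
        // update our row in QRTZ_SCHEDULER_STATE; insert it if it does not exist yet
        if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
            getDelegate().insertSchedulerState(conn, getInstanceId(), lastCheckin,
                    getClusterCheckinInterval());
        }
    } catch (Exception e) {
        throw new JobPersistenceException(
                "Failure updating scheduler state when checking-in: " + e.getMessage(), e);
    }
    return failedInstances;
}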

protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
    throws JobPersistenceException {

    if (failedInstances.size() > 0) {

        long recoverIds = System.currentTimeMillis();

        logWarnIfNonZero(failedInstances.size(),
                "ClusterManager: detected " + failedInstances.size()
                        + " failed or restarted instances.");
        try {
            for (SchedulerStateRecord rec : failedInstances) {
                getLog().info(
                        "ClusterManager: Scanning for instance \""
                                + rec.getSchedulerInstanceId()
                                + "\"'s failed in-progress jobs.");

                List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                        .selectInstancesFiredTriggerRecords(conn,
                                rec.getSchedulerInstanceId());

                int acquiredCount = 0;
                int recoveredCount = 0;
                int otherCount = 0;

                Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                for (FiredTriggerRecord ftRec : firedTriggerRecs) {

                    TriggerKey tKey = ftRec.getTriggerKey();
                    JobKey jKey = ftRec.getJobKey();

                    triggerKeys.add(tKey);

                    // release blocked triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_WAITING, STATE_BLOCKED);
                    } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_PAUSED, STATE_PAUSED_BLOCKED);
                    }

                    // release acquired triggers..
                    if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                        getDelegate().updateTriggerStateFromOtherState(
                                conn, tKey, STATE_WAITING,
                                STATE_ACQUIRED);
                        acquiredCount++;
                    } else if (ftRec.isJobRequestsRecovery()) {
                        // handle jobs marked for recovery that were not fully
                        // executed..
                        if (jobExists(conn, jKey)) {
                            @SuppressWarnings("deprecation")
                            SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                    "recover_"
                                            + rec.getSchedulerInstanceId()
                                            + "_"
                                            + String.valueOf(recoverIds++),
                                    Scheduler.DEFAULT_RECOVERY_GROUP,
                                    new Date(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobName(jKey.getName());
                            rcvryTrig.setJobGroup(jKey.getGroup());
                            rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                            rcvryTrig.setPriority(ftRec.getPriority());
                            JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                            rcvryTrig.setJobDataMap(jd);

                            rcvryTrig.computeFirstFireTime(null);
                            storeTrigger(conn, rcvryTrig, null, false,
                                    STATE_WAITING, false, true);
                            recoveredCount++;
                        } else {
                            getLog()
                                    .warn(
                                            "ClusterManager: failed job '"
                                                    + jKey
                                                    + "' no longer exists, cannot schedule recovery.");
                            otherCount++;
                        }
                    } else {
                        otherCount++;
                    }

                    // free up stateful job's triggers
                    if (ftRec.isJobDisallowsConcurrentExecution()) {
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_WAITING, STATE_BLOCKED);
                        getDelegate()
                                .updateTriggerStatesForJobFromOtherState(
                                        conn, jKey,
                                        STATE_PAUSED, STATE_PAUSED_BLOCKED);
                    }
                }

                getDelegate().deleteFiredTriggers(conn,
                        rec.getSchedulerInstanceId());

                // Check if any of the fired triggers we just deleted were the last fired trigger
                // records of a COMPLETE trigger.
                int completeCount = 0;
                for (TriggerKey triggerKey : triggerKeys) {

                    if (getDelegate().selectTriggerState(conn, triggerKey).
                            equals(STATE_COMPLETE)) {
                        List<FiredTriggerRecord> firedTriggers =
                                getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                        if (firedTriggers.isEmpty()) {

                            if (removeTrigger(conn, triggerKey)) {
                                completeCount++;
                            }
                        }
                    }
                }

                logWarnIfNonZero(acquiredCount,
                        "ClusterManager: ......Freed " + acquiredCount
                                + " acquired trigger(s).");
                logWarnIfNonZero(completeCount,
                        "ClusterManager: ......Deleted " + completeCount
                                + " complete triggers(s).");
                logWarnIfNonZero(recoveredCount,
                        "ClusterManager: ......Scheduled " + recoveredCount
                                + " recoverable job(s) for recovery.");
                logWarnIfNonZero(otherCount,
                        "ClusterManager: ......Cleaned-up " + otherCount
                                + " other failed job(s).");

                if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                    getDelegate().deleteSchedulerState(conn,
                            rec.getSchedulerInstanceId());
                }
            }
        } catch (Throwable e) {
            throw new JobPersistenceException("Failure recovering jobs: "
                    + e.getMessage(), e);
        }
    }
}

The core table involved in the cluster check-in and recovery is QRTZ_SCHEDULER_STATE. Each instance records its last check-in time and its check-in interval there, so the failure test should be easy to guess: an instance whose check-in time has not been updated for too long is considered failed.

(Figure: the core table used for failure detection)
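How "too long" is decided: findFailedInstances compares each instance's last check-in time against the current time, allowing for that instance's own check-in interval plus a small grace period. A rough sketch of the idea follows; the exact grace-period constant is an assumption here for illustration, not a quote from the source.

// Sketch of the failure test applied to each row of QRTZ_SCHEDULER_STATE
// (SchedulerStateRecord lives in org.quartz.impl.jdbcjobstore).
boolean looksFailed(SchedulerStateRecord rec, long now, long timeSinceOwnLastCheckin) {
    // allow the other instance at least its declared interval, or as long as we
    // ourselves have gone without checking in, whichever is larger
    long allowedSilence = Math.max(rec.getCheckinInterval(), timeSinceOwnLastCheckin);
    long gracePeriodMs = 7500L; // assumed slack for illustration
    return rec.getCheckinTimestamp() + allowedSilence + gracePeriodMs < now;
}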

When a failed node is found, its in-flight work is taken over and its state record is deleted, so that it is not detected again on the next pass and repeatedly recovered.
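As the isJobRequestsRecovery branch in clusterRecover shows, a job is only re-fired on another node if it was stored with recovery requested. A minimal example of defining such a job (the job class and identifiers here are made up for illustration):

import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;

import static org.quartz.JobBuilder.newJob;

public class RecoverableReportJob implements Job {

    @Override
    public void execute(JobExecutionContext context) {
        // ... do the actual work ...
    }

    public static JobDetail jobDetail() {
        return newJob(RecoverableReportJob.class)
                .withIdentity("reportJob", "reports")
                // if the node dies while this job is running, another node
                // will schedule a one-shot recovery trigger for it
                .requestRecovery(true)
                .build();
    }
}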

That is most of what there is to the clustering; the finer points require stepping through the code with a debugger, and there is plenty I do not know myself either. You can see quite a few trade-offs in Quartz's design, all centered on the database as the boundary, since the database can change at any moment. If the schedule changes while the scheduler thread is already holding an acquired trigger, a newly added trigger may be more urgent than the one waiting to fire, in which case the current trigger should be abandoned and a fresh acquisition done. But is that worth it? If re-acquiring takes longer than the time left until the new trigger fires, abandoning the current trigger still ends with the new trigger being acquired too late. Since we cannot know how long a fresh acquisition will take, Quartz makes a subjective guess: about 7 ms if the job store is RAM-based, about 70 ms if it is persistent; when the gap between now and the new trigger's fire time is smaller than that, re-acquiring is judged not worth it (a sketch of this check follows below).

These are all trade-offs. The official documentation adds another: clustering works well for long-running, CPU-intensive jobs, but with large numbers of short jobs every node fights over the database lock and threads pile up waiting for it, and this only gets worse as nodes are added. Clustering therefore suits jobs that run relatively infrequently or for a long time. The two passages below from the official documentation (after the sketch) make the same points:
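A sketch of that "is it worth it" decision, paraphrasing the logic in QuartzSchedulerThread; the structure is simplified, and the 7 ms / 70 ms guess and the comparison are the only point being illustrated.

// Simplified sketch: after being signaled that the schedule changed, decide
// whether to abandon the already-acquired trigger and query the job store again.
boolean worthReacquiring(long acquiredTriggerFireTime, Long signaledNextFireTime,
                         boolean jobStoreSupportsPersistence) {
    // If we don't know the new trigger's fire time, assume it is earlier.
    boolean earlier = (signaledNextFireTime == null)
            || (signaledNextFireTime < acquiredTriggerFireTime);
    if (earlier) {
        long millisUntilAcquiredFires = acquiredTriggerFireTime - System.currentTimeMillis();
        // Educated guess at the cost of a round trip to the job store:
        // ~7 ms for a RAM store, ~70 ms for a database-backed store.
        long assumedAcquisitionCost = jobStoreSupportsPersistence ? 70L : 7L;
        if (millisUntilAcquiredFires < assumedAcquisitionCost) {
            earlier = false; // not worth thrashing the job store
        }
    }
    return earlier;
}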

Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.

The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

This concludes the analysis for now. Follow-up plans: support adding a job simply by annotating a class; rework the clustering, hopefully by introducing ZooKeeper, to reduce the pressure that large numbers of short tasks put on the database; and add an RPC mode for invoking tasks remotely. Updates to follow.
