业务流程
首先我们分析下这个流程
用户提交任务。首先将任务推送至延迟队列中。
延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
然后生成延迟任务(仅仅包含任务id)放入某个桶中
时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
完成消费后,发送finish消息,服务端根据job id删除对应信息。
对象
我们现在可以了解到中间存在的几个组件
延迟队列,为Redis延迟队列。实现消息传递
Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。
其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。
任务状态
ready:可执行状态,
delay:不可执行状态,等待时钟周期。
reserved:已被消费者读取,但没有完成消费。
deleted:已被消费完成或者已被删除。
对外提供的接口
额外的内容
首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
文章中因为使用了集群,所以使用Redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。
实现
现在我们根据设计内容完成设计。这一块设计我们分四步完成
任务及相关对象
目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job),Spring Boot 基础就不介绍了,推荐下这个实战教程:github.com/javastacks/spring-boot-best-practice
任务对象
@Data@AllArgsConstructor@NoArgsConstructorpublicclassJobimplementsSerializable{/***延迟任务的唯一标识,用于检索任务*/@JsonSerialize(using=ToStringSerializer.class)privateLongid;/***任务类型(具体业务类型)*/privateStringtopic;/***任务的延迟时间*/privatelongdelayTime;/***任务的执行超时时间*/privatelongttrTime;/***任务具体的消息内容,用于处理具体业务逻辑用*/privateStringmessage;/***重试次数*/privateintretryCount;/***任务状态*/privateJobStatusstatus;}
任务引用对象
@Data@AllArgsConstructorpublicclassDelayJobimplementsSerializable{/***延迟任务的唯一标识*/privatelongjodId;/***任务的执行时间*/privatelongdelayDate;/***任务类型(具体业务类型)*/privateStringtopic;publicDelayJob(Jobjob){this.jodId=job.getId();this.delayDate=System.currentTimeMillis()+job.getDelayTime();this.topic=job.getTopic();}publicDelayJob(Objectvalue,Doublescore){this.jodId=Long.parseLong(String.valueOf(value));this.delayDate=System.currentTimeMillis()+score.longValue();}}
容器
目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器
job任务池,为普通的K/V结构,提供基础的操作
@Component@Slf4jpublicclassJobPool{@AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="job.pool";privateBoundHashOperationsgetPool(){BoundHashOperationsops=redisTemplate.boundHashOps(NAME);returnops;}/***添加任务*@paramjob*/publicvoidaddJob(Jobjob){log.info("任务池添加任务:{}",JSON.toJSONString(job));getPool().put(job.getId(),job);return;}/***获得任务*@paramjobId*@return*/publicJobgetJob(LongjobId){Objecto=getPool().get(jobId);if(oinstanceofJob){return(Job)o;}returnnull;}/***移除任务*@paramjobId*/publicvoidremoveDelayJob(LongjobId){log.info("任务池移除任务:{}",jobId);//移除任务getPool().delete(jobId);}}
延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作
@Slf4j@ComponentpublicclassDelayBucket{@AutowiredprivateRedisTemplateredisTemplate;privatestaticAtomicIntegerindex=newAtomicInteger(0);@Value("${thread.size}")privateintbucketsSize;privateList<String>bucketNames=newArrayList<>();@BeanpublicList<String>createBuckets(){for(inti=0;i<bucketsSize;i++){bucketNames.add("bucket"+i);}returnbucketNames;}/***获得桶的名称*@return*/privateStringgetThisBucketName(){intthisIndex=index.addAndGet(1);inti1=thisIndex%bucketsSize;returnbucketNames.get(i1);}/***获得桶集合*@parambucketName*@return*/privateBoundZSetOperationsgetBucket(StringbucketName){returnredisTemplate.boundZSetOps(bucketName);}/***放入延时任务*@paramjob*/publicvoidaddDelayJob(DelayJobjob){log.info("添加延迟任务:{}",JSON.toJSONString(job));StringthisBucketName=getThisBucketName();BoundZSetOperationsbucket=getBucket(thisBucketName);bucket.add(job,job.getDelayDate());}/***获得最新的延期任务*@return*/publicDelayJobgetFirstDelayTime(Integerindex){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);Set<ZSetOperations.TypedTuple>set=bucket.rangeWithScores(0,1);if(CollectionUtils.isEmpty(set)){returnnull;}ZSetOperations.TypedTupletypedTuple=(ZSetOperations.TypedTuple)set.toArray()[0];Objectvalue=typedTuple.getValue();if(valueinstanceofDelayJob){return(DelayJob)value;}returnnull;}/***移除延时任务*@paramindex*@paramdelayJob*/publicvoidremoveDelayTime(Integerindex,DelayJobdelayJob){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);bucket.remove(delayJob);}}
待完成任务,内部使用topic进行细分,每个topic对应一个list集合
@Component@Slf4jpublicclassReadyQueue{@AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="process.queue";privateStringgetKey(Stringtopic){returnNAME+topic;}/***获得队列*@paramtopic*@return*/privateBoundListOperationsgetQueue(Stringtopic){BoundListOperationsops=redisTemplate.boundListOps(getKey(topic));returnops;}/***设置任务*@paramdelayJob*/publicvoidpushJob(DelayJobdelayJob){log.info("执行队列添加任务:{}",delayJob);BoundListOperationslistOperations=getQueue(delayJob.getTopic());listOperations.leftPush(delayJob);}/***移除并获得任务*@paramtopic*@return*/publicDelayJobpopJob(Stringtopic){BoundListOperationslistOperations=getQueue(topic);Objecto=listOperations.leftPop();if(oinstanceofDelayJob){log.info("执行队列取出任务:{}",JSON.toJSONString((DelayJob)o));return(DelayJob)o;}returnnull;}}
轮询处理
设置了线程池为每个bucket设置一个轮询操作
@ComponentpublicclassDelayTimerimplementsApplicationListener<ContextRefreshedEvent>{@AutowiredprivateDelayBucketdelayBucket;@AutowiredprivateJobPooljobPool;@AutowiredprivateReadyQueuereadyQueue;@Value("${thread.size}")privateintlength;@OverridepublicvoidonApplicationEvent(ContextRefreshedEventcontextRefreshedEvent){ExecutorServiceexecutorService=newThreadPoolExecutor(length,length,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());for(inti=0;i<length;i++){executorService.execute(newDelayJobHandler(delayBucket,jobPool,readyQueue,i));}}}
测试请求
/***测试用请求*@authordaify**/@RestController@RequestMapping("delay")publicclassDelayController{@AutowiredprivateJobServicejobService;/***添加*@paramrequest*@return*/@RequestMapping(value="add",method=RequestMethod.POST)publicStringaddDefJob(Jobrequest){DelayJobdelayJob=jobService.addDefJob(request);returnJSON.toJSONString(delayJob);}/***获取*@return*/@RequestMapping(value="pop",method=RequestMethod.GET)publicStringgetProcessJob(Stringtopic){Jobprocess=jobService.getProcessJob(topic);returnJSON.toJSONString(process);}/***完成一个执行的任务*@paramjobId*@return*/@RequestMapping(value="finish",method=RequestMethod.DELETE)publicStringfinishJob(LongjobId){jobService.finishJob(jobId);return"success";}@RequestMapping(value="delete",method=RequestMethod.DELETE)publicStringdeleteJob(LongjobId){jobService.deleteJob(jobId);return"success";}}
测试
添加延迟任务
通过postman请求:localhost:8000/delay/add
此时这条延时任务被添加进了线程池中
2019-08-1221:21:36.589INFO21444---[nio-8000-exec-6]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}2019-08-1221:21:36.609INFO21444---[nio-8000-exec-6]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
根据设置10秒钟之后任务会被添加至ReadyQueue中
2019-08-1221:21:46.744INFO21444---[pool-1-thread-4]d.s.redis.delay.container.ReadyQueue:执行队列添加任务:DelayJob(jodId=3,delayDate=1565616106609,topic=test)
获得任务
这时候我们请求localhost:8000/delay/pop
这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中
2019-08-0919:36:02.342INFO58456---[nio-8000-exec-3]d.s.redis.delay.container.ReadyQueue:执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}2019-08-0919:36:02.364INFO58456---[nio-8000-exec-3]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}2019-08-0919:36:02.384INFO58456---[nio-8000-exec-3]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中
@Data@AllArgsConstructorpublicclassDelayJobimplementsSerializable{/***延迟任务的唯一标识*/privatelongjodId;/***任务的执行时间*/privatelongdelayDate;/***任务类型(具体业务类型)*/privateStringtopic;publicDelayJob(Jobjob){this.jodId=job.getId();this.delayDate=System.currentTimeMillis()+job.getDelayTime();this.topic=job.getTopic();}publicDelayJob(Objectvalue,Doublescore){this.jodId=Long.parseLong(String.valueOf(value));this.delayDate=System.currentTimeMillis()+score.longValue();}}0
任务的删除/消费
现在我们请求:localhost:8000/delay/delete
此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。
@Data@AllArgsConstructorpublicclassDelayJobimplementsSerializable{/***延迟任务的唯一标识*/privatelongjodId;/***任务的执行时间*/privatelongdelayDate;/***任务类型(具体业务类型)*/privateStringtopic;publicDelayJob(Jobjob){this.jodId=job.getId();this.delayDate=System.currentTimeMillis()+job.getDelayTime();this.topic=job.getTopic();}publicDelayJob(Objectvalue,Doublescore){this.jodId=Long.parseLong(String.valueOf(value));this.delayDate=System.currentTimeMillis()+score.longValue();}}1