不引入调度框架的分布式定期跑同步任务的实现

70次阅读
没有评论

不引入调度框架的分布式定期跑同步任务的实现

需求:

  • 部分租户需要根据其自定义的频率从其他平台向我们系统进行同步商品 并且平台由于新开发的 并没有商品的更新回调可用,所以感知其变化和更新只能通过定时跑来做。
  • 生产环境是分布式环境
  • leader要求先简单实现 所以不引入分布式调度框架

难点分析:

- 同一任务跑多次 没能实现租户自定义的频率,甚至会因为访问过于频繁被小商店服务端当做恶意访问Ban掉
- 竞争同一个资源导致死锁 

实现:

先上时序图吧

不引入调度框架的分布式定期跑同步任务的实现

把需求拆分为:调度问题+同步 同步就是调用api 将返回结果批量的更新进数据库中 ,发消息通知其他服务balabala 都是业务没啥可说的。

  1. 调度 :调度如果完全使用Spring Schedule是不可行的 ,因为在一台服务器上要为多个租户跑定时,可@Schedule默认是单线程的,不支持多个任务,所以就要使用ThreadPoolTaskScheduler了

通过它的shcedule方法 ,可以接受一个task和一个触发器 ,触发器我们可以根据租户定义的频率生成Cron 从而构造一个CronTrigger ,task自然传入我们希望它执行的同步方法(调用api 写库balabala)

不引入调度框架的分布式定期跑同步任务的实现

这样我们就可以实现跑多个任务了 这个方法返回了一个异步的Future对象,根据它的的继承我们可以发现 它继承了Future

不引入调度框架的分布式定期跑同步任务的实现

所以它一定也有对应的cancel isDone等方法来操作、判断这个异步任务的状态,所以我们可以通过维护一个map 来实现对正在执行的task的停止和启动,key使用租户的唯一的标识,value使用这个future对象。

用处:

  • 可以在开启一个租户的任务的时候,检查任务是否已经在map中,如果有则停止这次启动,避免了分布式情况下的多次跑。
  • 根据租户的信息,对应的停止/更新这个租户的任务。
  • 在运行时也可以通过调度线程不断的获取到在跑哪些task,状态如何。

但是上面这些也仅仅是解决了跑多个任务这个需求,task会根据服务的启动和停止 来全部启动/全部停止,并不能动态的更新任务。

  1. 轮询

    其实在管理配置的增删改查在另一个服务,所以要更新task的话就要使用rpc框架,但是rpc发出的http信息又会因为负载均衡 不一定打到哪台服务器,就会出现服务器之间跑的task不一致的情况,所以我采用了轮训的方式,启一个线程每分钟从表中取出数据,并且与上一分钟的数据(存redis)进行对比,从而得到数据的变化(新增 删除 修改)。

不引入调度框架的分布式定期跑同步任务的实现

​ 但是其实麻烦的就是对比两个List的过程,所以我把客户配置对象的equals和hashCode方法重写,从而能判断两个对象是否值相等,不然只能判断地址。又根据equals方法加入了判断list中有无此元素的方法 返回索引。

不引入调度框架的分布式定期跑同步任务的实现

对比的方法:

不引入调度框架的分布式定期跑同步任务的实现

基本思想就是使用equals(List的contain方法实现也是使用了对象本身的equals方法)判断是否变化,用id为依据判断是新增还是修改,每判断完一个就remove掉,最后redis中的list剩下的对象(上一分钟的list)就是现在数据库中没有的 ,也就是被删除了的对象 停止任务即可。最后更新redis。

下面是同步的一些方法 其中syncSpu和syncOrder方法是同步spu和订单的方法 都是异步的。

不引入调度框架的分布式定期跑同步任务的实现

这样目前的实现算是在分布式环境下 对很多情景做了妥协的跑批定时同步任务的一种实践吧,为了防止任务在多台服务器上运行,使用了redission分布式锁,释放时间是个敏感的地方,如果设置的太短就会出现任务没运行完 ,其他实例就能拿到锁进行同步;如果设置太长,而且恰巧这个租户设置的同步频率十分频繁,就会出现下一次任务因为无法拿到锁,无法执行的情况。

但是相比而言,后者可以在设置频率的地方进行限制,不会让他同步的过于频繁,所以释放时间给了200s。

但是直觉上还是感觉目前方案还有很多地方有漏洞 ,比如在感知用户配置变化的时候 对比两次调度间的任务改动后,对任务进行操作后,要是redis没能更新成最新的数据,该怎么回滚;等等 还是有很多不完善的地方 。

(其实我们对接的电商平台要是出个商品变化/修改回调的api 哪有这些事,分分钟解决完了就)

评论(没有评论)