共计 2132 个字符,预计需要花费 6 分钟才能阅读完成。
详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0
部署文档:[用 Docker 部署 MyData v0.7.1](https://www.mydata.work/docs#/./docker/ 用 Docker 部署 MyData)
使用手册:MyData 使用手册 v0.7.1
交流 Q 群:430089673
MyData 后端结构
MyData 的后端由 3 个子服务组成,分别是 管理服务
、 任务服务
、 业务数据服务
;
- 管理服务:通过项目、数据标准、应用 API、环境的管理 配置出同步业务数据的任务;
- 任务服务:根据配置的任务 定时调用应用 API 和数据服务 实现业务数据的传输和存储;
- 数据服务:封装业务数据的隔离机制和读写操作;
依赖的组件:
- MySQL:存储管理数据;
- Redis:缓存管理数据和任务;
- MongoDB;存储业务数据;
下图从数据流角度 展示 3 个子服务的关联:
注:开源版本采用单体 SpringBoot;
任务服务
配置任务
任务主要包括:项目环境、数据标准、应用 API、任务类型、字段映射、任务周期;
- 项目环境:确定应用 API 的统一前缀地址;
- 数据标准:明确集成的业务数据的数据结构;
- 应用 API:业务数据的传输通道;
- 任务类型:明确数据的传输方向,
提供数据
表示从应用 API 读取业务员数据、消费数据
表示向应用 API 发送业务数据; - 字段映射:配置接口响应结构中 与标准数据字段的映射关系;
- 任务周期:定期执行任务的时间间隔,格式为 cron 表达式;
任务流程
数据集成的任务执行流程如下图:
-
任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;
public class JobExecutor implements ApplicationRunner { ... @Override public void run(ApplicationArguments args) { // 移除已有缓存 jobCache.removeAll(); // 查询已启动的任务 List
tasks = taskService.listRunningTasks(); log.info("tasks.size() =" + tasks.size()); if (CollUtil.isNotEmpty(tasks)) {tasks.forEach(this::startTask); } } ... } -
根据任务的 cron 表达式,计算任务的下次执行时间;
/** * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间 * * @param taskInfo 定时任务 */ private void calculateNextRunTime(TaskInfo taskInfo) {Assert.notNull(taskInfo); Assert.notEmpty(taskInfo.getTaskPeriod()); Date date = taskInfo.getStartTime(); if (taskInfo.getFailCount() > 0) {date = taskInfo.getNextRunTime(); } CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod()); Date nextRunTime = cronExpression.getNextValidTimeAfter(date); taskInfo.setNextRunTime(nextRunTime); }
-
计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;
/** * 缓存任务 * * @param taskInfo 任务对象 * @throws IllegalArgumentException 缓存时长无效 */ public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException { // 计算任务缓存有效时长 long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND); if (expire <= 0) {throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}" , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN) , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN))); } redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo); redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire); taskInfo.appendLog("任务存入 redis,缓存时长 {} 秒", expire); }
-
通过监听 redis 的 key 失效事件,获得待执行的任务;
public class RedisKeyExpiredListener implements MessageListener { private final JobExecutor jobExecutor; @Override public void onMessage(Message message, byte[] pattern) {String expiredKey = message.toString(); if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length()); jobExecutor.notify(taskId); } } }
-
将任务加入待执行的线程池,随后即可执行
/** * 任务存入执行队列 * * @param taskInfo 任务 */ private void executeJob(TaskInfo taskInfo) {taskInfo.appendLog("任务存入执行队列"); Runnable runnable = new JobThread(taskInfo); getThreadPoolExecutor().execute(runnable); }
-
根据任务类型分别执行
提供数据
和消费数据
流程;-
提供数据
- 调用应用 API,获取 json 格式数据;
- 根据任务中字段映射 解析 json 为业务数据 Map 集合;
- 调用数据服务 将业务数据存入 MongoDB;
case MdConstant.DATA_PRODUCER: // 调用 api 获取 json String json = ApiUtil.read(taskInfo); // 将 json 按字段映射 解析为业务数据 jobDataService.parseData(taskInfo, json); // 根据条件过滤数据 jobDataFilterService.doFilter(taskInfo); // 保存业务数据 jobDataService.saveTaskData(taskInfo); // 更新环境变量 jobVarService.saveVarValue(taskInfo, json); break;
-
消费数据
- 根据任务所选数据标准,查询业务数据;
- 再根据字段映射,将业务数据 转为指定的 json 对象集合;
- 调用应用 API,传输 json 数据;
case MdConstant.DATA_CONSUMER: List
filters = taskInfo.getDataFilters(); if (CollUtil.isNotEmpty(filters)) { // 解析过滤条件值中的 自定义字符串 parseFilterValue(filters); // 排除值为 null 的条件 filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList()); } // 根据过滤条件 查询数据 String dataCode = taskInfo.getDataCode(); if (StrUtil.isNotEmpty(dataCode)) {List
-
-
保存任务执行日志;
正文完