开发背景: 大数据部门需要使用其他各业务部门业务表的数据,并且要求与这些业务表保持数据一致,因此需要一个与各业务表进行数据同步的过程。本组件正是用来实现与各业务部门表数据的同步。
实现原理: MySQL Binlog日志文件:(每次对MySQL进行写请求,都会写一次二进制日志,记录MySQL具体执行了什么操作;MySQL主从复制就是基于binlog日志的。) 本组件通过解析Binlog文件得到MySQL对哪张表具体执行了什么操作,然后将数据封装到MessageQueueDTO中,再把消息推送到对应的Kafka消息队列;对应的Kafka监听器监听到消息后,根据表名将MySQL执行的具体操作分发到对应的service进行相应的处理
业务service标记表名枚举:MonitorTableName 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.springframework.stereotype.Indexed;import java.lang.annotation.*;@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Indexed public @interface MonitorTableName { String value () default "" ; }
各服务操作基础接口:IMonitorOperateService 实现了该接口的service或者继承了实现该接口的类都会被后置处理器放入到ConcurrentHashMap中,key为对应的service,值为service对应的class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import com.kafakdemo.demo.dto.MessageQueueDTO;public interface IMonitorOperateService { void create (MessageQueueDTO messageQueueDTO) ; void update (MessageQueueDTO messageQueueDTO) ; void delete (MessageQueueDTO messageQueueDTO) ; }
ETL监听后置处理器: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import org.springframework.aop.support.AopUtils;import org.springframework.beans.factory.config.BeanPostProcessor;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public abstract class AbstractEtlListenerBeanPostProcessor implements BeanPostProcessor { private final Map<IMonitorOperateService, Class<?>> lstMonitorOperateServices = new ConcurrentHashMap<>(); @Override public Object postProcessAfterInitialization (Object bean, String beanName) { if (bean instanceof IMonitorOperateService) { synchronized (this .lstMonitorOperateServices) { if (this .lstMonitorOperateServices.containsKey(bean)) { return bean; } this .lstMonitorOperateServices.put((IMonitorOperateService) bean, AopUtils.getTargetClass(bean)); } } return bean; } protected Map<IMonitorOperateService, Class<?>> getLstMonitorOperateServices() { return lstMonitorOperateServices; } }
对IMonitorOperateService的基础实现类: IMonitorOperateService接口的一个简单实现,各个service只需要继承该类就可以被加载到ConcurrentHashMap中,这样写的好处是,不需要在各个service中进行打印日志,减轻了些许代码量,并且提供了返回MessageQueueDTO中实体集合的父类方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import com.fasterxml.jackson.core.type.TypeReference;import com.kafakdemo.demo.dto.MessageQueueDTO;import com.kafakdemo.demo.util.BaseUtils;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;import java.util.Objects;@Slf4j public abstract class AbstractMonitorOperateService implements IMonitorOperateService { @Override public void create (MessageQueueDTO messageQueueDTO) { log.debug("---过滤新增数据表名:{}" , messageQueueDTO.getTable()); } @Override public void delete (MessageQueueDTO messageQueueDTO) { log.debug("---过滤删除数据表名:{}" , messageQueueDTO.getTable()); } @Override public void update (MessageQueueDTO messageQueueDTO) { log.debug("---过滤更新数据表名:{}" , messageQueueDTO.getTable()); } protected <T> List<T> parseArray (MessageQueueDTO messageQueueDTO, TypeReference<List<T>> typeReference) { if (Objects.isNull(messageQueueDTO)) { return new ArrayList<>(0 ); } return BaseUtils.parseArray(messageQueueDTO.getData(), typeReference); } protected <T> List<T> parseOldArray (MessageQueueDTO messageQueueDTO, TypeReference<List<T>> typeReference) { if (Objects.isNull(messageQueueDTO)) { return new ArrayList<>(0 ); } return BaseUtils.parseArray(messageQueueDTO.getOld(), typeReference); } }
具体操作处理工厂: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.kafakdemo.demo.dto.MessageQueueDTO;import com.kafakdemo.demo.enums.DataBaseOperateTypeEnum;import org.springframework.stereotype.Component;import java.util.List;@Component public class DefaultOperateFactory { public void doExecute (MessageQueueDTO messageQueueDTO, List<IMonitorOperateService> lstTargets) { boolean flag = messageQueueDTO.isFromException(); String className = messageQueueDTO.getClassName(); String type = messageQueueDTO.getType().toUpperCase(); for (IMonitorOperateService monitorOperateService : lstTargets) { messageQueueDTO.setFromException(flag); messageQueueDTO.setClassName(className); if (DataBaseOperateTypeEnum.INSERT.getType().equals(type)) { monitorOperateService.create(messageQueueDTO); } else if (DataBaseOperateTypeEnum.UPDATE.getType().equals(type)) { monitorOperateService.update(messageQueueDTO); } else if (DataBaseOperateTypeEnum.DELETE.getType().equals(type)) { monitorOperateService.delete(messageQueueDTO); } } } }
监听调用策略类: 继承了AbstractEtlListenerBeanPostProcessor类,在getLstMonitorOperateServicesByTableName(String tableName)方法中调用父类的getLstMonitorOperateServices()方法,得到所有实现了IMonitorOperateService或者继承了AbstractMonitorOperateService的service,再读取service类上标注的MonitorTableName注解的value值,按表名筛选出对应的service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import com.google.common.collect.Lists;import com.kafakdemo.demo.annotation.MonitorTableName;import org.apache.commons.collections.MapUtils;import org.apache.commons.lang3.StringUtils;import org.springframework.stereotype.Component;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Objects;@Component public class EtlListenerServiceStrategy extends AbstractEtlListenerBeanPostProcessor { public List<IMonitorOperateService> getLstMonitorOperateServicesByTableName (String tableName) { List<IMonitorOperateService> lstTargets = Lists.newArrayList(); if (StringUtils.isEmpty(tableName)) { return lstTargets; } Map<IMonitorOperateService, Class<?>> lstMonitorOperateServices = getLstMonitorOperateServices(); if (MapUtils.isNotEmpty(lstMonitorOperateServices)) { Iterator<Map.Entry<IMonitorOperateService, Class<?>>> monitorOperateServiceIterators = lstMonitorOperateServices.entrySet().iterator(); while (monitorOperateServiceIterators.hasNext()) { Map.Entry<IMonitorOperateService, Class<?>> monitorOperateServiceEntry = monitorOperateServiceIterators.next(); MonitorTableName monitorTableName = monitorOperateServiceEntry.getValue().getAnnotation(MonitorTableName.class); if (Objects.nonNull(monitorTableName) && tableName.equals(monitorTableName.value())) { lstTargets.add(monitorOperateServiceEntry.getKey()); } } } return lstTargets; } }
监控操作分发器: 根据表名拿到具体的service,然后执行具体的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import com.kafakdemo.demo.dto.MessageQueueDTO;import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections.CollectionUtils;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.List;@Component @Slf4j public class MonitorOperateDispatcher { @Resource EtlListenerServiceStrategy etlListenerServiceStrategy; @Resource DefaultOperateFactory defaultOperateFactory; public void execute (MessageQueueDTO messageQueueDTO) { List<IMonitorOperateService> lstMonitorOperateServices = etlListenerServiceStrategy.getLstMonitorOperateServicesByTableName(messageQueueDTO.getTable()); if (CollectionUtils.isEmpty(lstMonitorOperateServices)) { log.warn("没有配置当前数据表,是否需要配置请业务方确认,表名:{}" , messageQueueDTO.getTable()); return ; } defaultOperateFactory.doExecute(messageQueueDTO, lstMonitorOperateServices); } }