Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊puma的DefaultTaskExecutor

聊聊puma的DefaultTaskExecutor

原创
作者头像
code4it
修改于 2020-06-04 02:30:57
修改于 2020-06-04 02:30:57
1.1K20
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下puma的DefaultTaskExecutor

TaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/TaskExecutor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface TaskExecutor extends LifeCycle {
​
    boolean isStop();
​
    boolean isMerging();void stopUntil(long timestamp);void cancelStopUntil();void setContext(PumaContext context);void initContext();
​
    PumaContext getContext();
​
    String getTaskId();void setTaskId(String taskId);
​
    String getTaskName();void setTaskName(String taskName);
​
    String getDefaultBinlogFileName();void setDefaultBinlogFileName(String binlogFileName);
​
    Long getDefaultBinlogPosition();void setDefaultBinlogPosition(Long binlogFileName);void setInstanceStorageManager(InstanceStorageManager holder);
​
    List<Sender> getFileSender();
​
    DataHandler getDataHandler();void resume() throws Exception;void pause() throws Exception;
​
    PumaTaskStateEntity getTaskState();void setTaskState(PumaTaskStateEntity taskState);void setInstanceTask(InstanceTask instanceTask);
​
    InstanceTask getInstanceTask();
​
    TableSet getTableSet();
}
  • TaskExecutor继承了LifeCycle,定义了initContext、getContext等方法

AbstractTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/AbstractTaskExecutor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@ThreadUnSafe
public abstract class AbstractTaskExecutor implements TaskExecutor {
    private PumaContext context;private String taskId;private long serverId;protected String taskName;protected Date beginTime;protected TableSet tableSet;private String defaultBinlogFileName;private Long defaultBinlogPosition;protected Parser parser;protected DataHandler dataHandler;protected Dispatcher dispatcher;private volatile boolean stop = true;protected InstanceStorageManager instanceStorageManager;protected PumaTaskStateEntity state;protected InstanceManager instanceManager;
​
    @Override
    public String getTaskId() {
        return taskId;
    }
​
    @Override
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }
​
    @Override
    public String getTaskName() {
        return taskName;
    }
​
    @Override
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }/**
     * @param instanceStorageManager
     *           the binlogPositionHolder to set
     */
    public void setInstanceStorageManager(InstanceStorageManager instanceStorageManager) {
        this.instanceStorageManager = instanceStorageManager;
    }public void setContext(PumaContext context) {
        this.context = context;
    }public PumaContext getContext() {
        return context;
    }public String getDefaultBinlogFileName() {
        return defaultBinlogFileName;
    }public void setDefaultBinlogFileName(String binlogFileName) {
        this.defaultBinlogFileName = binlogFileName;
    }/**
     * @return the defaultBinlogPosition
     */
    public Long getDefaultBinlogPosition() {
        return defaultBinlogPosition;
    }/**
     * @param defaultBinlogPosition
     *           the defaultBinlogPosition to set
     */
    public void setDefaultBinlogPosition(Long defaultBinlogPosition) {
        this.defaultBinlogPosition = defaultBinlogPosition;
    }/**
     * @param parser
     *           the parser to set
     */
    public void setParser(Parser parser) {
        this.parser = parser;
    }/**
     * @param dataHandler
     *           the dataHandler to set
     */
    public void setDataHandler(DataHandler dataHandler) {
        this.dataHandler = dataHandler;
    }/**
     * @param dispatcher
     *           the dispatcher to set
     */
    public void setDispatcher(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }public long getServerId() {
        return serverId;
    }public void setServerId(long serverId) {
        this.serverId = serverId;
    }public boolean isStop() {
        return stop;
    }protected abstract void doStop() throws Exception;protected abstract void doStart() throws Exception;
​
    @Override
    public void start() {
        try {
            stop = false;
​
            parser.start();
            dataHandler.start();
            dispatcher.start();
            doStart();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
​
    @Override
    public void stop() {
        try {
            stop = true;
​
            parser.stop();
            dataHandler.stop();
            dispatcher.stop();doStop();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }public void resume() throws Exception {
        stop = false;
    }public void pause() throws Exception {
        stop = true;
    }
​
    @Override
    public List<Sender> getFileSender() {
        return dispatcher.getSenders();
    }
​
    @Override
    public DataHandler getDataHandler() {
        return this.dataHandler;
    }public PumaTaskStateEntity getTaskState() {
        return state;
    }public void setTaskState(PumaTaskStateEntity state) {
        this.state = state;
    }public Date getBeginTime() {
        return beginTime;
    }public void setBeginTime(Date beginTime) {
        this.beginTime = beginTime;
    }public TableSet getTableSet() {
        return tableSet;
    }public void setTableSet(TableSet tableSet) {
        this.tableSet = tableSet;
    }public InstanceManager getInstanceManager() {
        return instanceManager;
    }public void setInstanceManager(InstanceManager instanceManager) {
        this.instanceManager = instanceManager;
    }
}
  • AbstractTaskExecutor声明实现TaskExecutor接口,它定义了context、defaultBinlogFileName、defaultBinlogPosition、parser、dataHandler、dispatcher等属性;其start方法执行parser、dataHandler、dispatcher的start方法及doStart方法;其stop方法执行parser、dataHandler、dispatcher的stop方法及doStop方法

DefaultTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@ThreadUnSafe
public class DefaultTaskExecutor extends AbstractTaskExecutor {private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskExecutor.class);private SrcDbEntity currentSrcDbEntity;private DefaultTableMetaInfoFetcher tableMetaInfoFetcher;private String encoding = "utf-8";private Socket mysqlSocket;private InputStream is;private OutputStream os;private InstanceTask instanceTask;private boolean merging = false;private long runUntilTimestamp;
​
    @Override
    public void doStart() throws Exception {
        Thread.currentThread().setName("DefaultTaskExecutor-" + taskName);
        long failCount = 0;
        merging = false;
        SystemStatusManager.addServer(getTaskName(), "", 0, tableSet);do {
            try {
                loadServerId(instanceManager.getUrlByCluster(instanceTask.getInstance()));// 读position/file文件
                BinlogInfo binlogInfo = instanceStorageManager.getBinlogInfo(getContext().getPumaServerName());if (binlogInfo == null) {
                    this.currentSrcDbEntity = initSrcDbByServerId(-1);
                    if (beginTime != null) {
                        binlogInfo = getBinlogByTimestamp(beginTime.getTime() / 1000);
                    }
                } else {
                    this.currentSrcDbEntity = initSrcDbByServerId(binlogInfo.getServerId());if (binlogInfo.getServerId() != currentSrcDbEntity.getServerId()) {
                        BinlogInfo oldBinlogInfo = binlogInfo;
                        binlogInfo = getBinlogByTimestamp(oldBinlogInfo.getTimestamp() - 60);
                        if (binlogInfo == null) {
                            throw new IOException("Switch Binlog Failed!");
                        } else {
                            Cat.logEvent("BinlogSwitch", taskName, Message.SUCCESS,
                                    oldBinlogInfo.toString() + " -> " + binlogInfo.toString());
                        }
                    }
                }updateTableMetaInfoFetcher();
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());if (!connect()) {
                    throw new IOException("Connection failed.");
                }initConnect();initBinlogPosition(binlogInfo);if (dumpBinlog()) {
                    processBinlog();
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            } catch (Exception e) {
                if (++failCount % 3 == 0) {
                    this.currentSrcDbEntity = chooseNextSrcDb();
                    updateTableMetaInfoFetcher();
                    failCount = 0;
                }
                String msg = "Exception occurs. taskName: " + getTaskName() + " dbServerId: " + (currentSrcDbEntity == null ? 0 : currentSrcDbEntity.getServerId())
                        + ". Reconnect...";
                LOG.error(msg, e);
                Cat.logError(msg, e);
​
                Thread.sleep(((failCount % 10) + 1) * 2000);
            }
        } while (!isStop() && !Thread.currentThread().isInterrupted());}protected void doStop() throws Exception {
        LOG.info("TaskName: " + getTaskName() + ", Stopped.");
        closeTransport();
        SystemStatusManager.deleteServer(getTaskName());
    }//......}
  • DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

getBinlogByTimestamp

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected BinlogInfo getBinlogByTimestamp(long time) throws IOException {
        BinlogInfo binlogResult = null;
        Transaction t = Cat.newTransaction("BinlogFindByTime", taskName);
​
        Cat.logEvent("BinlogFindByTime.Time", String.valueOf(time));try {
            if (!connect()) {
                throw new IOException("Connection failed.");
            }
            initConnect();
            List<BinlogInfo> binaryLogs = getBinaryLogs();
​
            Cat.logEvent("BinlogFindByTime.BinaryLogs", currentSrcDbEntity.toString(), Message.SUCCESS, Joiner.on(",").join(binaryLogs));
​
            BinlogInfo closestBinlogInfo = null;for (int k = binaryLogs.size() - 1; k >= 0; k--) {
                if (binlogResult != null) {
                    break;
                }
​
                BinlogInfo newBinlogInfo = binaryLogs.get(k);
​
                Cat.logEvent("BinlogFindByTime.Start", newBinlogInfo.toString());getContext().setDBServerId(currentSrcDbEntity.getServerId());
                getContext().setBinlogFileName(newBinlogInfo.getBinlogFile());
                getContext().setBinlogStartPos(4);
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());if (!connect()) {
                    throw new IOException("Connection failed.");
                }
                initConnect();if (dumpBinlog()) {
                    while (!isStop()) {
                        BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is,
                                PacketType.BINLOG_PACKET,
                                getContext());if (!binlogPacket.isOk()) {
                            LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                            throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
                        } else {
                            BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());try {
                                getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    if (closestBinlogInfo == null) {
                                        break;
                                    } else {
                                        continue;
                                    }
                                }if (binlogEvent.getHeader().getTimestamp() >= time) {
                                    if (closestBinlogInfo != null) {
                                        binlogResult = closestBinlogInfo;
                                    }
                                    break;
                                }if (binlogEvent.getHeader().getEventType() == BinlogConstants.XID_EVENT
                                        && binlogEvent.getHeader().getTimestamp() < time) {
                                    closestBinlogInfo = new BinlogInfo(
                                            currentSrcDbEntity.getServerId(),
                                            getContext().getBinlogFileName(),
                                            binlogEvent.getHeader().getNextPosition(),
                                            0, binlogEvent.getHeader().getTimestamp());
                                }
                            } finally {
                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    RotateEvent rotateEvent = (RotateEvent) binlogEvent;
                                    getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
                                    getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
                                } else {
                                    getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
                                }
                            }
                        }
                    }
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            }
​
            Cat.logEvent("BinlogFindByTime.Success", taskName, Message.SUCCESS,
                    time + " -> " + (binlogResult == null ? "null" : binlogResult.toString()));
            t.setStatus(Message.SUCCESS);
            t.complete();
            return binlogResult;
        } catch (IOException e) {
            t.setStatus(e);
            t.complete();
            throw e;
        }
    }
  • getBinlogByTimestamp方法先执行connect、initConnect,然后通过getBinaryLogs获取binaryLogs,之后遍历binaryLogs执行dumpBinlog,获取binlogEvent.getHeader().getTimestamp()大于等于指定time的BinlogInfo

connect

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private boolean connect() {
        try {
            closeTransport();
            this.mysqlSocket = new Socket();
            this.mysqlSocket.setTcpNoDelay(false);
            this.mysqlSocket.setKeepAlive(true);
            this.mysqlSocket.connect(new InetSocketAddress(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()));
            this.is = new BufferedInputStream(mysqlSocket.getInputStream());
            this.os = new BufferedOutputStream(mysqlSocket.getOutputStream());
            PacketFactory.parsePacket(is, PacketType.CONNECT_PACKET, getContext());LOG.info("TaskName: " + getTaskName() + ", Connection db success.");return true;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Connect failed. Reason: " + e.getMessage());return false;
        }
    }
  • connect方法先执行closeTransport,然后创建mysqlSocket进行connect

initConnect

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected void initConnect() throws IOException {
        if (!auth()) {
            throw new IOException("Login failed.");
        }if (getContext().isCheckSum()) {
            if (!updateSetting()) {
                throw new IOException("Update setting command failed.");
            }
        }if (!queryBinlogFormat()) {
            throw new IOException("Query config binlogformat failed.");
        }
        if (!queryBinlogImage()) {
            throw new IOException("Query config binlog row image failed.");
        }if (queryServerId() != currentSrcDbEntity.getServerId()) {
            throw new IOException("Server Id Changed.");
        }
    }private boolean auth() {
        try {
            LOG.info("server logining taskName: " + getTaskName() + " host: " + currentSrcDbEntity.getHost() + " port: " + currentSrcDbEntity.getPort() + " username: "
                    + currentSrcDbEntity.getUsername() + " dbServerId: " + currentSrcDbEntity.getServerId());
            AuthenticatePacket authPacket = (AuthenticatePacket) PacketFactory.createCommandPacket(
                    PacketType.AUTHENTICATE_PACKET, getContext());
​
            authPacket.setPassword(currentSrcDbEntity.getPassword());
            authPacket.setUser(currentSrcDbEntity.getUsername());
            authPacket.buildPacket(getContext());
            authPacket.write(os, getContext());
​
            OKErrorPacket okErrorPacket = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET,
                    getContext());
            boolean isAuth;if (okErrorPacket.isOk()) {
                LOG.info("TaskName: " + getTaskName() + ", Server login success.");
                isAuth = true;
            } else {
                isAuth = false;
                LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + okErrorPacket.getMessage());
            }return isAuth;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + e.getMessage());return false;
        }
    }private boolean queryBinlogFormat() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show global variables like 'binlog_format'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery = true;
            if (columnValues == null || columnValues.size() != 2 || columnValues.get(1) == null) {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog format query result.");
                isQuery = false;
            }
            BinlogFormat binlogFormat = BinlogFormat.valuesOf(columnValues.get(1));
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            if (binlogFormat == null || !binlogFormat.isRow()) {
                isQuery = false;
                LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog format: " + binlogFormat.value);
            }
​
            Cat.logEvent("Slave.dbBinlogFormat", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlogformat is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }private boolean queryBinlogImage() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show variables like 'binlog_row_image'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery = true;
            if (columnValues == null || columnValues.size() == 0) {// 5.1
                isQuery = true;
            } else if (columnValues != null && columnValues.size() == 2 && columnValues.get(1) != null) {// 5.6
                BinlogRowImage binlogRowImage = BinlogRowImage.valuesOf(columnValues.get(1));
                isQuery = true;
                if (binlogRowImage == null || !binlogRowImage.isFull()) {
                    isQuery = false;
                    LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog row image: " + binlogRowImage.value);
                }
            } else {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog row image query result.");
                isQuery = false;
            }
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogRowImage", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlog row image is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }
  • initConnect方法依次执行auth、queryBinlogFormat、queryBinlogImage方法;auth方法进行账号密码校验;queryBinlogFormat主要执行show global variables like 'binlog_format'命令;queryBinlogImage主要执行show variables like 'binlog_row_image'

initBinlogPosition

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected void initBinlogPosition(BinlogInfo binlogInfo) throws IOException {
        if (binlogInfo == null) {
            List<BinlogInfo> binaryLogs = getBinaryLogs();
            BinlogInfo begin = beginTime == null ? binaryLogs.get(binaryLogs.size() - 1) : binaryLogs.get(0);
            binlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(), begin.getBinlogFile(), 4l, 0, begin.getTimestamp());
        }getContext().setDBServerId(currentSrcDbEntity.getServerId());
        getContext().setBinlogFileName(binlogInfo.getBinlogFile());
        getContext().setBinlogStartPos(binlogInfo.getBinlogPosition());
        setBinlogInfo(binlogInfo);
​
        SystemStatusManager.addServer(getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort(), tableSet);
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);
    }
  • initBinlogPosition主要是将binlogInfo信息设置到PumaContext中

dumpBinlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private boolean dumpBinlog() {
        try {
            ComBinlogDumpPacket dumpBinlogPacket = (ComBinlogDumpPacket) PacketFactory.createCommandPacket(
                    PacketType.COM_BINLOG_DUMP_PACKET, getContext());
            dumpBinlogPacket.setBinlogFileName(getContext().getBinlogFileName());
            dumpBinlogPacket.setBinlogFlag(0);
            dumpBinlogPacket.setBinlogPosition(getContext().getBinlogStartPos());
            dumpBinlogPacket.setServerId(getServerId());
            dumpBinlogPacket.buildPacket(getContext());
​
            dumpBinlogPacket.write(os, getContext());
​
            OKErrorPacket dumpCommandResultPacket = (OKErrorPacket) PacketFactory.parsePacket(is,
                    PacketType.OKERROR_PACKET, getContext());if (dumpCommandResultPacket.isOk()) {
                LOG.info("TaskName: " + getTaskName() + ", Dump binlog command success.");return true;
            } else {
                LOG.error("TaskName: " + getTaskName() + ", Dump binlog failed. Reason: "
                        + dumpCommandResultPacket.getMessage());return false;
            }
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + " Dump binlog failed. Reason: " + e.getMessage());return false;
        }}
  • dumpBinlog方法主要是发送COM_BINLOG_DUMP_PACKET

processBinlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private void processBinlog() throws IOException {
        while (!isStop()) {
            BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is, PacketType.BINLOG_PACKET,
                    getContext());if (!binlogPacket.isOk()) {
                LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
            } else {
                processBinlogPacket(binlogPacket);
            }
        }
    }protected void processBinlogPacket(BinlogPacket binlogPacket) throws IOException {
        BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());if (merging) {
            if (binlogEvent.getHeader().getTimestamp() >= runUntilTimestamp) {
                stop();
            }
        }
​
        SystemStatusManager.incServerParsedCounter(getTaskName());if (binlogEvent.getHeader().getEventType() == BinlogConstants.INTVAR_EVENT
                || binlogEvent.getHeader().getEventType() == BinlogConstants.RAND_EVENT
                || binlogEvent.getHeader().getEventType() == BinlogConstants.USER_VAR_EVENT) {
            LOG.error("TaskName: " + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support.");
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogFormat", eventName, "1", "");
            Cat.logError("Puma.server.mixedorstatement.format", new IllegalArgumentException("TaskName: "
                    + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support."));
            stopTask();
        }if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());
        }if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
            processRotateEvent(binlogEvent);
        } else {
            processDataEvent(binlogEvent);
        }
    }protected void processRotateEvent(BinlogEvent binlogEvent) {
        RotateEvent rotateEvent = (RotateEvent) binlogEvent;
        getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
        getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
    }protected void processDataEvent(BinlogEvent binlogEvent) {
        DataHandlerResult dataHandlerResult = null;
        // 一直处理一个binlogEvent的多行,处理完每行马上分发,以防止一个binlogEvent包含太多ChangedEvent而耗费太多内存
        int eventIndex = 0;
        do {
            dataHandlerResult = dataHandler.process(binlogEvent, getContext());
            if (dataHandlerResult != null && !dataHandlerResult.isEmpty()) {
                ChangedEvent changedEvent = dataHandlerResult.getData();
​
                changedEvent.getBinlogInfo().setEventIndex(eventIndex++);updateOpsCounter(changedEvent);dispatch(changedEvent);
            }
        } while (dataHandlerResult != null && !dataHandlerResult.isFinished());if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
            setBinlogInfo(new BinlogInfo(getBinlogInfo().getServerId(), getBinlogInfo().getBinlogFile(), binlogEvent
                    .getHeader().getNextPosition(), 0, 0));
        }
​
        BinlogInfo binlogInfo = new BinlogInfo(getContext().getDBServerId(), getContext()
                .getBinlogFileName(), binlogEvent.getHeader().getNextPosition(), 0, binlogEvent.getHeader().getTimestamp());
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);if (binlogEvent.getHeader().getNextPosition() != 0
                && StringUtils.isNotBlank(getContext().getBinlogFileName())
                && dataHandlerResult != null
                && !dataHandlerResult.isEmpty()
                && (dataHandlerResult.getData() instanceof DdlEvent || (dataHandlerResult.getData() instanceof RowChangedEvent && ((RowChangedEvent) dataHandlerResult
                .getData()).isTransactionCommit()))) {
​
​
            instanceStorageManager.setBinlogInfo(getTaskName(), binlogInfo);
        }
    }

- processBinlog方法循环接收binlogPacket,然后执行processBinlogPacket;该方法通过parser.parse获取binlogEvent,对于FORMAT_DESCRIPTION_EVENT,则更新binlogEvent.getHeader().getNextPosition()到context中;对于ROTATE_EVENT则执行processRotateEvent,否则执行processDataEvent;processRotateEvent主要是更新binlogFileName及binlogStartPos;processDataEvent则主要是通过dataHandler.process(binlogEvent, getContext())处理,然后执行dispatch(changedEvent)

closeTransport

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private void closeTransport() {
        // Close in.
        try {
            if (this.is != null) {
                this.is.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the input stream.");
        } finally {
            this.is = null;
        }// Close os.
        try {
            if (this.os != null) {
                this.os.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the output stream");
        } finally {
            this.os = null;
        }// Close socket.
        try {
            if (this.mysqlSocket != null) {
                this.mysqlSocket.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the socket", ioEx);
        } finally {
            this.mysqlSocket = null;
        }
    }
  • closeTransport主要是关闭InputStream、OutputStream及mysqlSocket

小结

DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
2 条评论
热度
最新
谢谢楼主分享,请问,如果要实现倍数播放,加速或者慢速播放,应该怎么去实现
谢谢楼主分享,请问,如果要实现倍数播放,加速或者慢速播放,应该怎么去实现
回复回复点赞举报
感谢分享,正在学习这个~
感谢分享,正在学习这个~
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
FFmpeg+SDL2 实现简易音视频同步播放器
这是实现的简易播放器的线程模型,通过这张图再结合我们之前博客中学习的内容,基本可以了解播放器的一个整体运行流程。具体代码也是根据这张图来实现。
字节流动
2023/09/04
6400
FFmpeg+SDL2 实现简易音视频同步播放器
ffplay源码分析4-音视频同步
音视频同步的目的是为了使播放的声音和显示的画面保持一致。视频按帧播放,图像显示设备每次显示一帧画面,视频播放速度由帧率确定,帧率指示每秒显示多少帧;音频按采样点播放,声音播放设备每次播放一个采样点,声音播放速度由采样率确定,采样率指示每秒播放多少个采样点。如果仅仅是视频按帧率播放,音频按采样率播放,二者没有同步机制,即使最初音视频是基本同步的,随着时间的流逝,音视频会逐渐失去同步,并且不同步的现象会越来越严重。这是因为:一、播放时间难以精确控制,二、异常及误差会随时间累积。所以,必须要采用一定的同步策略,不断对音视频的时间差作校正,使图像显示与声音播放总体保持一致。
叶余
2019/04/02
2.3K1
ffplay源码分析4-音视频同步
ffmpeg 源码分析之 ffplay 主流程
很久没有研究 ffmpeg了,版本变化很大,用一天时间先把 ffplay 翻出来看看。
字节流动
2021/06/09
9780
FFmpeg简易播放器的实现-音视频同步
前面四次实验,从最简入手,循序渐进,研究播放器的实现过程。第四次实验,虽然音频和视频都能播放出来,但是声音和图像无法同步,而没有音视频同步的播放器只是属于概念性质的播放器,无法实际使用。本次实验将实现音频和视频的同步,这样,一个能够实际使用的简易播放器才算初具雏形,在这个基础上,后续可再进行完善和优化。
叶余
2019/04/02
3.3K0
FFmpeg简易播放器的实现-音视频同步
ffplay之read_thread线程里的for循环读取数据源码解读
大家好,我是小涂,今天继续给大家分享ffplay源码解析,今天也是最后一篇关于read_thread线程的解析,分享完这个之后,会接着分享视频和音频解码线程以及音频输出、视频输出模块,大概率每个礼拜一篇,很快就会进入到实战篇写一个播放器,前期解析ffplay源码,主要是要先了解这个优秀的播放器框架,后期我们就可以在这个基础上借鉴前人的优秀思想,来做一个自己的播放器。
用户6280468
2022/03/21
1K0
ffplay之read_thread线程里的for循环读取数据源码解读
ffplay源码分析2-数据结构
栈(LIFO)是一种表,队列(FIFO)也是一种表。数组是表的一种实现方式,链表也是表的一种实现方式,例如FIFO既可以用数组实现,也可以用链表实现。PacketQueue是用链表实现的一个FIFO。
叶余
2019/04/02
1.2K0
ffplay源码分析2-数据结构
ffplay播放器原理剖析
*****************************************************************************
全栈程序员站长
2022/09/07
8260
播放器实战之ffplay数据结构解析
大家好,我是小涂,昨天晚上给大家进行了一场直播,这次直播内容主要分享了一些自己的学习方法和一些简单的理财分享,中途又再次出现了一次意外,原本是在b站上来进行直播的,后面有回音,影响直播效果,所以就备战到视频号了,后期直播就在视频号进行了,同时如果下次有直播会提前通知大家:
用户6280468
2022/03/21
7590
播放器实战之ffplay数据结构解析
FFmpeg4.0+SDL2.0笔记05:Synching Video
背景:在系统性学习FFmpeg时,发现官方推荐教程还是15年的,不少接口已经弃用,大版本也升了一级,所以在这里记录下FFmpeg4.0+SDL2.0的学习过程。
非一
2021/04/09
7410
ffplay.c 源码分析- 时间同步
之前我们对单独的音频和视频的播放进行了分析。 但是实际上播放一段影片,还需要音视频同步播放。
deep_sadness
2018/12/18
1.5K0
Ffplay源码read_thread解读(一)
大家好,我是小涂,今天继续给大家分享ffplay播放器里面的源码解读,今天原本想和大家一起解读一下下面这个三个线程函数:
用户6280468
2022/03/21
5460
Ffplay源码read_thread解读(一)
ffplay源码分析3-代码框架
本文为作者原创,转载请注明出处:https://www.cnblogs.com/leisure_chn/p/10301831.html
叶余
2019/04/02
3.3K2
音视频技术(6)-iOS音视频同步
这部分内容较多,涉及多个线程协同实现“解复用”、“解音频帧”、“解视频”、“音频&视频渲染”,前后研究了两周多,还有些代码没理解为什么这么写。 盗用一张@MzDavid画的图,逻辑很清楚
公号sumsmile
2020/03/27
2.3K2
ffplay播放器之解码器源码解读!
大家好,我是小涂,今天继续给大家分享播放器里面的相关知识,本篇文章主要是分享ffplay里面的视频解码线程相关源码,废话就不多说,开始开肝!
用户6280468
2022/03/21
1.1K0
ffplay播放器之解码器源码解读!
IjkPlayer初始化过程
最近调研做视频秒开,使用B站开源的ijkplayer作为播放器。ijkplayer基于ffmpeg的播放器。
None_Ling
2018/10/24
2K0
ffplay.c 源码分析- 音频部分
1. 读取线程-read_thread 在main方法中会启动的读取的线程。 这个和视频的线程模型中是一致的。不同的是,循环读取的数据是音频数据。
deep_sadness
2018/12/18
1.4K0
基于 FFmpeg 的 Cocos Creator 视频播放器
腾讯开心鼠项目使用的游戏引擎是 Cocos Creator,由于引擎提供的视频组件实现方式问题导致视频组件和游戏界面分了层,从而导致了以下若干问题:
陈皮皮
2021/01/04
6.5K2
基于 FFmpeg 的 Cocos Creator 视频播放器
Ffplay源码之read_thread解析(二)
大家好,我是小涂,本周继续给大家分享ffplay中的read_thread这个线程源码的解读,这算是自己的一个学习记录过程吧。
用户6280468
2022/03/21
6600
Ffplay源码之read_thread解析(二)
音视频八股文(6)-- ffmpeg大体介绍和内存模型
• 容器/文件(Conainer/File):即特定格式的多媒体文件, 比如mp4、flv、mkv等。
福大大架构师每日一题
2023/06/09
5230
音视频八股文(6)-- ffmpeg大体介绍和内存模型
ffplay源码分析7-播放控制
暂停/继续状态的切换是由用户按空格键实现的,每按一次空格键,暂停/继续的状态翻转一次。
叶余
2019/04/02
1.3K0
相关推荐
FFmpeg+SDL2 实现简易音视频同步播放器
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验