本文主要研究一下SpinalTap的MysqlEventFilter
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/MysqlEventFilter.java
public abstract class MysqlEventFilter implements Filter<BinlogEvent> {
public static Filter<BinlogEvent> create(
@NonNull final TableCache tableCache,
@NonNull final Set<String> tableNames,
@NonNull final AtomicReference<SourceState> state) {
return ChainedFilter.<BinlogEvent>builder()
.addFilter(new EventTypeFilter())
.addFilter(new TableFilter(tableCache, tableNames))
.addFilter(new DuplicateFilter(state))
.build();
}
}
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/EventTypeFilter.java
@RequiredArgsConstructor
final class EventTypeFilter extends MysqlEventFilter {
@SuppressWarnings("unchecked")
private static final Set<Class<? extends BinlogEvent>> WHITELISTED_EVENT_TYPES =
ImmutableSet.of(
TableMapEvent.class,
WriteEvent.class,
UpdateEvent.class,
DeleteEvent.class,
XidEvent.class,
QueryEvent.class,
StartEvent.class,
GTIDEvent.class);
public boolean apply(@NonNull final BinlogEvent event) {
return WHITELISTED_EVENT_TYPES.contains(event.getClass());
}
}
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/TableFilter.java
@RequiredArgsConstructor
final class TableFilter extends MysqlEventFilter {
@NonNull private final TableCache tableCache;
@NonNull private final Set<String> tableNames;
public boolean apply(@NonNull final BinlogEvent event) {
if (event instanceof TableMapEvent) {
TableMapEvent tableMap = (TableMapEvent) event;
return tableNames.contains(
Table.canonicalNameOf(tableMap.getDatabase(), tableMap.getTable()));
} else if (event.isMutation()) {
return tableCache.contains(event.getTableId());
}
return true;
}
}
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/DuplicateFilter.java
@RequiredArgsConstructor
public final class DuplicateFilter extends MysqlEventFilter {
@NonNull private final AtomicReference<SourceState> state;
public boolean apply(@NonNull final BinlogEvent event) {
// Only applies to mutation events
if (!event.isMutation()) {
return true;
}
// We need to tell if position in `event` and in `state` are from the same source
// MySQL server, because a failover may have happened and we are currently streaming
// from the new master.
// If they are from the same source server, we can just use the binlog filename and
// position (offset) to tell whether we should skip this event.
BinlogFilePos eventBinlogPos = event.getBinlogFilePos();
BinlogFilePos savedBinlogPos = state.get().getLastPosition();
// Use the same logic in BinlogFilePos.compareTo() here...
if (BinlogFilePos.shouldCompareUsingFilePosition(eventBinlogPos, savedBinlogPos)) {
return event.getOffset() > state.get().getLastOffset();
}
// If this point is reached, a master failover might have happened.
// We can only use GTIDSet to tell whether this event should be skipped.
// We should only skip this event if GTIDSet in event is a "proper subset" of the GTIDSet
// in saved state, because it is possible that the last transaction we streamed before the
// failover is in the middle of a transaction.
GtidSet eventGtidSet = eventBinlogPos.getGtidSet();
GtidSet savedGtidSet = savedBinlogPos.getGtidSet();
return !eventGtidSet.isContainedWithin(savedGtidSet) && !eventGtidSet.equals(savedGtidSet);
}
}
MysqlEventFilter提供了create方法,使用ChainedFilter来构造BinlogEvent的Filter,默认添加了EventTypeFilter、TableFilter、DuplicateFilter