Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize binlog event deserialization #11

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@
<includes>
<include>**/*IntegrationTest.java</include>
</includes>
<!-- Needed because the project uses the old mockito version -->
<argLine>--add-opens=java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -166,6 +168,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.4.0</version>
<configuration>
<!-- Fixes https://bugs.openjdk.org/browse/JDK-8223552 -->
<doclint>-html</doclint>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
Expand Down
83 changes: 71 additions & 12 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,7 @@
*/
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
Expand All @@ -37,6 +28,7 @@
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.io.RawEventsReader;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.Authenticator;
Expand Down Expand Up @@ -174,6 +166,7 @@ public X509Certificate[] getAcceptedIssuers() {
private final Lock connectLock = new ReentrantLock();
private final Lock keepAliveThreadExecutorLock = new ReentrantLock();
private boolean useSendAnnotateRowsEvent;
private boolean useBufferedDeserialization;

private BinaryLogDatabaseVersion databaseVersion;
private int mariaDbSlaveCapability = 4;
Expand Down Expand Up @@ -549,6 +542,10 @@ public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) {
this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent;
}

public void setUseBufferedDeserialization(boolean useBufferedDeserialization) {
this.useBufferedDeserialization = useBufferedDeserialization;
}

/**
* @return the configured MariaDB slave compatibility level, defaults to 4.
*/
Expand Down Expand Up @@ -662,7 +659,12 @@ public void connect() throws IOException, IllegalStateException {
ensureGtidEventDataDeserializer();
}
}
listenForEventPackets();

if (useBufferedDeserialization) {
bufferedListenForEventPackets();
} else {
listenForEventPackets();
}
} finally {
connectLock.unlock();
if (notifyWhenDisconnected) {
Expand Down Expand Up @@ -1068,6 +1070,62 @@ private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOExcept
eventDeserializer.setChecksumType(checksumType);
}

protected void bufferedListenForEventPackets() throws IOException {
abortRequest = false;
ByteArrayInputStream inputStream = channel.getInputStream();
RawEventsReader eventsReader = new RawEventsReader(inputStream);
boolean completeShutdown = false;
try {
RawBinaryLogEvent rawEvent;
while (!abortRequest && (rawEvent = eventsReader.nextRawEvent()) != null) {
if (rawEvent.isServerEof() && !blocking) {
completeShutdown = true;
break;
}

Event event;
try {
event = eventDeserializer.deserializeEvent(rawEvent.getEventDataReader());
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e;
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
}
continue;
}
if (isConnected()) {
eventLastSeen = System.currentTimeMillis();
updateGtidSet(event);
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
}
}
} catch (Exception e) {
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onCommunicationFailure(this, e);
}
}
} finally {
abortRequest = false;
if (isConnected()) {
if (completeShutdown) {
disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
} else {
disconnectChannel();
}
}
}
}

protected void listenForEventPackets() throws IOException {
abortRequest = false;
ByteArrayInputStream inputStream = channel.getInputStream();
Expand Down Expand Up @@ -1278,7 +1336,7 @@ public void unregisterEventListener(EventListener eventListener) {
eventListeners.remove(eventListener);
}

private void notifyEventListeners(Event event) {
protected void notifyEventListeners(Event event) {
if (event.getData() instanceof EventDataWrapper) {
event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal());
}
Expand Down Expand Up @@ -1388,6 +1446,7 @@ private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit)
private void disconnectChannel() throws IOException {
disconnectChannel(false);
}

private void disconnectChannel(boolean force) throws IOException {
connected = false;
if (channel != null && channel.isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ public MySqlGtid(UUID serverId, long transactionId) {
}

public static MySqlGtid fromString(String gtid) {
String[] split = gtid.split(":");
String sourceId = split[0];
long transactionId = Long.parseLong(split[1]);
return new MySqlGtid(UUID.fromString(sourceId), transactionId);
int separatorPos = gtid.indexOf(":");
// Consider switching to Long.parseLong(CharSequence, int, int, int) after upgrading to JDK 9 or above
// for a substantial performance boost.
long transactionId = Long.parseLong(gtid.substring(separatorPos + 1));
return new MySqlGtid(UUID.fromString(gtid.substring(0, separatorPos)), transactionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.github.shyiko.mysql.binlog.event;

import com.github.shyiko.mysql.binlog.event.deserialization.BinaryLogEventDataReader;

import java.nio.ByteBuffer;

/**
* The class represents a fully read binary log event that hasn't been parsed yet. Events spanning multiple
* packets are still represented by a single {@link RawBinaryLogEvent}.
* <p>
* It can also represent an explicit EOF event sent by the server. Such an event doesn't have any data.
*/
public class RawBinaryLogEvent {
private final boolean isServerEof;
private final ByteBuffer buffer;


public BinaryLogEventDataReader getEventDataReader() {
return new BinaryLogEventDataReader(buffer.duplicate());
}

public boolean isServerEof() {
return isServerEof;
}

public static RawBinaryLogEvent createServerEof() {
shirolimit marked this conversation as resolved.
Show resolved Hide resolved
return new RawBinaryLogEvent(null, true);
}

public static RawBinaryLogEvent createDataEvent(byte[] bytes) {
return new RawBinaryLogEvent(bytes, false);
}

private RawBinaryLogEvent(byte[] bytes, boolean isServerEof) {
this.buffer = bytes != null ? ByteBuffer.wrap(bytes) : ByteBuffer.allocate(0);
this.isServerEof = isServerEof;
}
}
Loading