Skip to content

Commit dc527cb

Browse files
Merge pull request #10 from fivetran/negative_time_fix
Fix deserialization for TIME values
2 parents 06d8851 + e4f6b94 commit dc527cb

File tree

2 files changed

+75
-3
lines changed

2 files changed

+75
-3
lines changed

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java

+69-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.IOException;
2323
import java.io.Serializable;
2424
import java.math.BigDecimal;
25-
import java.sql.Time;
2625
import java.util.BitSet;
2726
import java.util.Calendar;
2827
import java.util.Map;
@@ -69,6 +68,8 @@ public abstract class AbstractRowsEventDataDeserializer<T extends EventData> imp
6968

7069
private static final int DIG_PER_DEC = 9;
7170
private static final int[] DIG_TO_BYTES = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
71+
private static final long TIMEF_OFS = 0x800000000000L;
72+
private static final long TIMEF_INT_OFS = 0x800000;
7273

7374
private final Map<Long, TableMapEventData> tableMapEventByTableId;
7475

@@ -77,6 +78,7 @@ public abstract class AbstractRowsEventDataDeserializer<T extends EventData> imp
7778
private boolean microsecondsPrecision;
7879
private boolean deserializeCharAndBinaryAsByteArray;
7980
private boolean deserializeIntegerAsByteArray;
81+
private boolean deserializeWithNewTimeV2;
8082

8183
public AbstractRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
8284
this.tableMapEventByTableId = tableMapEventByTableId;
@@ -103,6 +105,10 @@ void setDeserializeIntegerAsByteArray(boolean deserializeIntegerAsByteArray) {
103105
this.deserializeIntegerAsByteArray = deserializeIntegerAsByteArray;
104106
}
105107

108+
void setDeserializeWithNewTimeV2(boolean deserializeWithNewTimeV2) {
109+
this.deserializeWithNewTimeV2 = deserializeWithNewTimeV2;
110+
}
111+
106112
protected Serializable[] deserializeRow(long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
107113
throws IOException {
108114
TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableId);
@@ -173,7 +179,11 @@ protected Serializable deserializeCell(ColumnType type, int meta, int length, By
173179
case TIME:
174180
return deserializeTime(inputStream);
175181
case TIME_V2:
176-
return deserializeTimeV2(meta, inputStream);
182+
if (deserializeWithNewTimeV2) {
183+
return deserializeTimeV2New(meta, inputStream);
184+
} else {
185+
return deserializeTimeV2(meta, inputStream);
186+
}
177187
case TIMESTAMP:
178188
return deserializeTimestamp(inputStream);
179189
case TIMESTAMP_V2:
@@ -317,6 +327,63 @@ protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStr
317327
return timestamp != null ? convertLongTimestamptWithFSP(timestamp, fsp) : null;
318328
}
319329

330+
protected Serializable deserializeTimeV2New(int meta, ByteArrayInputStream inputStream) throws IOException {
331+
long result;
332+
long intPart;
333+
long fracPart;
334+
335+
switch (meta) {
336+
case 1: case 2:
337+
intPart = bigEndianLong(inputStream.read(3), 0, 3) - TIMEF_INT_OFS;
338+
fracPart = bigEndianLong(inputStream.read(1), 0, 1);
339+
340+
if (intPart < 0 && fracPart != 0) {
341+
/*
342+
Negative values are stored with reverse fractional part order,
343+
for binary sort compatibility.
344+
345+
Disk value intpart fracPart Time value Memory value
346+
800000.00 0 0 00:00:00.00 0000000000.000000
347+
7FFFFF.FF -1 255 -00:00:00.01 FFFFFFFFFF.FFD8F0
348+
7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0
349+
7FFFFF.00 -1 0 -00:00:01.00 FFFFFFFFFF.000000
350+
7FFFFE.FF -1 255 -00:00:01.01 FFFFFFFFFE.FFD8F0
351+
7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960
352+
353+
Formula to convert fractional part from disk format
354+
(now stored in "fracPart" variable) to absolute value: "0x100 - fracPart".
355+
To reconstruct in-memory value, we shift
356+
to the next integer value and then substruct fractional part.
357+
*/
358+
intPart++; /* Shift to the next integer value */
359+
fracPart -= 0x100; /* -(0x100 - fracPart) */
360+
}
361+
result = (intPart << 24) + (fracPart * 10000);
362+
break;
363+
case 3: case 4:
364+
intPart = bigEndianLong(inputStream.read(3), 0, 3) - TIMEF_INT_OFS;
365+
fracPart = bigEndianLong(inputStream.read(2), 0, 2);
366+
if (intPart < 0 && fracPart != 0) {
367+
/*
368+
Fix reverse fractional part order: "0x10000 - fracPart".
369+
See comments for FSP=1 and FSP=2 above.
370+
*/
371+
intPart++; /* Shift to the next integer value */
372+
fracPart -= 0x10000; /* -(0x10000-fracPart) */
373+
}
374+
result = (intPart << 24) + (fracPart * 100);
375+
break;
376+
case 5: case 6:
377+
result = bigEndianLong(inputStream.read(6), 0, 6) - TIMEF_OFS;
378+
break;
379+
default:
380+
intPart = bigEndianLong(inputStream.read(3), 0, 3) - TIMEF_INT_OFS;
381+
result = intPart << 24;
382+
}
383+
384+
return result;
385+
}
386+
320387
protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) throws IOException {
321388
long timestamp = inputStream.readLong(4) * 1000;
322389
if (deserializeDateAndTimeAsLong) {

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
217217
deserializer.setDeserializeIntegerAsByteArray(
218218
compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY)
219219
);
220+
deserializer.setDeserializeWithNewTimeV2(compatibilitySet.contains(CompatibilityMode.USE_NEW_TIME_DESERIALIZER));
220221
}
221222
}
222223

@@ -396,7 +397,11 @@ public enum CompatibilityMode {
396397
/**
397398
* Return TINY/SHORT/INT24/LONG/LONGLONG values as byte[]|s (instead of int|s).
398399
*/
399-
INTEGER_AS_BYTE_ARRAY
400+
INTEGER_AS_BYTE_ARRAY,
401+
/**
402+
* Use new time deserializer; fix for negative TIME values
403+
*/
404+
USE_NEW_TIME_DESERIALIZER
400405
}
401406

402407
/**

0 commit comments

Comments
 (0)