Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/core-hadoop3-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [ '8', '11' ]
jdk: [ 8, 11, 17 ]
spark: [ '3.3', '3.5' ]
exclude:
- jdk: 8
spark: '3.5'
- jdk: 11
spark: '3.5'
- jdk: 17
spark: '3.3'
name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }}
steps:
- uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructLikeSet;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -61,7 +63,7 @@ public class KeyedTableDataView extends AbstractTableDataView {

private final StructLikeMap<Record> view;

private final List<RecordWithAction> changeLog = new ArrayList<>();
private final List<TestRecordWithAction> changeLog = new ArrayList<>();

private final RandomRecordGenerator generator;

Expand Down Expand Up @@ -129,11 +131,11 @@ public KeyedTableDataView(

public WriteResult append(int count) throws IOException {
Preconditions.checkArgument(count <= primaryUpperBound - view.size());
List<RecordWithAction> records = new ArrayList<>();
List<TestRecordWithAction> records = new ArrayList<>();
for (int i = 0; i < primaryUpperBound; i++) {
Record record = generator.randomRecord(i);
if (!view.containsKey(record)) {
records.add(new RecordWithAction(record, ChangeAction.INSERT));
records.add(new TestRecordWithAction(record, ChangeAction.INSERT));
}
if (records.size() == count) {
break;
Expand All @@ -144,39 +146,39 @@ public WriteResult append(int count) throws IOException {

public WriteResult upsert(int count) throws IOException {
List<Record> scatter = randomRecord(count);
List<RecordWithAction> upsert = new ArrayList<>();
List<TestRecordWithAction> upsert = new ArrayList<>();
for (Record record : scatter) {
upsert.add(new RecordWithAction(record, ChangeAction.DELETE));
upsert.add(new RecordWithAction(record, ChangeAction.INSERT));
upsert.add(new TestRecordWithAction(record, ChangeAction.DELETE));
upsert.add(new TestRecordWithAction(record, ChangeAction.INSERT));
}
return doWrite(upsert);
}

public WriteResult cdc(int count) throws IOException {
List<Record> scatter = randomRecord(count);
List<RecordWithAction> cdc = new ArrayList<>();
List<TestRecordWithAction> cdc = new ArrayList<>();
for (Record record : scatter) {
if (view.containsKey(record)) {
if (random.nextBoolean()) {
// delete
cdc.add(new RecordWithAction(view.get(record), ChangeAction.DELETE));
cdc.add(new TestRecordWithAction(view.get(record), ChangeAction.DELETE));
} else {
// update
cdc.add(new RecordWithAction(view.get(record), ChangeAction.UPDATE_BEFORE));
cdc.add(new RecordWithAction(record, ChangeAction.UPDATE_AFTER));
cdc.add(new TestRecordWithAction(view.get(record), ChangeAction.UPDATE_BEFORE));
cdc.add(new TestRecordWithAction(record, ChangeAction.UPDATE_AFTER));
}
} else {
cdc.add(new RecordWithAction(record, ChangeAction.DELETE));
cdc.add(new TestRecordWithAction(record, ChangeAction.DELETE));
}
}
return doWrite(cdc);
}

public WriteResult onlyDelete(int count) throws IOException {
List<Record> scatter = randomRecord(count);
List<RecordWithAction> delete =
List<TestRecordWithAction> delete =
scatter.stream()
.map(s -> new RecordWithAction(s, ChangeAction.DELETE))
.map(s -> new TestRecordWithAction(s, ChangeAction.DELETE))
.collect(Collectors.toList());
return doWrite(delete);
}
Expand All @@ -188,10 +190,10 @@ public WriteResult custom(CustomData customData) throws IOException {
}

public WriteResult custom(List<PKWithAction> data) throws IOException {
List<RecordWithAction> records = new ArrayList<>();
List<TestRecordWithAction> records = new ArrayList<>();
for (PKWithAction pkWithAction : data) {
records.add(
new RecordWithAction(generator.randomRecord(pkWithAction.pk), pkWithAction.action));
new TestRecordWithAction(generator.randomRecord(pkWithAction.pk), pkWithAction.action));
}
return doWrite(records);
}
Expand Down Expand Up @@ -238,9 +240,9 @@ public MatchResult match(List<Record> records) {
return MatchResult.of(notInView, inViewButDuplicate, missInView);
}

private WriteResult doWrite(List<RecordWithAction> upsert) throws IOException {
private WriteResult doWrite(List<TestRecordWithAction> upsert) throws IOException {
writeView(upsert);
WriteResult writeResult = writeFile(upsert);
WriteResult writeResult = writeFile(new ArrayList<RecordWithAction>(upsert));
upsertCommit(writeResult);
return writeResult;
}
Expand Down Expand Up @@ -299,8 +301,8 @@ private void upsertCommit(WriteResult writeResult) {
}
}

private void writeView(List<RecordWithAction> records) {
for (RecordWithAction record : records) {
private void writeView(List<TestRecordWithAction> records) {
for (TestRecordWithAction record : records) {
changeLog.add(record);
ChangeAction action = record.getAction();
if (action == ChangeAction.DELETE || action == ChangeAction.UPDATE_BEFORE) {
Expand Down Expand Up @@ -339,4 +341,26 @@ protected final boolean alreadyExists(Record record) {
return view.containsKey(record);
}
}

public static class TestRecordWithAction extends RecordWithAction {
public TestRecordWithAction(Record record, ChangeAction action) {
super(record, action);
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
Object value = get(pos);
if (value instanceof LocalDateTime && javaClass == Long.class) {
@SuppressWarnings("unchecked")
T result = (T) (Long) DateTimeUtil.microsFromTimestamp((LocalDateTime) value);
return result;
} else if (value instanceof OffsetDateTime && javaClass == Long.class) {
@SuppressWarnings("unchecked")
T result =
(T) (Long) DateTimeUtil.microsFromTimestamp(((OffsetDateTime) value).toLocalDateTime());
return result;
}
return super.get(pos, javaClass);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@
<value>org.apache.amoro.listener.AmoroRunListener</value>
</property>
</properties>
<argLine>-verbose:class</argLine>
</configuration>
</plugin>
<plugin>
Expand Down
35 changes: 35 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,27 @@
<rocksdb-dependency-scope>compile</rocksdb-dependency-scope>
<lucene-dependency-scope>compile</lucene-dependency-scope>
<aliyun-sdk-dependency-scope>provided</aliyun-sdk-dependency-scope>

<!-- for JDK-17 test-->
<extraJavaTestArgs>-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.security.tools.keytool=ALL-UNNAMED
--add-opens=java.base/sun.security.x509=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true</extraJavaTestArgs>
</properties>

<dependencies>
Expand Down Expand Up @@ -1076,6 +1097,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<argLine>${argLine} -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -verbose:class</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -1449,9 +1473,20 @@
<jdk>[11,)</jdk>
</activation>
<properties>
<java.source.version>11</java.source.version>
<java.target.version>11</java.target.version>
</properties>
</profile>
<profile>
<id>java17</id>
<activation>
<jdk>17</jdk>
</activation>
<properties>
<java.source.version>17</java.source.version>
<java.target.version>17</java.target.version>
</properties>
</profile>
<profile>
<id>spark-3.3</id>
<properties>
Expand Down
Loading