From b1ebb3a84fc7414b2affa9711b0e8a250953d42a Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 3 Jan 2024 12:22:00 -0800 Subject: [PATCH] use as of system time when listing tables to improve performance --- ...isConnectorTableServiceFunctionalTest.java | 82 +++++++++++++++++++ .../polaris/store/PolarisStoreConnector.java | 7 +- .../polaris/store/PolarisStoreService.java | 3 +- .../store/repos/PolarisTableRepository.java | 14 +++- .../PolarisConnectorTableServiceTest.java | 15 +++- 5 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java new file mode 100644 index 000000000..a63d6e6b1 --- /dev/null +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java @@ -0,0 +1,82 @@ +package com.netflix.metacat.connector.polaris; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.netflix.metacat.common.QualifiedName; +import com.netflix.metacat.common.dto.Pageable; +import com.netflix.metacat.common.dto.Sort; +import com.netflix.metacat.common.dto.SortOrder; +import com.netflix.metacat.common.server.connectors.model.TableInfo; +import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Test PolarisConnectorTableService in functional test. + * Some of the tests cannot be run in unit test as it uses h2 database, which does not support all + * functionalities in crdb so include those tests here. + */ +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = {PolarisPersistenceConfig.class}) +@ActiveProfiles(profiles = {"polaris_functional_test"}) +@AutoConfigureDataJpa +public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest { + /** + * Test table list. + */ + @Test + public void testList() { + final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1"); + final TableInfo tableInfo1 = TableInfo.builder() + .name(name1) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo1); + final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2"); + final TableInfo tableInfo2 = TableInfo.builder() + .name(name2) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo2); + + + final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); + + try { + // pause execution for 10000 milliseconds (10 seconds) + Thread.sleep(10000); + } catch (InterruptedException e) { + System.out.println("Sleep was interrupted"); + } + + List tables = this.getPolarisTableService().list( + this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), + new Pageable(2, 0)); + Assert.assertEquals(tables.size(), 2); + Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()), + ImmutableSet.of(name1, name2)); + + // Create a 3rd table, but this time does not sleep so this table should not be included + final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3"); + final TableInfo tableInfo3 = TableInfo.builder() + .name(name3) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo3); + + tables = this.getPolarisTableService().list( + this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), + new Pageable(2, 0)); + Assert.assertEquals(tables.size(), 2); + Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()), + ImmutableSet.of(name1, name2)); + } +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java index 5c3b203e9..c85ed2aee 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java @@ -13,6 +13,7 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -155,15 +156,17 @@ public Optional getTable(final String dbName, final String t */ @Override @Transactional(propagation = Propagation.SUPPORTS) - public List getTableEntities(final String databaseName, final String tableNamePrefix) { + public List getTableEntities(final String databaseName, + final String tableNamePrefix) { final int pageFetchSize = 1000; final List retval = new ArrayList<>(); final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix; Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending()); Slice tbls; + final Timestamp ts = tblRepo.findFollowerReadTimestamp(); boolean hasNext; do { - tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page); + tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, ts.toString(), page); retval.addAll(tbls.toList()); hasNext = tbls.hasNext(); if (hasNext) { diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java index b1b32013b..ecec08d9a 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java @@ -77,7 +77,8 @@ public interface PolarisStoreService { * @param tableNamePrefix table name prefix. can be empty. * @return table entities in the database. */ - List getTableEntities(final String databaseName, final String tableNamePrefix); + List getTableEntities(final String databaseName, + final String tableNamePrefix); /** * Updates existing or creates new table entry. diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java index a707caad8..fcd76327a 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; +import java.sql.Timestamp; import java.time.Instant; import java.util.Optional; @@ -71,13 +72,17 @@ boolean existsByDbNameAndTblName( * Fetch table entities in database. * @param dbName database name * @param tableNamePrefix table name prefix. can be empty. + * @param snapshotTime snapshotTime to use in query * @param page pageable. * @return table entities that belong to the database. */ - @Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%") + + @Query(nativeQuery = true, value = "SELECT t.* FROM tbls t AS OF SYSTEM TIME :snapshotTime " + + "WHERE t.dbName = :dbName AND t.tblName LIKE :tableNamePrefix%") Slice findAllTablesByDbNameAndTablePrefix( @Param("dbName") final String dbName, @Param("tableNamePrefix") final String tableNamePrefix, + @Param("snapshotTime") final String snapshotTime, Pageable page); /** @@ -103,4 +108,11 @@ int updateMetadataLocation( @Param("newLocation") final String newLocation, @Param("lastModifiedBy") final String lastModifiedBy, @Param("lastModifiedDate") final Instant lastModifiedDate); + + /** + * Return the follower reader timestamp from crdb. + * @return follower reader timestamp + */ + @Query(value = "SELECT follower_read_timestamp()", nativeQuery = true) + Timestamp findFollowerReadTimestamp(); } diff --git a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java index 398a1509d..0c8927352 100644 --- a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java +++ b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java @@ -28,6 +28,7 @@ import com.netflix.metacat.connector.polaris.mappers.PolarisTableMapper; import com.netflix.metacat.connector.polaris.store.PolarisStoreService; import com.netflix.spectator.api.NoopRegistry; +import lombok.Getter; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -58,10 +59,11 @@ @ActiveProfiles(profiles = {"polarisconnectortest"}) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @AutoConfigureDataJpa +@Getter public class PolarisConnectorTableServiceTest { - private static final String CATALOG_NAME = "catalog_name"; - private static final String DB_NAME = "db_name"; - private static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME); + public static final String CATALOG_NAME = "catalog_name"; + public static final String DB_NAME = "db_name"; + public static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME); @Autowired private PolarisStoreService polarisStoreService; @@ -100,6 +102,13 @@ public void init() { new IcebergTableOpsProxy()), new PolarisTableMapper(CATALOG_NAME), connectorContext); + + // For crdb, we need to insert the db record otherwise it will throw + // org.postgresql.util.PSQLException: ERROR: + // insert on table "tbls" violates foreign key constraint "fk_db_name_ref_dbs" + // for h2db, this is not a hard requirement + final String location = "file://temp"; + getPolarisStoreService().createDatabase(DB_NAME, location, "metacat_user"); } /**