From 4fdfd86c6ee5a34ca83c92e54fa0f3ccce42122b Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 3 Jan 2024 12:22:00 -0800 Subject: [PATCH] use as of system time when listing tables to improve performance --- ...isConnectorTableServiceFunctionalTest.java | 82 ++++++++++++++ .../PolarisStoreConnectorFunctionalTest.java | 1 - .../application-polaris_functional_test.yml | 2 +- .../configs/PolarisPersistenceConfig.java | 2 +- .../polaris/store/PolarisStoreConnector.java | 19 +--- .../repos/PolarisTableCustomRepository.java | 18 ++++ .../PolarisTableCustomRepositoryImpl.java | 100 ++++++++++++++++++ .../store/repos/PolarisTableRepository.java | 15 +-- .../PolarisConnectorTableServiceTest.java | 37 ++----- 9 files changed, 213 insertions(+), 63 deletions(-) create mode 100644 metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java create mode 100644 metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepository.java create mode 100644 metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepositoryImpl.java diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java new file mode 100644 index 000000000..a63d6e6b1 --- /dev/null +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java @@ -0,0 +1,82 @@ +package com.netflix.metacat.connector.polaris; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.netflix.metacat.common.QualifiedName; +import com.netflix.metacat.common.dto.Pageable; +import 
com.netflix.metacat.common.dto.Sort; +import com.netflix.metacat.common.dto.SortOrder; +import com.netflix.metacat.common.server.connectors.model.TableInfo; +import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Test PolarisConnectorTableService in functional test. + * Some of the tests cannot be run in unit test as it uses h2 database, which does not support all + * functionalities in crdb so include those tests here. + */ +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = {PolarisPersistenceConfig.class}) +@ActiveProfiles(profiles = {"polaris_functional_test"}) +@AutoConfigureDataJpa +public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest { + /** + * Test table list. 
+ */ + @Test + public void testList() { + final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1"); + final TableInfo tableInfo1 = TableInfo.builder() + .name(name1) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo1); + final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2"); + final TableInfo tableInfo2 = TableInfo.builder() + .name(name2) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo2); + + final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); + + try { + // List reads at follower_read_timestamp(); wait until both writes are old enough to be visible + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Assert.fail("Sleep was interrupted"); + } + + List<TableInfo> tables = this.getPolarisTableService().list( + this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), + new Pageable(2, 0)); + Assert.assertEquals(2, tables.size()); + Assert.assertEquals(ImmutableSet.of(name1, name2), + tables.stream().map(TableInfo::getName).collect(Collectors.toSet())); + + // Create a 3rd table and list right away; it is too recent for the follower read to see it + final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3"); + final TableInfo tableInfo3 = TableInfo.builder() + .name(name3) + .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) + .build(); + this.getPolarisTableService().create(this.getRequestContext(), tableInfo3); + // Ask for 3 rows: with a limit of 2 the result could never contain table3 and the check proves nothing + tables = this.getPolarisTableService().list( + this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), + new Pageable(3, 0)); + Assert.assertEquals(2, tables.size()); + 
Assert.assertEquals(ImmutableSet.of(name1, name2), + tables.stream().map(TableInfo::getName).collect(Collectors.toSet())); + } +} diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java index 7b907d993..ce53d93b7 100644 --- a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java @@ -17,5 +17,4 @@ @ActiveProfiles(profiles = {"polaris_functional_test"}) @AutoConfigureDataJpa public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest { - } diff --git a/metacat-connector-polaris/src/functionalTest/resources/application-polaris_functional_test.yml b/metacat-connector-polaris/src/functionalTest/resources/application-polaris_functional_test.yml index 0201ef0ef..5c6a59feb 100644 --- a/metacat-connector-polaris/src/functionalTest/resources/application-polaris_functional_test.yml +++ b/metacat-connector-polaris/src/functionalTest/resources/application-polaris_functional_test.yml @@ -23,7 +23,7 @@ spring: init: schema-locations: classpath:schema.sql mode: always - platform: h2db + platform: postgresql logging: level: diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/configs/PolarisPersistenceConfig.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/configs/PolarisPersistenceConfig.java index 31d7f96f7..7acf793cf 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/configs/PolarisPersistenceConfig.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/configs/PolarisPersistenceConfig.java 
@@ -71,7 +71,7 @@ public DataSourceProperties dataSourceProperties() { */ @Bean public PolarisStoreService polarisStoreService( - final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) { + final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) { return new PolarisStoreConnector(repo, tblRepo); } } diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java index 5c3b203e9..1c276b553 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java @@ -155,22 +155,9 @@ public Optional getTable(final String dbName, final String t */ @Override @Transactional(propagation = Propagation.SUPPORTS) - public List getTableEntities(final String databaseName, final String tableNamePrefix) { - final int pageFetchSize = 1000; - final List retval = new ArrayList<>(); - final String tblPrefix = tableNamePrefix == null ? 
"" : tableNamePrefix; - Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending()); - Slice tbls; - boolean hasNext; - do { - tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page); - retval.addAll(tbls.toList()); - hasNext = tbls.hasNext(); - if (hasNext) { - page = tbls.nextPageable(); - } - } while (hasNext); - return retval; + public List getTableEntities(final String databaseName, + final String tableNamePrefix) { + return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix); } /** diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepository.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepository.java new file mode 100644 index 000000000..aca16f8da --- /dev/null +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepository.java @@ -0,0 +1,18 @@ +package com.netflix.metacat.connector.polaris.store.repos; + +import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; +import java.util.List; + +/** + * Custom JPA repository implementation for storing PolarisTableEntity. + */ +public interface PolarisTableCustomRepository { + /** + * Fetch table entities for given database using AS OF SYSTEM TIME follower_read_timestamp(). + * @param dbName database name + * @param tableNamePrefix table name prefix. can be empty. + * @return table entities in the database. 
+ */ + List findAllTablesByDbNameAndTablePrefix( + String dbName, String tableNamePrefix); +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepositoryImpl.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepositoryImpl.java new file mode 100644 index 000000000..cdb339ac9 --- /dev/null +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableCustomRepositoryImpl.java @@ -0,0 +1,100 @@ +package com.netflix.metacat.connector.polaris.store.repos; + +import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import javax.persistence.EntityTransaction; + +import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Slice; +import org.springframework.data.domain.SliceImpl; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Repository; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; + +import java.util.ArrayList; +import java.util.List; + +/** + * Implementation for Custom JPA repository implementation for storing PolarisTableEntity. 
+ */ +@Repository +public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepository { + @Autowired + private EntityManagerFactory entityManagerFactory; + + /** Loads one page of tables in dbName whose name starts with tableNamePrefix, ordered by table name. */ + private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage( + final String dbName, final String tableNamePrefix, final Pageable page, final EntityManager entityManager) { + // ORDER BY keeps OFFSET-based paging stable; without it rows may repeat or be skipped across pages + final String sql = "SELECT t.* FROM TBLS t " + + "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix ORDER BY t.tbl_name"; + + final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class); + query.setParameter("dbName", dbName); + query.setParameter("tableNamePrefix", tableNamePrefix + "%"); + // Fetch pageSize + 1 rows: the extra row only signals that a next page exists + query.setFirstResult(page.getPageNumber() * page.getPageSize()); + query.setMaxResults(page.getPageSize() + 1); + + final List<PolarisTableEntity> resultList = query.getResultList(); + + // More rows than a page holds means the look-ahead row came back + final boolean hasNext = resultList.size() > page.getPageSize(); + + // Drop the look-ahead row so the slice never holds more than one page + if (hasNext) { + resultList.remove(resultList.size() - 1); + } + + return new SliceImpl<>(resultList, page, hasNext); + } + + @SuppressWarnings("checkstyle:FinalParameters") + @Override + public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix( + final String dbName, final String tableNamePrefix) { + final int pageFetchSize = 1000; + Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending()); + + final EntityManager entityManager = entityManagerFactory.createEntityManager(); + final EntityTransaction transaction = entityManager.getTransaction(); + + try { + transaction.begin(); + + // Must run before any query: pins the transaction's reads to follower_read_timestamp() + entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") + .executeUpdate(); + + final List<PolarisTableEntity> retval = new ArrayList<>(); + final String tblPrefix = tableNamePrefix == null ? 
"" : tableNamePrefix; + Slice<PolarisTableEntity> tbls; + boolean hasNext; + + do { + tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page, entityManager); + retval.addAll(tbls.toList()); + hasNext = tbls.hasNext(); + if (hasNext) { + page = tbls.nextPageable(); + } + } while (hasNext); + + transaction.commit(); + + return retval; + } catch (Exception e) { + // If there's an exception, roll back the transaction + if (transaction.isActive()) { + transaction.rollback(); + } + // Propagate the original failure to the caller + throw e; + } finally { + entityManager.close(); + } + } +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java index a707caad8..48c9a202d 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisTableRepository.java @@ -19,7 +19,7 @@ */ @Repository public interface PolarisTableRepository extends JpaRepository, - JpaSpecificationExecutor { + JpaSpecificationExecutor, PolarisTableCustomRepository { /** * Delete table entry by name. @@ -67,19 +67,6 @@ boolean existsByDbNameAndTblName( @Param("dbName") final String dbName, @Param("tblName") final String tblName); - /** - * Fetch table entities in database. - * @param dbName database name - * @param tableNamePrefix table name prefix. can be empty. - * @param page pageable. - * @return table entities that belong to the database. 
- */ - @Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%") - Slice findAllTablesByDbNameAndTablePrefix( - @Param("dbName") final String dbName, - @Param("tableNamePrefix") final String tableNamePrefix, - Pageable page); - /** * Do an atomic compare-and-swap on the metadata location of the table. * @param dbName database name of the table diff --git a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java index 398a1509d..1a470dcba 100644 --- a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java +++ b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceTest.java @@ -3,7 +3,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.netflix.metacat.common.QualifiedName; import com.netflix.metacat.common.dto.Pageable; @@ -28,6 +27,7 @@ import com.netflix.metacat.connector.polaris.mappers.PolarisTableMapper; import com.netflix.metacat.connector.polaris.store.PolarisStoreService; import com.netflix.spectator.api.NoopRegistry; +import lombok.Getter; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -47,7 +47,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** @@ -58,10 +57,11 @@ @ActiveProfiles(profiles = {"polarisconnectortest"}) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @AutoConfigureDataJpa +@Getter public class PolarisConnectorTableServiceTest { - private static final String CATALOG_NAME = "catalog_name"; - private static final 
String DB_NAME = "db_name"; - private static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME); + public static final String CATALOG_NAME = "catalog_name"; + public static final String DB_NAME = "db_name"; + public static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME); @Autowired private PolarisStoreService polarisStoreService; @@ -86,6 +86,8 @@ public class PolarisConnectorTableServiceTest { */ @BeforeEach public void init() { + final String location = "file://temp"; + polarisStoreService.createDatabase(DB_NAME, location, "metacat_user"); connectorContext = new ConnectorContext(CATALOG_NAME, CATALOG_NAME, "polaris", new DefaultConfigImpl(new MetacatProperties()), new NoopRegistry(), null, Maps.newHashMap()); polarisDBService = new PolarisConnectorDatabaseService(polarisStoreService, connectorContext); @@ -130,31 +132,6 @@ public void testTableExists() { Assert.assertTrue(exists); } - /** - * Test table list. 
- */ - @Test - public void testList() { - final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1"); - final TableInfo tableInfo1 = TableInfo.builder() - .name(name1) - .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1")) - .build(); - polarisTableService.create(requestContext, tableInfo1); - final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2"); - final TableInfo tableInfo2 = TableInfo.builder() - .name(name2) - .metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) - .build(); - polarisTableService.create(requestContext, tableInfo2); - final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); - final List tables = polarisTableService.list( - requestContext, DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), new Pageable(2, 0)); - Assert.assertEquals(tables.size(), 2); - Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()), - ImmutableSet.of(name1, name2)); - } - /** * Test table creation then list tables. */