Skip to content

Commit

Permalink
use as of system time when listing tables to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Jan 4, 2024
1 parent 2c0f438 commit 4fdfd86
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.TableInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;
import java.util.stream.Collectors;

/**
* Test PolarisConnectorTableService in functional test.
* Some of the tests cannot be run in unit test as it uses h2 database, which does not support all
* functionalities in crdb so include those tests here.
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test table list.
*/
@Test
public void testList() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo2);


final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

List<TableInfo> tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));

// Create a 3rd table, but this time does not sleep so this table should not be included
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo3);

tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spring:
init:
schema-locations: classpath:schema.sql
mode: always
platform: h2db
platform: postgresql

logging:
level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public DataSourceProperties dataSourceProperties() {
*/
@Bean
public PolarisStoreService polarisStoreService(
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
return new PolarisStoreConnector(repo, tblRepo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,9 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<PolarisTableEntity> tbls;
boolean hasNext;
do {
tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);
return retval;
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
* Custom JPA repository implementation for storing PolarisTableEntity.
*/
public interface PolarisTableCustomRepository {
/**
* Fetch table entities for given database using AS OF SYSTEM TIME follower_read_timestamp().
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @return table entities in the database.
*/
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.netflix.metacat.connector.polaris.store.repos;

import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.EntityTransaction;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;

import java.util.ArrayList;
import java.util.List;

/**
* Implementation for Custom JPA repository implementation for storing PolarisTableEntity.
*/
@Repository
public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepository {
@Autowired
private EntityManagerFactory entityManagerFactory;

private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page, final EntityManager entityManager) {

final String sql = "SELECT t.* FROM TBLS t "
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix";

final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);
query.setParameter("dbName", dbName);
query.setParameter("tableNamePrefix", tableNamePrefix + "%");

query.setFirstResult(page.getPageNumber() * page.getPageSize());
query.setMaxResults(page.getPageSize());

final List<PolarisTableEntity> resultList = query.getResultList();

// Check if there is a next page
final boolean hasNext = resultList.size() > page.getPageSize();

// If there is a next page, remove the last item from the list
if (hasNext) {
resultList.remove(resultList.size() - 1);
}

return new SliceImpl<>(resultList, page, hasNext);
}


@SuppressWarnings("checkstyle:FinalParameters")
@Override
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());

final EntityManager entityManager = entityManagerFactory.createEntityManager();
final EntityTransaction transaction = entityManager.getTransaction();

try {
transaction.begin();

// Execute your SQL statement
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
.executeUpdate();

final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
boolean hasNext;

do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page, entityManager);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);

transaction.commit();

return retval;
} catch (Exception e) {
// If there's an exception, roll back the transaction
if (transaction.isActive()) {
transaction.rollback();
}
// Handle or rethrow the exception
throw e;
} finally {
entityManager.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
@Repository
public interface PolarisTableRepository extends JpaRepository<PolarisTableEntity, String>,
JpaSpecificationExecutor {
JpaSpecificationExecutor, PolarisTableCustomRepository {

/**
* Delete table entry by name.
Expand Down Expand Up @@ -67,19 +67,6 @@ boolean existsByDbNameAndTblName(
@Param("dbName") final String dbName,
@Param("tblName") final String tblName);

/**
* Fetch table entities in database.
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param page pageable.
* @return table entities that belong to the database.
*/
@Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%")
Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
@Param("dbName") final String dbName,
@Param("tableNamePrefix") final String tableNamePrefix,
Pageable page);

/**
* Do an atomic compare-and-swap on the metadata location of the table.
* @param dbName database name of the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
Expand All @@ -28,6 +27,7 @@
import com.netflix.metacat.connector.polaris.mappers.PolarisTableMapper;
import com.netflix.metacat.connector.polaris.store.PolarisStoreService;
import com.netflix.spectator.api.NoopRegistry;
import lombok.Getter;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -47,7 +47,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
Expand All @@ -58,10 +57,11 @@
@ActiveProfiles(profiles = {"polarisconnectortest"})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@AutoConfigureDataJpa
@Getter
public class PolarisConnectorTableServiceTest {
private static final String CATALOG_NAME = "catalog_name";
private static final String DB_NAME = "db_name";
private static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME);
public static final String CATALOG_NAME = "catalog_name";
public static final String DB_NAME = "db_name";
public static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME);

@Autowired
private PolarisStoreService polarisStoreService;
Expand All @@ -86,6 +86,8 @@ public class PolarisConnectorTableServiceTest {
*/
@BeforeEach
public void init() {
final String location = "file://temp";
polarisStoreService.createDatabase(DB_NAME, location, "metacat_user");
connectorContext = new ConnectorContext(CATALOG_NAME, CATALOG_NAME, "polaris",
new DefaultConfigImpl(new MetacatProperties()), new NoopRegistry(), null, Maps.newHashMap());
polarisDBService = new PolarisConnectorDatabaseService(polarisStoreService, connectorContext);
Expand Down Expand Up @@ -130,31 +132,6 @@ public void testTableExists() {
Assert.assertTrue(exists);
}

/**
* Test table list.
*/
@Test
public void testList() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
polarisTableService.create(requestContext, tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
polarisTableService.create(requestContext, tableInfo2);
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");
final List<TableInfo> tables = polarisTableService.list(
requestContext, DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}

/**
* Test table creation then list tables.
*/
Expand Down

0 comments on commit 4fdfd86

Please sign in to comment.