Skip to content

Commit

Permalink
use as of system time when listing tables to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Jan 4, 2024
1 parent 2c0f438 commit 1958117
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.TableInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;
import java.util.stream.Collectors;

/**
* Test PolarisConnectorTableService in functional test.
* Some of the tests cannot be run in unit test as it uses h2 database, which does not support all
* functionalities in crdb so include those tests here.
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test table list.
*/
@Test
public void testList() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo2);


final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

List<TableInfo> tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));

// Create a 3rd table, but this time does not sleep so this table should not be included
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo3);

tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -155,15 +156,17 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix) {
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<PolarisTableEntity> tbls;
final Timestamp ts = tblRepo.findFollowerReadTimestamp();
boolean hasNext;
do {
tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page);
tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, ts.toString(), page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public interface PolarisStoreService {
* @param tableNamePrefix table name prefix. can be empty.
* @return table entities in the database.
*/
List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix);
List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix);

/**
* Updates existing or creates new table entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.Optional;

Expand Down Expand Up @@ -71,13 +72,17 @@ boolean existsByDbNameAndTblName(
* Fetch table entities in database.
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param snapshotTime snapshotTime to use in query
* @param page pageable.
* @return table entities that belong to the database.
*/
@Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%")

@Query(nativeQuery = true, value = "SELECT t.* FROM tbls t AS OF SYSTEM TIME :snapshotTime "
+ "WHERE t.dbName = :dbName AND t.tblName LIKE :tableNamePrefix%")
Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
@Param("dbName") final String dbName,
@Param("tableNamePrefix") final String tableNamePrefix,
@Param("snapshotTime") final String snapshotTime,
Pageable page);

/**
Expand All @@ -103,4 +108,11 @@ int updateMetadataLocation(
@Param("newLocation") final String newLocation,
@Param("lastModifiedBy") final String lastModifiedBy,
@Param("lastModifiedDate") final Instant lastModifiedDate);

/**
* Return the follower reader timestamp from crdb.
* @return follower reader timestamp
*/
@Query(value = "SELECT follower_read_timestamp()", nativeQuery = true)
Timestamp findFollowerReadTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
Expand All @@ -28,6 +27,7 @@
import com.netflix.metacat.connector.polaris.mappers.PolarisTableMapper;
import com.netflix.metacat.connector.polaris.store.PolarisStoreService;
import com.netflix.spectator.api.NoopRegistry;
import lombok.Getter;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -47,7 +47,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
Expand All @@ -58,10 +57,11 @@
@ActiveProfiles(profiles = {"polarisconnectortest"})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@AutoConfigureDataJpa
@Getter
public class PolarisConnectorTableServiceTest {
private static final String CATALOG_NAME = "catalog_name";
private static final String DB_NAME = "db_name";
private static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME);
public static final String CATALOG_NAME = "catalog_name";
public static final String DB_NAME = "db_name";
public static final QualifiedName DB_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB_NAME);

@Autowired
private PolarisStoreService polarisStoreService;
Expand Down Expand Up @@ -100,6 +100,13 @@ public void init() {
new IcebergTableOpsProxy()),
new PolarisTableMapper(CATALOG_NAME),
connectorContext);

// For crdb, we need to insert the db record otherwise it will throw
// org.postgresql.util.PSQLException: ERROR:
// insert on table "tbls" violates foreign key constraint "fk_db_name_ref_dbs"
// for h2db, this is not a hard requirement
final String location = "file://temp";
getPolarisStoreService().createDatabase(DB_NAME, location, "metacat_user");
}

/**
Expand Down Expand Up @@ -130,31 +137,6 @@ public void testTableExists() {
Assert.assertTrue(exists);
}

/**
* Test table list.
*/
@Test
public void testList() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
polarisTableService.create(requestContext, tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
polarisTableService.create(requestContext, tableInfo2);
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");
final List<TableInfo> tables = polarisTableService.list(
requestContext, DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}

/**
* Test table creation then list tables.
*/
Expand Down

0 comments on commit 1958117

Please sign in to comment.