Skip to content

Commit

Permalink
list table names with snapshot (#566)
Browse files Browse the repository at this point in the history
* list table names with snapshot

* address comments

* address comments

---------

Co-authored-by: Yingjian Wu <[email protected]>
  • Loading branch information
stevie9868 and Yingjian Wu authored Jan 9, 2024
1 parent b4f73aa commit 3f55f9f
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,20 @@ public interface Config {
*/
boolean listDatabaseNameByDefaultOnGetCatalog();

/**
* Get the page size when listing table entities.
*
* @return True if it is.
*/
int getListTableEntitiesPageSize();

/**
* Get the page size when listing table names.
*
* @return True if it is.
*/
int getListTableNamesPageSize();

/**
* Metadata query timeout in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,16 @@ public boolean listDatabaseNameByDefaultOnGetCatalog() {
return this.metacatProperties.getService().isListDatabaseNameByDefaultOnGetCatalog();
}

@Override
public int getListTableEntitiesPageSize() {
return this.metacatProperties.getService().getListTableEntitiesPageSize();
}

@Override
public int getListTableNamesPageSize() {
return this.metacatProperties.getService().getListTableNamesPageSize();
}

@Override
public int getMetadataQueryTimeout() {
return this.metacatProperties.getUsermetadata().getQueryTimeoutInSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ServiceProperties {
private Tables tables = new Tables();
private boolean listTableNamesByDefaultOnGetDatabase = true;
private boolean listDatabaseNameByDefaultOnGetCatalog = true;
private int listTableEntitiesPageSize = 1000;
private int listTableNamesPageSize = 10000;

/**
* Max related properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
Expand All @@ -17,6 +18,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -31,6 +33,88 @@
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test get table names.
*/
@Test
public void testGetTableNames() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo2);
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc3"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo3);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> tables = getPolarisTableService()
.getTableNames(getRequestContext(), DB_QUALIFIED_NAME, "", -1);
Assert.assertEquals(tables.size(), 3);
Assert.assertEquals(tables, ImmutableList.of(name1, name2, name3));
}

/**
* Test empty list tables.
*/
@Test
public void testListTablesEmpty() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList());
}

/**
* Test table creation then list tables.
*/
@Test
public void testTableCreationAndList() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo = TableInfo.builder()
.name(qualifiedName)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList(qualifiedName));
}

/**
* Test table list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,37 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

/**
* Test to verify that table names fetch works.
*/
@Test
public void testPaginatedFetch() {
final String dbName = generateDatabaseName();
createDB(dbName);
List<String> tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(0, tblNames.size());

final String tblNameA = "A_" + generateTableName();
final String tblNameB = "B_" + generateTableName();
final String tblNameC = "C_" + generateTableName();
createTable(dbName, tblNameA);
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(3, tblNames.size());
Assert.assertEquals(tblNameA, tblNames.get(0));
Assert.assertEquals(tblNameB, tblNames.get(1));
Assert.assertEquals(tblNameC, tblNames.get(2));
}

/**
* Test getTableEntities.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ public List<QualifiedName> listNames(
try {
final List<QualifiedName> qualifiedNames = Lists.newArrayList();
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(), tableFilter)) {
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(),
tableFilter,
connectorContext.getConfig().getListTableNamesPageSize())
) {
final QualifiedName qualifiedName =
QualifiedName.ofTable(name.getCatalogName(), name.getDatabaseName(), tableName);
if (prefix != null && !qualifiedName.toString().startsWith(prefix.toString())) {
Expand Down Expand Up @@ -338,7 +341,9 @@ public List<TableInfo> list(
try {
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
final List<PolarisTableEntity> tbls =
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter, 1000);
polarisStoreService.getTableEntities(name.getDatabaseName(),
tableFilter,
connectorContext.getConfig().getListTableEntitiesPageSize());
if (sort != null) {
ConnectorUtils.sort(tbls, sort, Comparator.comparing(t -> t.getTblName()));
}
Expand Down Expand Up @@ -388,7 +393,10 @@ public List<QualifiedName> getTableNames(
final List<QualifiedName> result = Lists.newArrayList();
for (int i = 0; i < databaseNames.size() && limitSize > 0; i++) {
final String databaseName = databaseNames.get(i);
final List<String> tableNames = polarisStoreService.getTables(name.getDatabaseName(), "");
final List<String> tableNames = polarisStoreService.getTables(
name.getDatabaseName(),
"",
connectorContext.getConfig().getListTableNamesPageSize());
result.addAll(tableNames.stream()
.map(n -> QualifiedName.ofTable(name.getCatalogName(), databaseName, n))
.limit(limitSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -158,7 +157,8 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix,
final int pageFetchSize) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize);
return (List<PolarisTableEntity>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, true);
}

/**
Expand Down Expand Up @@ -205,22 +205,9 @@ boolean tableExistsById(final String tblId) {
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<String> getTables(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<String> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<String> tblNames = null;
boolean hasNext = true;
do {
tblNames = tblRepo.findAllByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tblNames.toList());
hasNext = tblNames.hasNext();
if (hasNext) {
page = tblNames.nextPageable();
}
} while (hasNext);
return retval;
public List<String> getTables(final String databaseName, final String tableNamePrefix, final int pageFetchSize) {
return (List<String>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ public interface PolarisStoreService {
* Gets tables in the database and tableName prefix.
* @param databaseName database name
* @param tableNamePrefix table name prefix
* @param pageFetchSize size of each page
* @return list of table names in the database with the table name prefix.
*/
List<String> getTables(String databaseName, String tableNamePrefix);
List<String> getTables(String databaseName, String tableNamePrefix, int pageFetchSize);

/**
* Do an atomic compare-and-swap to update the table's metadata location.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
Expand All @@ -12,8 +11,9 @@ public interface PolarisTableCustomRepository {
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageSize target size for each page
* @param selectAllColumns if true return the PolarisEntity else return name of the entity
* @return table entities in the database.
*/
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize);
List<?> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize, boolean selectAllColumns);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepos
@PersistenceContext
private EntityManager entityManager;

private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page) {
private <T> Slice<T> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page, final boolean selectAllColumns) {

// Generate ORDER BY clause
String orderBy = "";
Expand All @@ -37,14 +37,21 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP
orderBy = " ORDER BY " + orderBy;
}

final String sql = "SELECT t.* FROM TBLS t "
final String selectClause = selectAllColumns ? "t.*" : "t.tbl_name";
final String sql = "SELECT " + selectClause + " FROM TBLS t "
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix" + orderBy;
final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);

Query query;
if (selectAllColumns) {
query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);
} else {
query = entityManager.createNativeQuery(sql);
}
query.setParameter("dbName", dbName);
query.setParameter("tableNamePrefix", tableNamePrefix + "%");
query.setFirstResult(page.getPageNumber() * page.getPageSize());
query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page
final List<PolarisTableEntity> resultList = query.getResultList();
final List<T> resultList = query.getResultList();
// Check if there is a next page
final boolean hasNext = resultList.size() > page.getPageSize();
// If there is a next page, remove the last item from the list
Expand All @@ -56,18 +63,18 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP

@Override
@Transactional
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize) {
public List<?> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize, final boolean selectAllColumns) {
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
.executeUpdate();
final List<PolarisTableEntity> retval = new ArrayList<>();
final List<Object> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
Slice<?> tbls;
boolean hasNext;
do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page);
retval.addAll(tbls.toList());
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page, selectAllColumns);
retval.addAll(tbls.getContent());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
Expand Down
Loading

0 comments on commit 3f55f9f

Please sign in to comment.