Skip to content

Commit

Permalink
[procedure] use MigrateIcebergTableProcedure for migration
Browse files Browse the repository at this point in the history
# Conflicts:
#	paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
  • Loading branch information
LsomeYeah committed Jan 20, 2025
1 parent 7c4a0c3 commit f73ecd1
Show file tree
Hide file tree
Showing 12 changed files with 575 additions and 264 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Migrate procedure to migrate iceberg table to paimon table. */
public class MigrateIcebergTableProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);

private static final String PAIMON_SUFFIX = "_paimon_";

@Override
public String identifier() {
return "migrate_iceberg_table";
}

public String[] call(
ProcedureContext procedureContext, String sourceTablePath, String icebergProperties)
throws Exception {

return call(procedureContext, sourceTablePath, icebergProperties, "");
}

public String[] call(
ProcedureContext procedureContext,
String sourceTablePath,
String icebergProperties,
String properties)
throws Exception {

return call(
procedureContext,
sourceTablePath,
icebergProperties,
properties,
Runtime.getRuntime().availableProcessors());
}

public String[] call(
ProcedureContext procedureContext,
String sourceTablePath,
String icebergProperties,
String properties,
Integer parallelism)
throws Exception {
String targetTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetTablePath);

Migrator migrator =
TableMigrationUtils.getIcebergImporter(
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties),
ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
LOG.info("create migrator success.");
migrator.executeMigrate();

migrator.renameTable(false);
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,6 @@ public String[] call(
String properties,
Integer parallelism)
throws Exception {

return call(procedureContext, connector, sourceTablePath, properties, parallelism, "");
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
String properties,
Integer parallelism,
String icebergProperties)
throws Exception {
String targetTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Expand All @@ -93,8 +81,7 @@ public String[] call(
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties),
ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
ParameterUtils.parseCommaSeparatedKeyValues(properties));
LOG.info("create migrator success.");
migrator.executeMigrate();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Migrate from iceberg table to paimon table. */
public class MigrateIcebergTableAction extends ActionBase {

private final String sourceTableFullName;
private final String tableProperties;
private final Integer parallelism;

private final String icebergProperties;

public MigrateIcebergTableAction(
String sourceTableFullName,
Map<String, String> catalogConfig,
String icebergProperties,
String tableProperties,
Integer parallelism) {
super(catalogConfig);
this.sourceTableFullName = sourceTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
this.icebergProperties = icebergProperties;
}

@Override
public void run() throws Exception {
MigrateIcebergTableProcedure migrateIcebergTableProcedure =
new MigrateIcebergTableProcedure();
migrateIcebergTableProcedure.withCatalog(catalog);
migrateIcebergTableProcedure.call(
new DefaultProcedureContext(env),
sourceTableFullName,
icebergProperties,
tableProperties,
parallelism);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Action Factory for {@link MigrateIcebergTableAction}. */
public class MigrateIcebergTableActionFactory implements ActionFactory {

public static final String IDENTIFIER = "migrate_iceberg_table";

private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

private static final String ICEBERG_OPTIONS = "iceberg_options";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {

String sourceTable = params.get(TABLE);
Map<String, String> catalogConfig = catalogConfigMap(params);
String tableConf = params.get(OPTIONS);
Integer parallelism =
params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));

String icebergOptions = params.get(ICEBERG_OPTIONS);

MigrateIcebergTableAction migrateIcebergTableAction =
new MigrateIcebergTableAction(
sourceTable, catalogConfig, icebergOptions, tableConf, parallelism);
return Optional.of(migrateIcebergTableAction);
}

@Override
public void printHelp() {
System.out.println(
"Action \"migrate_iceberg_table\" runs a migrating job from iceberg to paimon.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" migrate_iceberg_table"
+ "--table <database.table_name> "
+ "--iceberg_options <key>=<value>[,<key>=<value>,...]"
+ "[--catalog_conf <key>=<value] "
+ "[--options <key>=<value>,<key>=<value>,...]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,21 @@
public class MigrateTableAction extends ActionBase {

private final String connector;
private final String sourceTableFullName;
private final String hiveTableFullName;
private final String tableProperties;
private final Integer parallelism;

private final String icebergProperties;

public MigrateTableAction(
String connector,
String sourceTableFullName,
String hiveTableFullName,
Map<String, String> catalogConfig,
String tableProperties,
Integer parallelism) {
super(catalogConfig);
this.connector = connector;
this.sourceTableFullName = sourceTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
this.icebergProperties = null;
}

public MigrateTableAction(
String connector,
String sourceTableFullName,
Map<String, String> catalogConfig,
String tableProperties,
Integer parallelism,
String icebergProperties) {
super(catalogConfig);
this.connector = connector;
this.sourceTableFullName = sourceTableFullName;
this.hiveTableFullName = hiveTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
this.icebergProperties = icebergProperties;
}

@Override
Expand All @@ -70,9 +52,8 @@ public void run() throws Exception {
migrateTableProcedure.call(
new DefaultProcedureContext(env),
connector,
sourceTableFullName,
hiveTableFullName,
tableProperties,
parallelism,
icebergProperties);
parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class MigrateTableActionFactory implements ActionFactory {
private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

private static final String ICEBERG_OPTIONS = "iceberg_options";

@Override
public String identifier() {
return IDENTIFIER;
Expand All @@ -43,19 +41,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String sourceHiveTable = params.get(TABLE);
Map<String, String> catalogConfig = catalogConfigMap(params);
String tableConf = params.get(OPTIONS);
Integer parallelism =
params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));

String icebergOptions = params.get(ICEBERG_OPTIONS);
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));

MigrateTableAction migrateTableAction =
new MigrateTableAction(
connector,
sourceHiveTable,
catalogConfig,
tableConf,
parallelism,
icebergOptions);
connector, sourceHiveTable, catalogConfig, tableConf, parallelism);
return Optional.of(migrateTableAction);
}

Expand Down
Loading

0 comments on commit f73ecd1

Please sign in to comment.