Skip to content

Commit

Permalink
repo-sync-2024-09-14T14:22:57+0800
Browse files Browse the repository at this point in the history
  • Loading branch information
oeqqwq committed Sep 14, 2024
1 parent c518a71 commit 04c1970
Show file tree
Hide file tree
Showing 46 changed files with 1,551 additions and 445 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

common --experimental_repo_remote_exec
common --experimental_remote_download_regex='.*\/dataproxy_sdk$|.*\/arrow$'
common --modify_execution_info=CppLink=+no-remote-cache


build --incompatible_new_actions_api=false
Expand Down
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ IncludeCategories:
Priority: 2
- Regex: '.*\.pb\.h"$'
Priority: 5
- Regex: '^"secretflow_serving.*'
- Regex: '^"dataproxy_sdk.*'
Priority: 4
- Regex: '^".*'
Priority: 3
6 changes: 3 additions & 3 deletions build/Dockerfiles/dataproxy.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright 2024 Ant Group Co., Ltd.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
6 changes: 3 additions & 3 deletions config/application.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright 2024 Ant Group Co., Ltd.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2024 Ant Group Co., Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2024 Ant Group Co., Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2024 Ant Group Co., Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2024 Ant Group Co., Ltd.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class OdpsDataWriter implements DataWriter {

private final boolean overwrite = true;

private boolean isTemporarilyCreatedTable = false;

private TableTunnel.UploadSession uploadSession = null;
private RecordWriter recordWriter = null;

Expand Down Expand Up @@ -147,9 +149,9 @@ private void initOdps() throws TunnelException, IOException {
Odps odps = initOdpsClient(this.connConfig);
// Pre-processing
preProcessing(odps, connConfig.getProjectName(), tableInfo.tableName());
// init download session
// init upload session
TableTunnel tunnel = new TableTunnel(odps);
if (tableInfo.partitionSpec() != null && !tableInfo.partitionSpec().isEmpty()) {
if (tableInfo.partitionSpec() != null && !tableInfo.partitionSpec().isEmpty() && !isTemporarilyCreatedTable) {
PartitionSpec partitionSpec = new PartitionSpec(tableInfo.partitionSpec());
uploadSession = tunnel.createUploadSession(connConfig.getProjectName(), tableInfo.tableName(), partitionSpec, overwrite);
} else {
Expand Down Expand Up @@ -244,6 +246,7 @@ private void preProcessing(Odps odps, String projectName, String tableName) {
if (!odpsTable) {
throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_TABLE_FAILED);
}
isTemporarilyCreatedTable = true;
}
log.info("odps table is exists or create table successful, project: {}, table name: {}", projectName, tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
Expand All @@ -44,12 +43,7 @@
import org.secretflow.dataproxy.manager.SplitReader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -155,147 +149,16 @@ public ArrowReader startRead() {
return this;
}

private String buildSql(String tableName, List<String> fields, String partition) {
private String buildSql(String tableName, List<String> fields, String whereClause) {

if (!columnOrValuePattern.matcher(tableName).matches()) {
throw DataproxyException.of(DataproxyErrorCode.PARAMS_UNRELIABLE, "Invalid tableName:" + tableName);
}

String transformedPartition = buildWhereClause(partition);
return "select " + String.join(",", fields) + " from " + tableName + (transformedPartition.isEmpty() ? "" : " where " + transformedPartition) + ";";
return "select " + String.join(",", fields) + " from " + tableName + (whereClause.isEmpty() ? "" : " where " + whereClause) + ";";
}

/**
* Deprecated method, scheduled for later removal.
* <p>
* Parses a partition string of the form {@code field=value;field=value}: the string is split on
* ';' into entries and each entry is split on '=' into a field/value pair (both splits use
* {@code StringUtils.split}). Values are grouped by field name and the grouped map is rendered
* by {@code buildEqualClause}.
*
* @param partition partition specification; when {@code null} no pairs are collected and an
* empty map is passed to {@code buildEqualClause}
* @return the string form of the clause produced by {@code buildEqualClause} for the collected
* field/value pairs
* @throws DataproxyException presumably (created via {@code DataproxyException.of}) with
* {@code INVALID_PARTITION_SPEC} when an entry does not split into exactly two non-empty parts
*/
@Deprecated
private String transformPartition(String partition) {

// field name -> every value supplied for that field, in order of appearance
Map<String, List<String>> fieldValuesMap = new HashMap<>();

if (partition != null) {
String[] split = StringUtils.split(partition, ';');
for (String s : split) {
String[] kv = StringUtils.split(s, '=');
// Each entry must be exactly "key=value" with both sides non-empty.
if (kv.length != 2 || kv[0].isEmpty() || kv[1].isEmpty()) {
throw DataproxyException.of(DataproxyErrorCode.INVALID_PARTITION_SPEC);
}
// Repeated fields accumulate their values in one list.
if (fieldValuesMap.containsKey(kv[0])) {
fieldValuesMap.get(kv[0]).add(kv[1]);
} else {
fieldValuesMap.put(kv[0], new ArrayList<>(List.of(kv[1])));
}
}
}

return buildEqualClause(fieldValuesMap).toString();
}

/**
* Builds the equality part of a where clause from grouped field values: a field with more than
* one value is converted to an "in" condition, a field with a single value is kept as an "="
* condition. <br>
* Conditions for different fields are joined with " and ". Every value is wrapped in single
* quotes as-is; no escaping is performed in this method.
*
* @param fieldValuesMap field name mapped to the list of values given for that field
* @return where clause text as a {@link StringBuilder}; empty when the map is empty
*/
private StringBuilder buildEqualClause(Map<String, List<String>> fieldValuesMap) {
StringBuilder sb = new StringBuilder();
if (!fieldValuesMap.isEmpty()) {

// Tracks whether an " and " separator is needed before the next condition.
boolean first = true;
for (Map.Entry<String, List<String>> entry : fieldValuesMap.entrySet()) {
if (!first) {
sb.append(" and ");
}
first = false;
sb.append(entry.getKey());
List<String> values = entry.getValue();
if (values.size() > 1) {
sb.append(" in (");
for (String value : values) {
sb.append("'").append(value).append("'").append(", ");
}
// Drop the trailing ", " left by the loop before closing the list.
sb.setLength(sb.length() - 2);
sb.append(")");
} else {
sb.append(" = ").append("'").append(values.get(0)).append("'");
}
}
}

return sb;
}

/**
 * Builds a SQL where clause from a semicolon-separated list of conditions.
 * <p>
 * Each condition has the form {@code column<op>value}, where {@code <op>} is one of
 * {@code >=, <=, <>, !=, =, >, <} or {@code LIKE}/{@code like} surrounded by single spaces.
 * Conditions using "=" are grouped per column and rendered by {@code buildEqualClause}
 * (multiple values for one column become an "in" condition). All other conditions are rendered
 * as {@code column op 'value'}. Non-equality conditions come first, followed by the equality
 * conditions, all joined with " and ".
 * <p>
 * TODO: for data sources accessed through JDBC, this logic could be extracted and shared.
 *
 * @param conditionString condition text, e.g. {@code "dt>=20240101;region=cn"}; may be
 *                        {@code null} or empty
 * @return the where clause without the leading "where" keyword, or an empty string when
 *         {@code conditionString} is {@code null} or empty
 * @throws DataproxyException with {@code INVALID_PARTITION_SPEC} when a condition does not match
 *                            the expected format, or when its column or value is rejected by
 *                            {@code columnOrValuePattern}
 */
private String buildWhereClause(String conditionString) {

    if (conditionString == null || conditionString.isEmpty()) {
        return "";
    }

    Pattern pattern = Pattern.compile("^(\\w+)(>=|<=|<>|!=|=|>|<| LIKE | like )(.*)$");

    StringBuilder whereClause = new StringBuilder();
    // column -> values of every "=" condition given for that column
    Map<String, List<String>> equalFieldValuesMap = new HashMap<>();

    for (String condition : conditionString.split(";")) {
        Matcher matcher = pattern.matcher(condition.trim());

        // A successful match always exposes the pattern's three groups, so no separate
        // group-count check is needed.
        if (!matcher.matches()) {
            throw new DataproxyException(DataproxyErrorCode.INVALID_PARTITION_SPEC, "Invalid condition format: " + condition);
        }

        String column = matcher.group(1).trim();
        String operator = matcher.group(2);
        String value = matcher.group(3).trim();

        if (!columnOrValuePattern.matcher(column).matches()) {
            throw new DataproxyException(DataproxyErrorCode.INVALID_PARTITION_SPEC, "Invalid condition format: " + column);
        }

        // Report the offending value (not the column) so the caller can see what was rejected.
        if (!columnOrValuePattern.matcher(value).matches()) {
            throw new DataproxyException(DataproxyErrorCode.INVALID_PARTITION_SPEC, "Invalid condition format: " + value);
        }

        // Defensive handling of user-supplied values: escape single quotes by doubling them.
        // Adjust if stricter handling is required.
        value = value.replace("'", "''");

        if ("=".equals(operator)) {
            equalFieldValuesMap.computeIfAbsent(column, key -> new ArrayList<>()).add(value);
        } else {
            if (!whereClause.isEmpty()) {
                whereClause.append(" and ");
            }
            whereClause.append(column).append(' ').append(operator).append(" '").append(value).append("'");
        }
    }

    StringBuilder equalFieldClause = buildEqualClause(equalFieldValuesMap);

    if (whereClause.isEmpty()) {
        return equalFieldClause.toString();
    }

    if (!equalFieldClause.isEmpty()) {
        whereClause.append(" and ").append(equalFieldClause);
    }
    return whereClause.toString();
}

private void toArrowVector(Record record, VectorSchemaRoot root, int rowIndex) throws IOException {
private void toArrowVector(Record record, VectorSchemaRoot root, int rowIndex) {
FieldVector vector;
String columnName;
for (Field field : schema.getFields()) {
Expand Down Expand Up @@ -327,7 +190,8 @@ private void setValue(ArrowType type, FieldVector vector, int rowIndex, Record r
}
case Utf8 -> {
if (vector instanceof VarCharVector varcharVector) {
varcharVector.setSafe(rowIndex, record.getString(columnName).getBytes(StandardCharsets.UTF_8));
// record#getBytes default is UTF-8
varcharVector.setSafe(rowIndex, record.getBytes(columnName));
} else {
log.warn("Unsupported type: {}", type);
}
Expand Down
6 changes: 3 additions & 3 deletions dataproxy-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright 2024 Ant Group Co., Ltd.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
6 changes: 3 additions & 3 deletions dataproxy_sdk/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def _kuscia():
http_archive,
name = "kuscia",
urls = [
"https://github.com/secretflow/kuscia/archive/refs/tags/v0.9.0b0.tar.gz",
"https://github.com/secretflow/kuscia/archive/refs/tags/v0.11.0b0.tar.gz",
],
strip_prefix = "kuscia-0.9.0b0",
sha256 = "851455f4a3ba70850c8a751a78ebfbbb9fd6d78ec902d0cbf32c2c565d1c8410",
strip_prefix = "kuscia-0.11.0b0",
sha256 = "c8de425a5f442ba3fa30a9b5943f9fd056efd9ab610ddc2168d5ffcf71224974",
)

def _bazel_rules_pkg():
Expand Down
Loading

0 comments on commit 04c1970

Please sign in to comment.