From 4d3cc0a9fb27b67a1f91a1791a37229af1af2499 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Mon, 23 Dec 2024 18:47:02 +0800 Subject: [PATCH] [INLONG-11618][Manager] Support COS stream source (#11619) --- .../manager/common/consts/DataNodeType.java | 1 + .../manager/common/consts/SourceType.java | 2 + .../dao/mapper/DataNodeEntityMapper.java | 3 + .../mappers/DataNodeEntityMapper.xml | 10 ++ .../manager/pojo/node/cos/COSDataNodeDTO.java | 78 +++++++++ .../pojo/node/cos/COSDataNodeInfo.java | 65 +++++++ .../pojo/node/cos/COSDataNodeRequest.java | 52 ++++++ .../source/cos/COSDataAddTaskRequest.java | 42 +++++ .../manager/pojo/source/cos/COSSource.java | 90 ++++++++++ .../manager/pojo/source/cos/COSSourceDTO.java | 116 +++++++++++++ .../pojo/source/cos/COSSourceRequest.java | 71 ++++++++ .../manager/service/node/DataNodeService.java | 2 + .../service/node/DataNodeServiceImpl.java | 14 ++ .../service/node/cos/COSDataNodeOperator.java | 84 +++++++++ .../source/AbstractSourceOperator.java | 11 ++ .../service/source/cos/COSSourceOperator.java | 160 ++++++++++++++++++ 16 files changed, 801 insertions(+) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 47b139f1592..35164d31d6f 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -29,6 +29,7 @@ public class DataNodeType { public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; public static final String MYSQL = "MYSQL"; + public static final String COS = "COS"; public static final String STARROCKS = "STARROCKS"; public static final String REDIS = "REDIS"; public static final String KUDU = "KUDU"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java index cb6f1a7f6b1..59deca4749a 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java @@ -31,6 +31,7 @@ public class SourceType extends StreamType { public static final String TUBEMQ = "TUBEMQ"; public static final String FILE = "FILE"; + public static final String COS = "COS"; public static final String MYSQL_SQL = "MYSQL_SQL"; public static final String MYSQL_BINLOG = "MYSQL_BINLOG"; public static final String MONGODB = "MONGODB"; @@ -47,6 +48,7 @@ public class SourceType extends StreamType { put(KAFKA, TaskTypeEnum.KAFKA); put(FILE, TaskTypeEnum.FILE); + put(COS, TaskTypeEnum.COS); put(MYSQL_SQL, TaskTypeEnum.SQL); put(MYSQL_BINLOG, TaskTypeEnum.BINLOG); put(POSTGRESQL, TaskTypeEnum.POSTGRES); diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java index c469dccc138..a9c45e3f6f5 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java @@ -58,4 +58,7 @@ int copy(@Param("name") String name, @Param("type") String type, @Param("sourceT @MultiTenantQuery(with = false) List selectByIdSelective(DataNodeEntity record); + @MultiTenantQuery(with = false) + DataNodeEntity selectByUniqueKeyWithoutTenant(@Param("name") String name, @Param("type") String type); + } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml index 7e62cb562ab..7ec124aa74c 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml @@ -263,4 +263,14 @@ and tenant = #{sourceTenant, jdbcType=VARCHAR} + diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java new file mode 100644 index 00000000000..f3cb2973cce --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.cos; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +/** + * COS data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("COS data node info") +public class COSDataNodeDTO { + + @ApiModelProperty(value = "COS bucket name") + private String bucketName; + + @ApiModelProperty(value = "COS secret id") + private String credentialsId; + + @ApiModelProperty(value = "COS secret key") + private String credentialsKey; + + @ApiModelProperty(value = "COS region") + private String region; + + /** + * Get the dto instance from the request + */ + public static COSDataNodeDTO getFromRequest(COSDataNodeRequest request, String extParams) { + COSDataNodeDTO dto = StringUtils.isNotBlank(extParams) + ? COSDataNodeDTO.getFromJson(extParams) + : new COSDataNodeDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static COSDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, COSDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + String.format("Failed to parse extParams for COS node: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java new file mode 100644 index 00000000000..06c89e40dcd --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.cos; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * COS data node info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.COS) +@ApiModel("COS data node info") +public class COSDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty(value = "COS bucket name") + private String bucketName; + + @ApiModelProperty(value = "COS secret id") + private String credentialsId; + + @ApiModelProperty(value = "COS secret key") + private String credentialsKey; + + @ApiModelProperty(value = "COS region") + private String region; + + public COSDataNodeInfo() { + this.setType(DataNodeType.COS); + } + + @Override + public COSDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, COSDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java new file mode 100644 index 00000000000..2b3b2126879 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.cos; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * COS data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.COS) +@ApiModel("COS data node request") +public class COSDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty(value = "COS bucket name") + private String bucketName; + + @ApiModelProperty(value = "COS secret id") + private String credentialsId; + + @ApiModelProperty(value = "COS secret key") + private String credentialsKey; + + @ApiModelProperty(value = "COS region") + private String region; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java new file mode 100644 index 00000000000..ca89b7ebab1 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.cos; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.List; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = SourceType.COS) +@ApiModel(value = "COS data add task request") +public class COSDataAddTaskRequest extends DataAddTaskRequest { + + @ApiModelProperty("filterStreams") + private List filterStreams; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java new file mode 100644 index 00000000000..cb496689bf4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.cos; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.util.List; + +/** + * COS source info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "COS source info") +@JsonTypeDefine(value = SourceType.COS) +public class COSSource extends StreamSource { + + @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true) + private String pattern; + + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty("TimeOffset for collection, " + + "'1m' means from one minute after, '-1m' means from one minute before, " + + "'1h' means from one hour after, '-1h' means from one minute before, " + + "'1d' means from one day after, '-1d' means from one minute before, " + + "Null or blank means from current timestamp") + private String timeOffset; + + @ApiModelProperty("Max file count") + private String maxFileCount; + + @ApiModelProperty(" Type of data result for column separator" + + " CSV format, set this parameter to a custom separator: , | : " + + " Json format, set this parameter to json ") + private String contentStyle; + + @ApiModelProperty("filterStreams") + private List filterStreams; + + public COSSource() { + this.setSourceType(SourceType.COS); + } + + @Override + public SourceRequest genSourceRequest() { + return CommonBeanUtils.copyProperties(this, COSSourceRequest::new); + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java new file mode 100644 index 00000000000..1dd85385fcf --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.cos; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +import java.util.List; + +/** + * COS source information data transfer object + */ +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Data +@Slf4j +public class COSSourceDTO { + + @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true) + private String pattern; + + @ApiModelProperty("Cycle unit") + private String cycleUnit = "D"; + + @ApiModelProperty("Whether retry") + private Boolean retry = false;; + + @ApiModelProperty("Column separator of data source ") + private String dataSeparator; + + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty("TimeOffset for collection, " + + "'1m' means from one minute after, '-1m' means from one minute before, " + + "'1h' means from one hour after, '-1h' means from one minute before, " + + "'1d' means from one day after, '-1d' means from one minute before, " + + "Null or blank means from current timestamp") + private String timeOffset; + + @ApiModelProperty("Max file count") + private String maxFileCount; + + @ApiModelProperty(" Type of data result for column separator" + + " CSV format, set this parameter to a custom separator: , | : " + + " Json format, set this parameter to json ") + private String contentStyle; + + @ApiModelProperty(value = "Audit version") + private String auditVersion; + + @ApiModelProperty("filterStreams") + private List filterStreams; + + @ApiModelProperty(value = "COS bucket name") + private String bucketName; + + @ApiModelProperty(value = "COS secret id") + private String credentialsId; + + @ApiModelProperty(value = "COS secret key") + private String credentialsKey; + + @ApiModelProperty(value = "COS region") + private String region; + + public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest cosSourceRequest, String extParams) { + COSSourceDTO dto = StringUtils.isNotBlank(extParams) + ? COSSourceDTO.getFromJson(extParams) + : new COSSourceDTO(); + return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true); + } + + public static COSSourceDTO getFromJson(@NotNull String extParams) { + try { + log.info("teste extparmas={}", extParams); + return JsonUtils.parseObject(extParams, COSSourceDTO.class); + } catch (Exception e) { + log.info("teste extparmas=eoor:", e); + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("parse extParams of COSSource failure: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java new file mode 100644 index 00000000000..18266d619a2 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.cos; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.List; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = SourceType.COS) +@ApiModel(value = "COS source request") +public class COSSourceRequest extends SourceRequest { + + @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true) + private String pattern; + + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty("TimeOffset for collection, " + + "'1m' means from one minute after, '-1m' means from one minute before, " + + "'1h' means from one hour after, '-1h' means from one minute before, " + + "'1d' means from one day after, '-1d' means from one minute before, " + + "Null or blank means from current timestamp") + private String timeOffset; + + @ApiModelProperty("Max file count") + private String maxFileCount; + + @ApiModelProperty(" Type of data result for column separator" + + " CSV format, set this parameter to a custom separator: , | : " + + " Json format, set this parameter to json ") + private String contentStyle; + + @ApiModelProperty("filterStreams") + private List filterStreams; +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java index 73210536dd1..48b900d56b3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java @@ -156,4 +156,6 @@ public interface DataNodeService { */ Boolean testConnection(DataNodeRequest request); + DataNodeInfo getByKeyWithoutTenant(String name, String type); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index 9022049acf5..f7db3ec87e7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -296,4 +296,18 @@ public Boolean testConnection(DataNodeRequest request) { return result; } + @Override + public DataNodeInfo getByKeyWithoutTenant(String name, String type) { + DataNodeEntity entity = dataNodeMapper.selectByUniqueKeyWithoutTenant(name, type); + if (entity == null) { + LOGGER.error("data node not found by name={}, type={}", name, type); + throw new BusinessException("data node not found"); + } + String dataNodeType = entity.getType(); + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType); + DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity); + LOGGER.debug("success to get data node info by name={}, type={}", name, type); + return dataNodeInfo; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java new file mode 100644 index 00000000000..2e4cc594baa --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.cos; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.cos.COSDataNodeDTO; +import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo; +import org.apache.inlong.manager.pojo.node.cos.COSDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * COS data node operator + */ +@Service +public class COSDataNodeOperator extends AbstractDataNodeOperator { + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String dataNodeType) { + return getDataNodeType().equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.COS; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + + COSDataNodeInfo dataNodeInfo = new COSDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, dataNodeInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + COSDataNodeDTO dto = COSDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, dataNodeInfo); + } + return dataNodeInfo; + } + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + COSDataNodeRequest dataNodeRequest = (COSDataNodeRequest) request; + CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true); + try { + COSDataNodeDTO dto = COSDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("Failed to build extParams for COS node: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index cdf6290281f..83382dbcc10 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -59,6 +59,7 @@ import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; @@ -554,6 +555,16 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) { extParams = JsonUtils.toJsonString(fileSourceDTO); } } + if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) { + String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); + COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class); + if (Objects.nonNull(cosSourceDTO)) { + cosSourceDTO.setDataSeparator(dataSeparator); + dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion()); + cosSourceDTO.setContentStyle(streamEntity.getDataType()); + extParams = JsonUtils.toJsonString(cosSourceDTO); + } + } InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new); // Processing extParams unpackExtParams(streamEntity.getExtParams(), streamInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java new file mode 100644 index 00000000000..f99a6321bdd --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.source.cos; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; +import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo; +import org.apache.inlong.manager.pojo.source.DataAddTaskDTO; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest; +import org.apache.inlong.manager.pojo.source.cos.COSSource; +import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO; +import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.source.AbstractSourceOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * COS source operator, such as get or set COS source info. + */ +@Service +public class COSSourceOperator extends AbstractSourceOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(COSSourceOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private StreamSourceEntityMapper sourceMapper; + + @Override + public Boolean accept(String sourceType) { + return SourceType.COS.equals(sourceType); + } + + @Override + protected String getSourceType() { + return SourceType.COS; + } + + @Override + protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { + COSSourceRequest sourceRequest = (COSSourceRequest) request; + try { + CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true); + COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage())); + } + } + + @Override + public StreamSource getFromEntity(StreamSourceEntity entity) { + COSSource source = new COSSource(); + if (entity == null) { + return source; + } + + COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(entity, source, true); + CommonBeanUtils.copyProperties(dto, source, true); + + List sourceFields = super.getSourceFields(entity.getId()); + source.setFieldList(sourceFields); + + List dataAddTaskList = sourceMapper.selectByTaskMapId(entity.getId()); + source.setDataAddTaskList(dataAddTaskList.stream().map(subEntity -> DataAddTaskDTO.builder() + .id(subEntity.getId()) + .taskMapId(entity.getId()) + .agentIp(subEntity.getAgentIp()) + .status(subEntity.getStatus()).build()) + .collect(Collectors.toList())); + return source; + } + + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + COSSourceDTO cosSourceDTO = COSSourceDTO.getFromJson(sourceEntity.getExtParams()); + if (Objects.nonNull(cosSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) { + COSDataNodeInfo dataNodeInfo = + (COSDataNodeInfo) dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(), + DataNodeType.COS); + CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true); + return JsonUtils.toJsonString(cosSourceDTO); + } + return sourceEntity.getExtParams(); + } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public Integer addDataAddTask(DataAddTaskRequest request, String operator) { + try { + COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest) request; + StreamSourceEntity sourceEntity = sourceMapper.selectById(request.getSourceId()); + COSSourceDTO dto = COSSourceDTO.getFromJson(sourceEntity.getExtParams()); + dto.setDataTimeFrom(sourceRequest.getDataTimeFrom()); + dto.setDataTimeTo(sourceRequest.getDataTimeTo()); + dto.setRetry(true); + if (request.getIncreaseAuditVersion()) { + dto.setAuditVersion(request.getAuditVersion()); + } + dto.setFilterStreams(sourceRequest.getFilterStreams()); + StreamSourceEntity dataAddTaskEntity = + CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); + dataAddTaskEntity.setId(null); + dataAddTaskEntity.setSourceName( + sourceEntity.getSourceName() + "-" + request.getAuditVersion() + "-" + sourceEntity.getId()); + dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto)); + dataAddTaskEntity.setTaskMapId(sourceEntity.getId()); + Integer id = sourceMapper.insert(dataAddTaskEntity); + SourceRequest dataAddTaskRequest = + CommonBeanUtils.copyProperties(dataAddTaskEntity, SourceRequest::new, true); + updateAgentTaskConfig(dataAddTaskRequest, operator); + return id; + } catch (Exception e) { + LOGGER.error("serialize extParams of COS SourceDTO failure: ", e); + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage())); + } + } + +}