Skip to content

Commit

Permalink
[INLONG-11618][Manager] Support COS stream source (#11619)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored Dec 23, 2024
1 parent 6816f9f commit 4d3cc0a
Show file tree
Hide file tree
Showing 16 changed files with 801 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class DataNodeType {
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
public static final String COS = "COS";
public static final String STARROCKS = "STARROCKS";
public static final String REDIS = "REDIS";
public static final String KUDU = "KUDU";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class SourceType extends StreamType {
public static final String TUBEMQ = "TUBEMQ";

public static final String FILE = "FILE";
public static final String COS = "COS";
public static final String MYSQL_SQL = "MYSQL_SQL";
public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
public static final String MONGODB = "MONGODB";
Expand All @@ -47,6 +48,7 @@ public class SourceType extends StreamType {
put(KAFKA, TaskTypeEnum.KAFKA);

put(FILE, TaskTypeEnum.FILE);
put(COS, TaskTypeEnum.COS);
put(MYSQL_SQL, TaskTypeEnum.SQL);
put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
put(POSTGRESQL, TaskTypeEnum.POSTGRES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ int copy(@Param("name") String name, @Param("type") String type, @Param("sourceT
@MultiTenantQuery(with = false)
List<DataNodeEntity> selectByIdSelective(DataNodeEntity record);

@MultiTenantQuery(with = false)
DataNodeEntity selectByUniqueKeyWithoutTenant(@Param("name") String name, @Param("type") String type);

}
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,14 @@
and tenant = #{sourceTenant, jdbcType=VARCHAR}
</where>
</insert>
<select id="selectByUniqueKeyWithoutTenant" resultType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
select
<include refid="Base_Column_List"/>
from data_node
<where>
name = #{name, jdbcType=VARCHAR}
and type = #{type, jdbcType=VARCHAR}
and is_deleted = 0
</where>
</select>
</mapper>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.cos;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import javax.validation.constraints.NotNull;

/**
* COS data node info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("COS data node info")
public class COSDataNodeDTO {

@ApiModelProperty(value = "COS bucket name")
private String bucketName;

@ApiModelProperty(value = "COS secret id")
private String credentialsId;

@ApiModelProperty(value = "COS secret key")
private String credentialsKey;

@ApiModelProperty(value = "COS region")
private String region;

/**
* Get the dto instance from the request
*/
public static COSDataNodeDTO getFromRequest(COSDataNodeRequest request, String extParams) {
COSDataNodeDTO dto = StringUtils.isNotBlank(extParams)
? COSDataNodeDTO.getFromJson(extParams)
: new COSDataNodeDTO();
return CommonBeanUtils.copyProperties(request, dto, true);
}

/**
* Get the dto instance from the JSON string.
*/
public static COSDataNodeDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, COSDataNodeDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
String.format("Failed to parse extParams for COS node: %s", e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.cos;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* COS data node info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.COS)
@ApiModel("COS data node info")
public class COSDataNodeInfo extends DataNodeInfo {

@ApiModelProperty(value = "COS bucket name")
private String bucketName;

@ApiModelProperty(value = "COS secret id")
private String credentialsId;

@ApiModelProperty(value = "COS secret key")
private String credentialsKey;

@ApiModelProperty(value = "COS region")
private String region;

public COSDataNodeInfo() {
this.setType(DataNodeType.COS);
}

@Override
public COSDataNodeRequest genRequest() {
return CommonBeanUtils.copyProperties(this, COSDataNodeRequest::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.cos;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* COS data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.COS)
@ApiModel("COS data node request")
public class COSDataNodeRequest extends DataNodeRequest {

@ApiModelProperty(value = "COS bucket name")
private String bucketName;

@ApiModelProperty(value = "COS secret id")
private String credentialsId;

@ApiModelProperty(value = "COS secret key")
private String credentialsKey;

@ApiModelProperty(value = "COS region")
private String region;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.source.cos;

import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.List;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = SourceType.COS)
@ApiModel(value = "COS data add task request")
public class COSDataAddTaskRequest extends DataAddTaskRequest {

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.source.cos;

import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.util.List;

/**
* COS source info
*/
@Data
@SuperBuilder
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "COS source info")
@JsonTypeDefine(value = SourceType.COS)
public class COSSource extends StreamSource {

@ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
private String pattern;

@ApiModelProperty("Cycle unit")
private String cycleUnit;

@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;

@ApiModelProperty(value = "Data end time")
private String dataTimeTo;

@ApiModelProperty("TimeOffset for collection, "
+ "'1m' means from one minute after, '-1m' means from one minute before, "
+ "'1h' means from one hour after, '-1h' means from one minute before, "
+ "'1d' means from one day after, '-1d' means from one minute before, "
+ "Null or blank means from current timestamp")
private String timeOffset;

@ApiModelProperty("Max file count")
private String maxFileCount;

@ApiModelProperty(" Type of data result for column separator"
+ " CSV format, set this parameter to a custom separator: , | : "
+ " Json format, set this parameter to json ")
private String contentStyle;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public COSSource() {
this.setSourceType(SourceType.COS);
}

@Override
public SourceRequest genSourceRequest() {
return CommonBeanUtils.copyProperties(this, COSSourceRequest::new);
}

}
Loading

0 comments on commit 4d3cc0a

Please sign in to comment.