Skip to content

Commit 2ec59f7

Browse files
authored
[INLONG-4099][Manager] Support create Iceberg resource (#4192)
1 parent cb3810e commit 2ec59f7

File tree

16 files changed

+2108
-1
lines changed

16 files changed

+2108
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.common.enums;
19+
20+
import org.apache.inlong.manager.common.util.Preconditions;
21+
22+
import java.util.Locale;
23+
24+
/**
25+
* Iceberg partition type
26+
*/
27+
public enum IcebergPartition {
28+
IDENTITY,
29+
BUCKET,
30+
TRUNCATE,
31+
YEAR,
32+
MONTH,
33+
DAY,
34+
HOUR,
35+
;
36+
37+
/**
38+
* Get partition type from name
39+
*/
40+
public static IcebergPartition forName(String name) {
41+
Preconditions.checkNotNull(name, "IcebergPartition should not be null");
42+
for (IcebergPartition value : values()) {
43+
if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
44+
return value;
45+
}
46+
}
47+
throw new IllegalArgumentException(String.format("Unsupported IcebergPartition : %s", name));
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return name();
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.common.enums;
19+
20+
import lombok.Getter;
21+
22+
/**
23+
* Iceberg data type
24+
*/
25+
public enum IcebergType {
26+
27+
BOOLEAN("boolean"),
28+
INT("int"),
29+
LONG("long"),
30+
FLOAT("float"),
31+
DOUBLE("double"),
32+
DECIMAL("decimal(P,S)"),
33+
DATE("date"),
34+
TIME("time"),
35+
TIMESTAMP("timestamp"),
36+
TIMESTAMPTZ("timestamptz"),
37+
STRING("string"),
38+
UUID("uuid"),
39+
FIXED("fixed(L)"),
40+
BINARY("binary");
41+
42+
@Getter
43+
private String type;
44+
45+
IcebergType(String type) {
46+
this.type = type;
47+
}
48+
49+
/**
50+
* Get type from name
51+
*/
52+
public static IcebergType forType(String type) {
53+
for (IcebergType ibType : values()) {
54+
if (ibType.getType().equalsIgnoreCase(type)) {
55+
return ibType;
56+
}
57+
}
58+
throw new IllegalArgumentException(String.format("invalid iceberg type = %s", type));
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.common.pojo.sink.iceberg;
19+
20+
import lombok.Data;
21+
22+
/**
23+
* Iceberg column info
24+
*/
25+
@Data
26+
public class IcebergColumnInfo {
27+
28+
private String name;
29+
private String type;
30+
private String desc;
31+
private boolean required;
32+
private Integer length;
33+
private String partitionStrategy;
34+
private Integer precision;
35+
private Integer scale;
36+
private Integer bucketNum;
37+
private Integer width;
38+
}

inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java

+14
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.inlong.manager.common.exceptions.BusinessException;
2929

3030
import javax.validation.constraints.NotNull;
31+
import java.util.List;
3132
import java.util.Map;
3233

3334
/**
@@ -102,4 +103,17 @@ public static IcebergSinkDTO getFromJson(@NotNull String extParams) {
102103
}
103104
}
104105

106+
/**
107+
* Get Iceberg table info
108+
*/
109+
public static IcebergTableInfo getIcebergTableInfo(IcebergSinkDTO icebergInfo, List<IcebergColumnInfo> columnList) {
110+
IcebergTableInfo info = new IcebergTableInfo();
111+
info.setDbName(icebergInfo.getDbName());
112+
info.setTableName(icebergInfo.getTableName());
113+
info.setFileFormat(icebergInfo.getFileFormat());
114+
info.setTblProperties(icebergInfo.getProperties());
115+
info.setColumns(columnList);
116+
return info;
117+
}
118+
105119
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.common.pojo.sink.iceberg;
19+
20+
import lombok.Data;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
/**
26+
* Iceberg table info
27+
*/
28+
@Data
29+
public class IcebergTableInfo {
30+
31+
private String dbName;
32+
private String tableName;
33+
private String tableDesc;
34+
private String fileFormat;
35+
private Map<String, Object> tblProperties;
36+
private List<IcebergColumnInfo> columns;
37+
}

inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class StreamSinkFieldEntity implements Serializable {
4545
private Integer fieldPrecision;
4646
private Integer fieldScale;
4747
private String partitionStrategy;
48+
private Integer bucketNum;
49+
private Integer width;
4850

4951
private Integer isMetaField;
5052
private String fieldFormat;

inlong-manager/manager-service/pom.xml

+49
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,55 @@
167167
<groupId>org.apache.kafka</groupId>
168168
<artifactId>kafka-clients</artifactId>
169169
</dependency>
170+
171+
<dependency>
172+
<groupId>org.apache.iceberg</groupId>
173+
<artifactId>iceberg-api</artifactId>
174+
</dependency>
175+
<dependency>
176+
<groupId>org.apache.iceberg</groupId>
177+
<artifactId>iceberg-hive-metastore</artifactId>
178+
</dependency>
179+
<dependency>
180+
<groupId>org.apache.iceberg</groupId>
181+
<artifactId>iceberg-core</artifactId>
182+
</dependency>
183+
<dependency>
184+
<groupId>org.apache.hive</groupId>
185+
<artifactId>hive-metastore</artifactId>
186+
</dependency>
187+
188+
<dependency>
189+
<groupId>org.apache.hadoop</groupId>
190+
<artifactId>hadoop-hdfs</artifactId>
191+
<exclusions>
192+
<exclusion>
193+
<artifactId>servlet-api</artifactId>
194+
<groupId>javax.servlet</groupId>
195+
</exclusion>
196+
</exclusions>
197+
</dependency>
198+
<dependency>
199+
<groupId>org.apache.hadoop</groupId>
200+
<artifactId>hadoop-common</artifactId>
201+
<exclusions>
202+
<exclusion>
203+
<artifactId>servlet-api</artifactId>
204+
<groupId>javax.servlet</groupId>
205+
</exclusion>
206+
</exclusions>
207+
</dependency>
208+
<dependency>
209+
<groupId>org.apache.hadoop</groupId>
210+
<artifactId>hadoop-mapreduce-client-core</artifactId>
211+
<exclusions>
212+
<exclusion>
213+
<artifactId>hadoop-yarn-common</artifactId>
214+
<groupId>org.apache.hadoop</groupId>
215+
</exclusion>
216+
</exclusions>
217+
</dependency>
218+
170219
</dependencies>
171220

172221
</project>

0 commit comments

Comments
 (0)