Skip to content

Commit

Permalink
[filesystem] Support Tencent COSN (#4854)
Browse files Browse the repository at this point in the history
  • Loading branch information
liujinhui1994 authored Jan 22, 2025
1 parent 79939ed commit a4de5e7
Show file tree
Hide file tree
Showing 20 changed files with 1,275 additions and 7 deletions.
13 changes: 7 additions & 6 deletions docs/content/maintenance/filesystems.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ FileSystem pluggable jars for user to query tables from Spark/Hive side.

## Supported FileSystems

| FileSystem | URI Scheme | Pluggable | Description |
|:------------------|:-----------|-----------|:-----------------------------------------------------------------------|
| Local File System | file:// | N | Built-in Support |
| HDFS | hdfs:// | N | Built-in Support, ensure that the cluster is in the hadoop environment |
| Aliyun OSS | oss:// | Y | |
| S3 | s3:// | Y | |
| FileSystem | URI Scheme | Pluggable | Description |
|:-----------------------------|:-----------|-----------|:-----------------------------------------------------------------------|
| Local File System | file:// | N | Built-in Support |
| HDFS | hdfs:// | N | Built-in Support, ensure that the cluster is in the hadoop environment |
| Aliyun OSS | oss:// | Y | |
| S3 | s3:// | Y | |
| Tencent Cloud Object Storage | cosn:// | Y | |

## Dependency

Expand Down
201 changes: 201 additions & 0 deletions paimon-filesystems/paimon-cosn-impl/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-cosn-impl</artifactId>
<name>Paimon : FileSystems : COSN : Impl</name>
<packaging>jar</packaging>

<properties>
<fs.cosn.sdk.version>3.3.5</fs.cosn.sdk.version>
<fs.cosn.api.version>5.6.139</fs.cosn.api.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-hadoop-shaded</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cos</artifactId>
<version>${fs.cosn.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>${fs.cosn.api.version}</version>
<exclusions>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<!-- Hadoop requires jaxb-api for javax.xml.bind.JAXBException -->
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>${jaxb.api.version}</version>
<!-- packaged as an optional dependency that is only accessible on Java 11+ -->
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<!-- jaxb-api is packaged as an optional dependency that is only accessible on Java 11+ -->
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-javax-jars</id>
<phase>process-resources</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>${jaxb.api.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.directory}/temporary</outputDirectory>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>unpack-javax-libraries</id>
<phase>process-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<echo message="unpacking javax jars"/>
<unzip dest="${project.build.directory}/classes/META-INF/versions/11">
<fileset dir="${project.build.directory}/temporary">
<include name="*"/>
</fileset>
</unzip>
</target>
</configuration>
</execution>
</executions>
</plugin>

<!-- Relocate all COSN related classes -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-paimon</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*</artifact>
<excludes>
<exclude>.gitkeep</exclude>
<exclude>mime.types</exclude>
<exclude>mozilla/**</exclude>
<exclude>META-INF/maven/**</exclude>
<exclude>META-INF/versions/11/META-INF/maven/**</exclude>
<exclude>META-INF/LICENSE.txt</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.cosn;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.Options;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.cosn.CosNFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** COSN {@link FileIO} for Tencent Cloud Object Storage. */
public class COSNFileIO extends HadoopCompliantFileIO {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(COSNFileIO.class);

    /**
     * To keep things simple, Paimon COSN configuration keys are identical to those of the Hadoop
     * COSN module. Every option whose key starts with one of these prefixes is forwarded verbatim
     * from the Paimon configuration into the Hadoop configuration.
     */
    private static final String[] CONFIG_PREFIXES = {"fs.cosn."};

    private static final String COSN_USER_INFO_KEY_ID = "fs.cosn.userinfo.secretId";
    private static final String COSN_USER_INFO_KEY_SECRET = "fs.cosn.userinfo.secretKey";

    /**
     * Hadoop COSN reads these keys case-sensitively; map a lower-cased incoming key back to its
     * canonical casing before handing it to Hadoop.
     */
    private static final Map<String, String> CASE_SENSITIVE_KEYS =
            new HashMap<String, String>() {
                {
                    put(COSN_USER_INFO_KEY_ID.toLowerCase(), COSN_USER_INFO_KEY_ID);
                    put(COSN_USER_INFO_KEY_SECRET.toLowerCase(), COSN_USER_INFO_KEY_SECRET);
                }
            };

    /**
     * Cache of {@link CosNFileSystem} instances. At present there is no good mechanism to
     * guarantee that created file systems are shut down, so this cache is used to reuse instances
     * and avoid resource leakage.
     */
    private static final Map<CacheKey, CosNFileSystem> CACHE = new ConcurrentHashMap<>();

    /** Options harvested in {@link #configure(CatalogContext)}; part of the FS cache key. */
    private Options hadoopOptions;

    @Override
    public boolean isObjectStore() {
        return true;
    }

    /**
     * Collects every catalog option matching {@link #CONFIG_PREFIXES} so it can later be applied
     * to the Hadoop configuration used to create the file system.
     *
     * @param context catalog context carrying user-supplied options
     */
    @Override
    public void configure(CatalogContext context) {
        hadoopOptions = new Options();
        // Forward all configuration entries with a recognized prefix to the Hadoop options.
        for (String key : context.options().keySet()) {
            for (String prefix : CONFIG_PREFIXES) {
                if (!key.startsWith(prefix)) {
                    continue;
                }
                String value = context.options().get(key);
                // Restore canonical casing for keys Hadoop COSN reads case-sensitively;
                // use a separate local instead of mutating the loop variable.
                String canonicalKey = CASE_SENSITIVE_KEYS.getOrDefault(key.toLowerCase(), key);
                hadoopOptions.set(canonicalKey, value);
                if (LOG.isDebugEnabled()) {
                    // Debug level (routine configuration, not a warning), and never print
                    // credential values (secretId / secretKey) into the log.
                    boolean credential = CASE_SENSITIVE_KEYS.containsKey(key.toLowerCase());
                    LOG.debug(
                            "Adding config entry for {} as {} to Hadoop config",
                            canonicalKey,
                            credential ? "******" : hadoopOptions.get(canonicalKey));
                }
            }
        }
    }

    /**
     * Creates (or fetches from the cache) a {@link CosNFileSystem} for the given path. Instances
     * are cached per (options, scheme, authority) tuple.
     *
     * @param path the path whose scheme/authority select the file system
     * @return an initialized {@link CosNFileSystem}
     * @throws UncheckedIOException if file system initialization fails
     */
    @Override
    protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
        final String scheme = path.toUri().getScheme();
        final String authority = path.toUri().getAuthority();
        return CACHE.computeIfAbsent(
                new CacheKey(hadoopOptions, scheme, authority),
                key -> {
                    Configuration hadoopConf = new Configuration();
                    key.options.toMap().forEach(hadoopConf::set);
                    URI fsUri = path.toUri();
                    if (scheme == null && authority == null) {
                        // No scheme/authority at all: fall back to the configured default FS.
                        fsUri = FileSystem.getDefaultUri(hadoopConf);
                    } else if (scheme != null && authority == null) {
                        // Scheme without authority: reuse the default FS authority when the
                        // default FS shares the same scheme.
                        URI defaultUri = FileSystem.getDefaultUri(hadoopConf);
                        if (scheme.equals(defaultUri.getScheme())
                                && defaultUri.getAuthority() != null) {
                            fsUri = defaultUri;
                        }
                    }
                    CosNFileSystem fs = new CosNFileSystem();
                    try {
                        fs.initialize(fsUri, hadoopConf);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    return fs;
                });
    }

    /** Key for {@link #CACHE}: the options plus the URI scheme and authority. */
    private static class CacheKey {
        private final Options options;
        private final String scheme;
        private final String authority;

        private CacheKey(Options options, String scheme, String authority) {
            this.options = options;
            this.scheme = scheme;
            this.authority = authority;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            CacheKey cacheKey = (CacheKey) o;
            return Objects.equals(options, cacheKey.options)
                    && Objects.equals(scheme, cacheKey.scheme)
                    && Objects.equals(authority, cacheKey.authority);
        }

        @Override
        public int hashCode() {
            return Objects.hash(options, scheme, authority);
        }
    }
}
Loading

0 comments on commit a4de5e7

Please sign in to comment.