Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6078] feat(core): Support model event to Gravitino server #6129

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
4b5e95a
[#6078] feat(core): Support model event to Gravitino server
Abyss-lord Jan 7, 2025
9098335
[#6078] feat(core): Support model event to Gravitino server
Abyss-lord Jan 7, 2025
cd86b6a
[#6078] feat(core): Support model event to Gravitino server
Abyss-lord Jan 9, 2025
e0b0514
[#6166] web(ui): load tree load data after refreshing the version det…
LauraXia123 Jan 9, 2025
b6c29e0
[#6143] improve(CLI): Refactor catalog commands in Gavitino CLI (#6159)
Abyss-lord Jan 9, 2025
73f1e01
[#6145] improve(CLI): Refactor table commands in Gavitino CLI (#6161)
Abyss-lord Jan 9, 2025
3b24ee2
[#6148] improve(CLI): Refactor model commands in Gavitino CLI (#6162)
Abyss-lord Jan 9, 2025
2f81bdc
[#5973] feat(hadoop-catalog): Support credential when using fileset c…
yuqi1129 Jan 10, 2025
7838acc
[#6173] fix Trino license and notice files (#6173)
justinmclean Jan 10, 2025
0d6864b
[Minor] Update year in NOTICE files (#6171)
justinmclean Jan 10, 2025
62bcbc2
build(deps): bump nanoid from 3.3.7 to 3.3.8 in /web/web (#6176)
dependabot[bot] Jan 10, 2025
87cfdf9
[#6146] improve(CLI): Refactor topic commands in Gavitino CLI (#6174)
Abyss-lord Jan 10, 2025
f26fc79
[#6151] improve(CLI): Refactor group commands in Gavitino CLI (#6175)
Abyss-lord Jan 10, 2025
ed5e031
[#6165] feat(core): Use Gravitino cloud jar without hadoop packages …
FANNG1 Jan 10, 2025
fce6429
[#6054] feat(core): add more GCS permission to support fileset oper…
FANNG1 Jan 10, 2025
7be9c46
[#6110] doc(authz): Add document for chain authorization plugin (#6115)
jerqi Jan 10, 2025
be78740
[#6144] improve(CLI): Refactor schema commands in Gravitino CLI (#6178)
luoshipeng Jan 10, 2025
2460f6a
[#6184]improve(core): Remove the protobuf dependency (#6185)
jerryshao Jan 10, 2025
6e87ccc
[#6153]refactor: Break up role commands in Gravitino CLI (#6170)
cool9850311 Jan 10, 2025
dc3444d
Merge branch 'main' into model-event
Abyss-lord Jan 11, 2025
815d72a
[#6177] improve(CLI): Refactor ownership commands in Gravitino CLI (#…
Abyss-lord Jan 12, 2025
effb35f
[#6149] improve(CLI): Refactor column commands in Gavitino CLI (#6190)
TungYuChiang Jan 12, 2025
2180b35
[#6150] improve(CLI): Refactor user commands in Gavitino CLI (#6193)
antony0016 Jan 12, 2025
ccc1f67
[MINOR] bump version to 0.9.0-incubating-snapshot (#6094)
FANNG1 Jan 13, 2025
09d48b1
[#6194] Add python client license and notice file (#6195)
justinmclean Jan 13, 2025
e7f2450
[#6069] fix(docs): Fix access-control.md (#6189)
Abyss-lord Jan 13, 2025
7b01887
[#5533] fix (trino-connector): Fix the exception of ArrayIndexOutOfBo…
diqiu50 Jan 13, 2025
507da37
[Minor] Update command usage and add usage tracker in Gravitino CLI (…
justinmclean Jan 13, 2025
cfcdb83
[doc] Revise the glossary documentation (#5837)
tengqm Jan 13, 2025
b0c8321
[#6078] feat(core): Support model event to Gravitino server
Abyss-lord Jan 14, 2025
aee42c0
[#6200] improvement(docs): Add Docker image details for 0.8.0 (#6202)
jerqi Jan 13, 2025
c9ceec7
[#5545] fix(doris-catalog): Fix the problem that we can't set Doris t…
yuqi1129 Jan 13, 2025
1ef596e
[#5721] improvement(mysql-catalog): add column not null limitation in…
mchades Jan 13, 2025
482b222
[#5100] improvement(docs): Add extra documents to clarify the engine …
yuqi1129 Jan 13, 2025
ca97340
[#5192] [#5193] feat(flink): Support Catalog&Schema Operation DDL for…
hdygxsj Jan 13, 2025
181acd4
[#6139] Refactor metalake command in Gravitino CLI (#6140)
justinmclean Jan 13, 2025
b36bd8d
[#6147] improve(CLI): Refactor fileset commands in Gavitino CLI  (#6191)
TungYuChiang Jan 13, 2025
8fb7756
[#6152] refactor: Refactor tag commands in Gravitino CLI (#6192)
frankvicky Jan 14, 2025
e130f53
Merge branch 'main' into model-event
Abyss-lord Jan 14, 2025
f737606
[#6220] improve(CLI): Clean up GravitinoCommandLine class now it bee…
Abyss-lord Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.hadoop.conf.Configuration;

public class OSSCredentialsProvider implements CredentialsProvider {

private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;
private Credentials basicCredentials;
private long expirationTime = Long.MAX_VALUE;
private static final double EXPIRATION_TIME_FACTOR = 0.5D;

public OSSCredentialsProvider(URI uri, Configuration conf) {
this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
}

@Override
public void setCredentials(Credentials credentials) {}

@Override
public Credentials getCredentials() {
if (basicCredentials == null || System.currentTimeMillis() >= expirationTime) {
synchronized (this) {
refresh();
}
}

return basicCredentials;
}

private void refresh() {
Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
Credential credential = OSSUtils.getSuitableCredential(gravitinoCredentials);
if (credential == null) {
throw new RuntimeException("No suitable credential for OSS found...");
}

if (credential instanceof OSSSecretKeyCredential) {
OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) credential;
basicCredentials =
new DefaultCredentials(
ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey());
} else if (credential instanceof OSSTokenCredential) {
OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
basicCredentials =
new BasicCredentials(
ossTokenCredential.accessKeyId(),
ossTokenCredential.secretAccessKey(),
ossTokenCredential.securityToken());
}

if (credential.expireTimeInMs() > 0) {
expirationTime =
System.currentTimeMillis()
+ (long)
((credential.expireTimeInMs() - System.currentTimeMillis())
* EXPIRATION_TIME_FACTOR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.Constants;

public class OSSFileSystemProvider implements FileSystemProvider {
public class OSSFileSystemProvider implements FileSystemProvider, SupportsCredentialVending {

private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";

Expand Down Expand Up @@ -61,9 +66,22 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
}

hadoopConfMap.forEach(configuration::set);

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

@Override
public Map<String, String> getFileSystemCredentialConf(Credential[] credentials) {
Credential credential = OSSUtils.getSuitableCredential(credentials);
Map<String, String> result = Maps.newHashMap();
if (credential instanceof OSSSecretKeyCredential || credential instanceof OSSTokenCredential) {
result.put(
Constants.CREDENTIALS_PROVIDER_KEY, OSSCredentialsProvider.class.getCanonicalName());
}

return result;
}

@Override
public String scheme() {
return "oss";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;

public class OSSUtils {

/**
* Get the credential from the credential array. Using dynamic credential first, if not found,
* uses static credential.
*
* @param credentials The credential array.
* @return A credential. Null if not found.
*/
static Credential getSuitableCredential(Credential[] credentials) {
// Use dynamic credential if found.
for (Credential credential : credentials) {
if (credential instanceof OSSTokenCredential) {
return credential;
}
}

// If dynamic credential not found, use the static one
for (Credential credential : credentials) {
if (credential instanceof OSSSecretKeyCredential) {
return credential;
}
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.hadoop.conf.Configuration;

public class S3CredentialsProvider implements AWSCredentialsProvider {
private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;

private AWSCredentials basicSessionCredentials;
private long expirationTime = Long.MAX_VALUE;
private static final double EXPIRATION_TIME_FACTOR = 0.5D;

public S3CredentialsProvider(final URI uri, final Configuration conf) {
this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
}

@Override
public AWSCredentials getCredentials() {
// Refresh credentials if they are null or about to expire.
if (basicSessionCredentials == null || System.currentTimeMillis() >= expirationTime) {
synchronized (this) {
refresh();
}
}

return basicSessionCredentials;
}

@Override
public void refresh() {
Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
Credential credential = S3Utils.getSuitableCredential(gravitinoCredentials);

if (credential == null) {
throw new RuntimeException("No suitable credential for S3 found...");
}

if (credential instanceof S3SecretKeyCredential) {
S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential) credential;
basicSessionCredentials =
new BasicAWSCredentials(
s3SecretKeyCredential.accessKeyId(), s3SecretKeyCredential.secretAccessKey());
} else if (credential instanceof S3TokenCredential) {
S3TokenCredential s3TokenCredential = (S3TokenCredential) credential;
basicSessionCredentials =
new BasicSessionCredentials(
s3TokenCredential.accessKeyId(),
s3TokenCredential.secretAccessKey(),
s3TokenCredential.sessionToken());
}

if (credential.expireTimeInMs() > 0) {
expirationTime =
System.currentTimeMillis()
+ (long)
((credential.expireTimeInMs() - System.currentTimeMillis())
* EXPIRATION_TIME_FACTOR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -39,9 +44,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3FileSystemProvider implements FileSystemProvider {
public class S3FileSystemProvider implements FileSystemProvider, SupportsCredentialVending {

private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemProvider.class);

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
Expand All @@ -61,18 +66,29 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

hadoopConfMap.forEach(configuration::set);
if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
}

hadoopConfMap.forEach(configuration::set);

// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(configuration);

return S3AFileSystem.newInstance(path.toUri(), configuration);
}

@Override
public Map<String, String> getFileSystemCredentialConf(Credential[] credentials) {
Credential credential = S3Utils.getSuitableCredential(credentials);
Map<String, String> result = Maps.newHashMap();
if (credential instanceof S3SecretKeyCredential || credential instanceof S3TokenCredential) {
result.put(
Constants.AWS_CREDENTIALS_PROVIDER, S3CredentialsProvider.class.getCanonicalName());
}

return result;
}

private void checkAndSetCredentialProvider(Configuration configuration) {
String provides = configuration.get(S3_CREDENTIAL_KEY);
if (provides == null) {
Expand All @@ -91,12 +107,12 @@ private void checkAndSetCredentialProvider(Configuration configuration) {
if (AWSCredentialsProvider.class.isAssignableFrom(c)) {
validProviders.add(provider);
} else {
LOGGER.warn(
LOG.warn(
"Credential provider {} is not a subclass of AWSCredentialsProvider, skipping",
provider);
}
} catch (Exception e) {
LOGGER.warn(
LOG.warn(
"Credential provider {} not found in the Hadoop runtime, falling back to default",
provider);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
Expand Down
Loading
Loading