Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add database method support file system client #420

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions hetu-filesystem-client/mysql_ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
drop table `olk_fs_catalog` if exists;

CREATE TABLE `olk_fs_catalog`
(
`catalog_name` varchar(256) NOT NULL COMMENT '目录名称',
`metadata` text COMMENT '元数据',
`properties` text COMMENT '配置信息',
`create_time` datetime DEFAULT NULL COMMENT '配置信息',
PRIMARY KEY (`catalog_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
19 changes: 19 additions & 0 deletions hetu-filesystem-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.15</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand All @@ -57,6 +70,12 @@
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ public Path getRoot()
public void validate(Path path)
throws AccessDeniedException
{
if (!path.toAbsolutePath().startsWith(root)) {
throw new AccessDeniedException(String.format("%s is not in workspace %s. Access has been denied.", path, root));
String os = System.getProperty("os.name");
//Windows操作系统不校验
if (os == null || !os.toLowerCase().startsWith("windows")) {
if (!path.toAbsolutePath().startsWith(root)) {
throw new AccessDeniedException(String.format("%s is not in workspace %s. Access has been denied.", path, root));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.hetu.core.filesystem;

import com.google.common.collect.ImmutableList;
import io.hetu.core.filesystem.db.DbFileSystemClientFactory;
import io.prestosql.spi.Plugin;
import io.prestosql.spi.filesystem.HetuFileSystemClientFactory;

Expand All @@ -29,6 +30,6 @@ public class HetuFileSystemClientPlugin
@Override
public Iterable<HetuFileSystemClientFactory> getFileSystemClientFactory()
{
return ImmutableList.of(new LocalFileSystemClientFactory(), new HdfsFileSystemClientFactory());
return ImmutableList.of(new LocalFileSystemClientFactory(), new HdfsFileSystemClientFactory(), new DbFileSystemClientFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.hetu.core.filesystem.db;

import java.nio.file.Path;

class DbCatalog {
private String catalogName;

private String metadata;

private Path metadataDirPath;

private String properties;

private Path propertiesPath;

public DbCatalog() {
}

public DbCatalog(String catalogName, String metadata, String properties) {
this.catalogName = catalogName;
this.metadata = metadata;
this.properties = properties;
}

public DbCatalog(String catalogName, String metadata, Path metadataDirPath, String properties, Path propertiesPath) {
this.catalogName = catalogName;
this.metadata = metadata;
this.metadataDirPath = metadataDirPath;
this.properties = properties;
this.propertiesPath = propertiesPath;
}

public String getCatalogName() {
return catalogName;
}

public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}

public String getMetadata() {
return metadata;
}

public void setMetadata(String metadata) {
this.metadata = metadata;
}

public Path getMetadataDirPath() {
return metadataDirPath;
}

public void setMetadataDirPath(Path metadataDirPath) {
this.metadataDirPath = metadataDirPath;
}

public String getProperties() {
return properties;
}

public void setProperties(String properties) {
this.properties = properties;
}

public Path getPropertiesPath() {
return propertiesPath;
}

public void setPropertiesPath(Path propertiesPath) {
this.propertiesPath = propertiesPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2018-2020. Huawei Technologies Co., Ltd. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.hetu.core.filesystem.db;

import io.airlift.log.Logger;

import java.util.Properties;

/**
* 动态目录数据库配置
*/
public class DbConfig {

private static final Logger LOG = Logger.get(DbConfig.class);

private static final String FS_DB_URL = "fs.db.url";

private static final String FS_DB_DRIVER = "fs.db.driver";

private static final String FS_DB_USERNAME = "fs.db.username";

private static final String FS_DB_PASSWORD = "fs.db.password";

private static final String FS_DB_INITIAL_SIZE = "fs.db.initialSize";

private static final String FS_DB_MIN_IDLE = "fs.db.minIdle";

private static final String FS_DB_MAX_ACTIVE = "fs.db.maxActive";

private static final String FS_DB_MAX_WAIT = "fs.db.maxWait";

private Properties dbProperties;

public Properties getDbProperties() {
return dbProperties;
}

public DbConfig(Properties properties) {
generateDbConfig(properties);
}

private void generateDbConfig(Properties properties) {
try {
dbProperties = new Properties();
dbProperties.setProperty("url", properties.getProperty(FS_DB_URL));
dbProperties.setProperty("driverClassName", properties.getProperty(FS_DB_DRIVER));
dbProperties.setProperty("username", properties.getProperty(FS_DB_USERNAME));
dbProperties.setProperty("password", properties.getProperty(FS_DB_PASSWORD));
dbProperties.setProperty("initialSize", configDefault(properties, FS_DB_INITIAL_SIZE, "1"));
dbProperties.setProperty("minIdle", configDefault(properties, FS_DB_MIN_IDLE, "1"));
dbProperties.setProperty("maxActive", configDefault(properties, FS_DB_MAX_ACTIVE, "1"));
dbProperties.setProperty("maxWait", configDefault(properties, FS_DB_MAX_WAIT, "1"));
} catch (Exception e) {
LOG.error("获取动态目录数据库配置异常, 原因: {}", e.getMessage(), e);
throw new IllegalArgumentException("获取动态目录数据库配置异常, 原因: " + e.getMessage());
}
}

private String configDefault(Properties properties, String key, String defaultValue) {
Object o = properties.get(key);
return o != null && o != "" ? o.toString() : defaultValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.hetu.core.filesystem.db;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static io.hetu.core.filesystem.db.HetuDbFileSystemClient.readString;

public class DbFileOutputStream extends OutputStream {

private DataSource dataSource;

private List<String> catalogBaseDir;

private Path path;

private OutputStream out;

public DbFileOutputStream(DataSource dataSource, List<String> catalogBaseDir, Path path, OutputStream out) {
this.dataSource = dataSource;
this.catalogBaseDir = catalogBaseDir;
this.path = path;
this.out = out;
}

@Override
public void write(int b) throws IOException {
out.write(b);
}

@Override
public void write(byte[] b) throws IOException {
out.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}

@Override
public void flush() throws IOException {
out.flush();
}

@Override
public void close() throws IOException {
out.close();
saveDbCatalog(path);
}

/**
* 保存数据库连接器
* @param path
* @throws Exception
*/
private void saveDbCatalog(Path path) throws IOException {
String absolutePath = path.toFile().getParentFile().getAbsolutePath();
if (!catalogBaseDir.isEmpty() && catalogBaseDir.stream().filter(absolutePath::contains).count() > 0) {
String fileName = path.toFile().getName();
String catalogName = null;
String metatdata = null;
String properties = null;
if (fileName.endsWith(".properties") && Files.exists(path.toAbsolutePath())) {
catalogName = fileName.substring(0, fileName.lastIndexOf(".properties"));
properties = readString(Files.newInputStream(path.toAbsolutePath()));
}
if (fileName.endsWith(".metadata") && Files.exists(path.toAbsolutePath())) {
catalogName = fileName.substring(0, fileName.lastIndexOf(".metadata"));
metatdata = readString(Files.newInputStream(path.toAbsolutePath()));
}
if (catalogName != null) {
DbCatalog dbCatalog = DbUtils.selectOne(dataSource, catalogName);
if (dbCatalog != null) {
if (metatdata != null) {
dbCatalog.setMetadata(metatdata);
}
if (properties != null) {
dbCatalog.setProperties(properties);
}
DbUtils.updateByCatalogName(dataSource, dbCatalog.getCatalogName(), dbCatalog.getMetadata(), dbCatalog.getProperties());
} else {
DbUtils.insert(dataSource, catalogName, metatdata, properties);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2018-2020. Huawei Technologies Co., Ltd. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.hetu.core.filesystem.db;

import io.hetu.core.filesystem.LocalFileSystemClientFactory;
import io.prestosql.spi.filesystem.HetuFileSystemClient;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/**
* 动态目录数据库客户端工厂
*/
public class DbFileSystemClientFactory extends LocalFileSystemClientFactory {
private static final String NAME_DB = "db";

@Override
public HetuFileSystemClient getFileSystemClient(Properties properties) {
return new HetuDbFileSystemClient(new DbConfig(properties), Paths.get("/"));
}

@Override
public HetuFileSystemClient getFileSystemClient(Properties properties, Path root) {
return new HetuDbFileSystemClient(new DbConfig(properties), root);
}

@Override
public String getName() {
return NAME_DB;
}
}
Loading