Support parsing Doris sql
wangyang377 committed Oct 19, 2024
1 parent 044e049 commit d87aedd
Showing 27 changed files with 409 additions and 127 deletions.
@@ -123,7 +123,7 @@ private static void checkTableExists(final SQLStatementBinderContext binderConte
if (SystemSchemaManager.isSystemTable(schemaName, tableName)) {
return;
}
if (binderContext.getExternalTableBinderContexts().containsKey(tableName)) {
if (binderContext.getExternalTableBinderContexts().containsKey(new CaseInsensitiveString(tableName))) {
return;
}
ShardingSpherePreconditions.checkState(binderContext.getMetaData().containsDatabase(databaseName)
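
The lookup now wraps the raw table name in CaseInsensitiveString, so an external table binder context registered under a differently cased name is still found. A minimal sketch of how such a key behaves, using a simplified stand-in rather than ShardingSphere's actual CaseInsensitiveString class:

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Simplified stand-in for a case-insensitive map key; the real CaseInsensitiveString may differ in detail.
final class CaseInsensitiveKey {

    private final String value;

    CaseInsensitiveKey(final String value) {
        this.value = value;
    }

    @Override
    public boolean equals(final Object obj) {
        return obj instanceof CaseInsensitiveKey && value.equalsIgnoreCase(((CaseInsensitiveKey) obj).value);
    }

    @Override
    public int hashCode() {
        return value.toLowerCase(Locale.ROOT).hashCode();
    }

    public static void main(final String[] args) {
        Map<CaseInsensitiveKey, String> contexts = new HashMap<>();
        contexts.put(new CaseInsensitiveKey("T_ORDER"), "binder context");
        // The lookup succeeds regardless of how the SQL spells the table name.
        System.out.println(contexts.containsKey(new CaseInsensitiveKey("t_order")));
    }
}
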
@@ -39,7 +39,8 @@
/**
* Substitutable column name token.
*/
@EqualsAndHashCode(callSuper = false)
// TODO remove @EqualsAndHashCode in token rewriter
@EqualsAndHashCode(callSuper = false, exclude = "projections")
public final class SubstitutableColumnNameToken extends SQLToken implements Substitutable, RouteUnitAware {

private static final String COLUMN_NAME_SPLITTER = ", ";
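
Excluding projections from the Lombok-generated equals/hashCode means two tokens compare equal whenever their remaining fields match, regardless of the projection list; the TODO marks this as a stopgap until the token rewriter no longer depends on token equality. A small illustration of the exclude behavior, using hypothetical field names rather than the real class:

import lombok.EqualsAndHashCode;

import java.util.List;

// Hypothetical token: only startIndex participates in equals/hashCode, mirroring the exclusion of "projections".
@EqualsAndHashCode(exclude = "projections")
class PositionedToken {

    private final int startIndex;

    private final List<String> projections;

    PositionedToken(final int startIndex, final List<String> projections) {
        this.startIndex = startIndex;
        this.projections = projections;
    }

    public static void main(final String[] args) {
        PositionedToken a = new PositionedToken(10, List.of("order_id"));
        PositionedToken b = new PositionedToken(10, List.of("user_id"));
        // Equal despite different projections, because the field is excluded from the generated methods.
        System.out.println(a.equals(b));
    }
}
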
@@ -40,15 +40,12 @@
*/
public final class PipelineDataSourceCheckEngine {

private final DialectDatabasePrivilegeChecker privilegeChecker;

private final DialectPipelineDatabaseVariableChecker variableChecker;
private final DatabaseType databaseType;

private final PipelinePrepareSQLBuilder sqlBuilder;

public PipelineDataSourceCheckEngine(final DatabaseType databaseType) {
privilegeChecker = DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class, databaseType).orElse(null);
variableChecker = DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class, databaseType).orElse(null);
this.databaseType = databaseType;
sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
}

@@ -75,12 +72,8 @@ public void checkConnection(final Collection<DataSource> dataSources) {
*/
public void checkSourceDataSources(final Collection<DataSource> dataSources) {
checkConnection(dataSources);
if (null != privilegeChecker) {
dataSources.forEach(each -> privilegeChecker.check(each, PrivilegeCheckType.PIPELINE));
}
if (null != variableChecker) {
dataSources.forEach(variableChecker::check);
}
DatabaseTypedSPILoader.findService(DialectDatabasePrivilegeChecker.class, databaseType).ifPresent(optional -> dataSources.forEach(each -> optional.check(each, PrivilegeCheckType.PIPELINE)));
DatabaseTypedSPILoader.findService(DialectPipelineDatabaseVariableChecker.class, databaseType).ifPresent(optional -> dataSources.forEach(optional::check));
}

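
The checker fields are gone; both SPI services are resolved on demand and dispatched through Optional.ifPresent, so an absent implementation is a no-op instead of a null check. A minimal sketch of that pattern with a hypothetical loader method and checker interface, not the real DatabaseTypedSPILoader API:

import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

// Hypothetical checker SPI; the real DialectDatabasePrivilegeChecker takes a DataSource and a check type.
interface PrivilegeChecker {

    void check(Object dataSource);
}

final class CheckerDispatchSketch {

    // Stands in for DatabaseTypedSPILoader.findService(...): an implementation may or may not exist for the database type.
    static Optional<PrivilegeChecker> findService(final String databaseType) {
        return "MySQL".equals(databaseType) ? Optional.of(ds -> System.out.println("checked " + ds)) : Optional.empty();
    }

    static void checkAll(final String databaseType, final Collection<Object> dataSources) {
        // No nullable field and no explicit null check: an absent service simply skips the loop.
        findService(databaseType).ifPresent(checker -> dataSources.forEach(checker::check));
    }

    public static void main(final String[] args) {
        checkAll("MySQL", Arrays.asList("ds_0", "ds_1"));
        checkAll("PostgreSQL", Arrays.asList("ds_0"));
    }
}

The trade-off is one SPI lookup per call instead of one per engine instance; these checks run rarely, so the simpler, field-free code is likely the better exchange.
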
@@ -95,13 +95,13 @@ public void setLogWriter(final PrintWriter out) throws SQLException {
}

@Override
public void setLoginTimeout(final int seconds) throws SQLException {
dataSource.setLoginTimeout(seconds);
public int getLoginTimeout() throws SQLException {
return dataSource.getLoginTimeout();
}

@Override
public int getLoginTimeout() throws SQLException {
return dataSource.getLoginTimeout();
public void setLoginTimeout(final int seconds) throws SQLException {
dataSource.setLoginTimeout(seconds);
}

@Override
@@ -118,13 +118,7 @@ public void close() throws SQLException {
log.warn("Data source is not closed, it might cause connection leak, data source: {}", dataSource);
return;
}
try {
new DataSourcePoolDestroyer(dataSource).asyncDestroy();
closed.set(true);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
throw new SQLException("Data source close failed.", ex);
}
new DataSourcePoolDestroyer(dataSource).asyncDestroy();
closed.set(true);
}
}
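
With destruction delegated to DataSourcePoolDestroyer.asyncDestroy(), which declares no checked exception, the try/catch that rewrapped a RuntimeException into SQLException is dropped. A rough sketch of the idempotent close shape, with hypothetical names and a compareAndSet guard standing in for the wrapper's own closed-state handling:

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper illustrating a guard-then-destroy close; not the actual ShardingSphere class.
final class ClosableDataSourceSketch implements AutoCloseable {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        // compareAndSet makes repeated close() calls a no-op.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        destroyPoolAsync();
    }

    private void destroyPoolAsync() {
        // Stands in for new DataSourcePoolDestroyer(dataSource).asyncDestroy().
        System.out.println("destroying pool asynchronously");
    }

    public static void main(final String[] args) {
        ClosableDataSourceSketch dataSource = new ClosableDataSourceSketch();
        dataSource.close();
        dataSource.close();
    }
}
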
@@ -69,6 +69,16 @@ public static PipelineExecuteEngine newFixedThreadInstance(final int threadNumbe
return new PipelineExecuteEngine(Executors.newFixedThreadPool(threadNumber, ExecutorThreadFactoryBuilder.build(threadNameFormat)));
}

/**
* Submit a {@code LifecycleExecutor} to execute.
*
* @param pipelineLifecycleRunnable lifecycle executor
* @return execute future
*/
public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) {
return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService);
}

/**
* Submit a {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute.
*
@@ -82,30 +92,19 @@ public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifec
executeCallback.onSuccess();
} else {
Throwable cause = throwable.getCause();
executeCallback.onFailure(null != cause ? cause : throwable);
executeCallback.onFailure(null == cause ? throwable : cause);
}
}, CALLBACK_EXECUTOR);
}

/**
* Submit a {@code LifecycleExecutor} to execute.
*
* @param pipelineLifecycleRunnable lifecycle executor
* @return execute future
*/
public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) {
return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService);
}

/**
* Shutdown.
*/
public void shutdown() {
if (executorService.isShutdown()) {
return;
if (!executorService.isShutdown()) {
executorService.shutdown();
executorService.shutdownNow();
}
executorService.shutdown();
executorService.shutdownNow();
}

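
The callback overload unwraps CompletionException before reporting failure: runAsync wraps whatever the task throws, so getCause() is the exception the caller actually cares about, and the raw throwable is used only when no cause is present. A self-contained sketch of that unwrapping with a hypothetical callback interface, using plain whenComplete instead of the dedicated callback executor the real class wires in:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback mirroring the success/failure split of ExecuteCallback.
interface Callback {

    void onSuccess();

    void onFailure(Throwable throwable);
}

final class SubmitSketch {

    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    static CompletableFuture<?> submit(final Runnable task, final Callback callback) {
        return CompletableFuture.runAsync(task, EXECUTOR).whenComplete((unused, throwable) -> {
            if (null == throwable) {
                callback.onSuccess();
                return;
            }
            // runAsync failures arrive wrapped in CompletionException; report the real cause when there is one.
            Throwable cause = throwable.getCause();
            callback.onFailure(null == cause ? throwable : cause);
        });
    }

    public static void main(final String[] args) throws InterruptedException {
        submit(() -> {
            throw new IllegalStateException("boom");
        }, new Callback() {

            @Override
            public void onSuccess() {
                System.out.println("success");
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // Prints the IllegalStateException, not the wrapping CompletionException.
                System.out.println("failed: " + throwable);
            }
        });
        EXECUTOR.shutdown();
        EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
    }
}
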
@@ -49,10 +49,9 @@ public void init(final Properties props) {

@Override
public void intercept(final PipelineSQLOperationType type, final Number data) {
if (type != PipelineSQLOperationType.SELECT) {
return;
if (type == PipelineSQLOperationType.SELECT) {
rateLimiter.acquire(null == data ? 1 : data.intValue());
}
rateLimiter.acquire(null != data ? data.intValue() : 1);
}

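
This algorithm throttles only SELECT operations; the rewrite turns the early return into a positive check and normalizes the null handling of the batch size. A minimal sketch with Guava's RateLimiter, assuming that is the type behind the rateLimiter field (the diff does not show its declaration):

import com.google.common.util.concurrent.RateLimiter;

// Sketch of a QPS-style interceptor: only read operations consume permits.
final class QpsLimitSketch {

    enum OperationType { SELECT, INSERT, UPDATE, DELETE }

    private final RateLimiter rateLimiter = RateLimiter.create(100);   // 100 permits per second, illustrative only

    void intercept(final OperationType type, final Number data) {
        if (type == OperationType.SELECT) {
            // A null batch size counts as a single record.
            rateLimiter.acquire(null == data ? 1 : data.intValue());
        }
    }

    public static void main(final String[] args) {
        QpsLimitSketch sketch = new QpsLimitSketch();
        sketch.intercept(OperationType.SELECT, 50);
        sketch.intercept(OperationType.INSERT, 50);   // not throttled by this algorithm
    }
}
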
@@ -49,13 +49,8 @@ public void init(final Properties props) {

@Override
public void intercept(final PipelineSQLOperationType type, final Number data) {
switch (type) {
case INSERT:
case DELETE:
case UPDATE:
rateLimiter.acquire(null != data ? data.intValue() : 1);
break;
default:
if (type != PipelineSQLOperationType.SELECT) {
rateLimiter.acquire(null == data ? 1 : data.intValue());
}
}

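
Here the switch over INSERT, DELETE and UPDATE collapses into a single type != SELECT check. The two forms match only while the operation type enum has exactly those four members; a write-type value added later would now be throttled by default instead of falling through the old switch. A quick sketch of that equivalence over a hypothetical four-value enum:

// Equivalence check for the refactor: with four operation types, "not SELECT" covers the same cases as the old switch.
final class TpsConditionEquivalenceSketch {

    enum OperationType { SELECT, INSERT, UPDATE, DELETE }

    static boolean oldForm(final OperationType type) {
        switch (type) {
            case INSERT:
            case DELETE:
            case UPDATE:
                return true;
            default:
                return false;
        }
    }

    static boolean newForm(final OperationType type) {
        return type != OperationType.SELECT;
    }

    public static void main(final String[] args) {
        for (OperationType each : OperationType.values()) {
            // Prints true for every value while the enum has no fifth member.
            System.out.println(each + ": " + (oldForm(each) == newForm(each)));
        }
    }
}
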
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;

import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

class IncrementalChannelCreatorTest {

@Test
void assertCreate() {
assertThat(IncrementalChannelCreator.create(new AlgorithmConfiguration("MEMORY", new Properties()), mock(IncrementalTaskProgress.class)), instanceOf(MemoryPipelineChannel.class));
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;

import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

class InventoryChannelCreatorTest {

@Test
void assertCreate() {
assertThat(InventoryChannelCreator.create(new AlgorithmConfiguration("MEMORY", new Properties()), 1, new AtomicReference<>(mock(IngestPosition.class))),
instanceOf(MemoryPipelineChannel.class));
}
}
@@ -33,6 +33,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class MemoryPipelineChannelTest {

@@ -58,4 +60,38 @@ void assertFetchWithZeroTimeout() {
channel.push(records);
assertThat(channel.fetch(10, 0L), is(records));
}

@Test
void assertPeekWithRecords() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
channel.push(records);
assertThat(channel.peek(), is(records));
}

@Test
void assertPeekWithoutRecords() {
assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).peek(), is(Collections.emptyList()));
}

@Test
void assertPollWithRecords() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
channel.push(records);
assertThat(channel.poll(), is(records));
}

@Test
void assertPollWithoutRecords() {
assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).poll(), is(Collections.emptyList()));
}

@Test
void assertAck() {
InventoryTaskAckCallback callback = mock(InventoryTaskAckCallback.class);
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
new MemoryPipelineChannel(100, callback).ack(records);
verify(callback).onAck(records);
}
}
@@ -15,9 +15,8 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
package org.apache.shardingsphere.data.pipeline.core.checker;

import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -39,6 +38,7 @@
import java.util.Collections;
import java.util.LinkedList;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -83,14 +83,21 @@ void assertCheckConnectionFailed() throws SQLException {
assertThrows(SQLWrapperException.class, () -> pipelineDataSourceCheckEngine.checkConnection(dataSources));
}

@Test
void assertCheckSourceDataSources() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
pipelineDataSourceCheckEngine.checkSourceDataSources(dataSources);
verify(dataSource).getConnection();
}

@Test
void assertCheckTargetDataSources() throws SQLException {
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new CaseInsensitiveQualifiedTable(null, "t_order")));
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig);
assertDoesNotThrow(() -> pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
}

@Test
Expand All @@ -103,4 +110,10 @@ void assertCheckTargetDataSourcesFailed() throws SQLException {
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new CaseInsensitiveQualifiedTable(null, "t_order")));
assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
}

@Test
void assertCheckTargetDataSourcesWhenSQLExceptionThrown() throws SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException(""));
assertThrows(SQLWrapperException.class, () -> pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, mock(ImporterConfiguration.class)));
}
}
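
The new negative test expects SQLWrapperException when getConnection() fails, which relies on the engine converting the checked SQLException into that unchecked wrapper before rethrowing. A hedged sketch of the wrap-and-assert pattern, with a hypothetical wrapper type standing in for SQLWrapperException:

import org.junit.jupiter.api.Test;

import java.sql.SQLException;

import static org.junit.jupiter.api.Assertions.assertThrows;

// Hypothetical unchecked wrapper standing in for SQLWrapperException.
class SqlWrapper extends RuntimeException {

    SqlWrapper(final SQLException cause) {
        super(cause);
    }
}

class WrapAndAssertSketch {

    static void checkConnection() {
        try {
            openConnection();
        } catch (final SQLException ex) {
            // Convert the checked exception so callers need not declare it.
            throw new SqlWrapper(ex);
        }
    }

    static void openConnection() throws SQLException {
        throw new SQLException("connection refused");
    }

    @Test
    void assertCheckConnectionWrapsSQLException() {
        assertThrows(SqlWrapper.class, WrapAndAssertSketch::checkConnection);
    }
}
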