From cd37be08685a0fbfc407adfde66c862e41974149 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Wed, 14 Aug 2024 12:14:49 -0400 Subject: [PATCH] 0006557: Filter extension point during extraction --- .../extract/ExtractDataReaderFactory.java | 6 +++- .../io/data/reader/ExtractDataReader.java | 23 +++++++++++- .../io/data/reader/IExtractDataFilter.java | 36 +++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java index c7a2dbc681..0bf2a99775 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java @@ -20,11 +20,14 @@ */ package org.jumpmind.symmetric.extract; +import java.util.List; + import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.reader.ExtractDataReader; +import org.jumpmind.symmetric.io.data.reader.IExtractDataFilter; import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource; import org.jumpmind.symmetric.model.Node; @@ -38,6 +41,7 @@ public ExtractDataReaderFactory(ISymmetricEngine engine) { public ExtractDataReader getReader(IDatabasePlatform platform, IExtractDataReaderSource source, Node sourceNode, Node targetNode) { ISymmetricDialect symmetricDialect = engine.getSymmetricDialect(); boolean isUsingUnitypes = symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_SYBASE_ASE_CONVERT_UNITYPES_FOR_SYNC); - return new ExtractDataReader(platform, source, isUsingUnitypes); + List filters = engine.getExtensionService().getExtensionPointList(IExtractDataFilter.class); + return new ExtractDataReader(platform, source, filters, isUsingUnitypes); } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java index 59d02d52b6..5e7266404f 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java @@ -54,6 +54,7 @@ public class ExtractDataReader implements IDataReader { protected IDatabasePlatform platform; protected List sourcesToUse; protected IExtractDataReaderSource currentSource; + protected List filters; protected Batch batch; protected Table table; protected CsvData data; @@ -68,10 +69,11 @@ public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource so this.isSybaseASE = platform.getName().equals(DatabaseNamesConstants.ASE); } - public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource source, Boolean isUsingUnitypes) { + public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource source, List filters, boolean isUsingUnitypes) { this.sourcesToUse = new ArrayList(); this.sourcesToUse.add(source); this.platform = platform; + this.filters = filters; this.isUsingUnitypes = isUsingUnitypes; this.isSybaseASE = platform.getName().equals(DatabaseNamesConstants.ASE); } @@ -130,6 +132,25 @@ protected String substituteVariables(String sourceString) { } public CsvData nextData() { + CsvData nextData = nextDataFromSource(); + if (nextData != null && filters != null && filters.size() != 0) { + boolean shouldExtract = true; + while (shouldExtract) { + for (IExtractDataFilter filter : filters) { + shouldExtract &= filter.filterData(dataContext, batch, table, nextData); + } + if (shouldExtract) { + break; + } else { + nextData = nextDataFromSource(); + shouldExtract = nextData != null; + } + } + } + return nextData; + } + + protected CsvData nextDataFromSource() { if (this.table != null) { if (this.data == null) { this.data = this.currentSource.next(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java new file mode 100644 index 0000000000..9652fdf6dc --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.io.data.reader; + +import org.jumpmind.db.model.Table; +import org.jumpmind.extension.IExtensionPoint; +import org.jumpmind.symmetric.io.data.Batch; +import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.DataContext; + +public interface IExtractDataFilter extends IExtensionPoint { + /** + * Called during extraction of sym_data or source table + * + * @return true if the row should be extracted + */ + public boolean filterData(DataContext context, Batch batch, Table table, CsvData data); +}