diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java index c7a2dbc681..0bf2a99775 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ExtractDataReaderFactory.java @@ -20,11 +20,14 @@ */ package org.jumpmind.symmetric.extract; +import java.util.List; + import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.reader.ExtractDataReader; +import org.jumpmind.symmetric.io.data.reader.IExtractDataFilter; import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource; import org.jumpmind.symmetric.model.Node; @@ -38,6 +41,7 @@ public ExtractDataReaderFactory(ISymmetricEngine engine) { public ExtractDataReader getReader(IDatabasePlatform platform, IExtractDataReaderSource source, Node sourceNode, Node targetNode) { ISymmetricDialect symmetricDialect = engine.getSymmetricDialect(); boolean isUsingUnitypes = symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_SYBASE_ASE_CONVERT_UNITYPES_FOR_SYNC); - return new ExtractDataReader(platform, source, isUsingUnitypes); + List filters = engine.getExtensionService().getExtensionPointList(IExtractDataFilter.class); + return new ExtractDataReader(platform, source, filters, isUsingUnitypes); } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java index 59d02d52b6..5e7266404f 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java @@ -54,6 +54,7 @@ public class ExtractDataReader implements IDataReader { protected IDatabasePlatform platform; protected List sourcesToUse; protected IExtractDataReaderSource currentSource; + protected List filters; protected Batch batch; protected Table table; protected CsvData data; @@ -68,10 +69,11 @@ public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource so this.isSybaseASE = platform.getName().equals(DatabaseNamesConstants.ASE); } - public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource source, Boolean isUsingUnitypes) { + public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource source, List filters, boolean isUsingUnitypes) { this.sourcesToUse = new ArrayList(); this.sourcesToUse.add(source); this.platform = platform; + this.filters = filters; this.isUsingUnitypes = isUsingUnitypes; this.isSybaseASE = platform.getName().equals(DatabaseNamesConstants.ASE); } @@ -130,6 +132,25 @@ protected String substituteVariables(String sourceString) { } public CsvData nextData() { + CsvData nextData = nextDataFromSource(); + if (nextData != null && filters != null && filters.size() != 0) { + boolean shouldExtract = true; + while (shouldExtract) { + for (IExtractDataFilter filter : filters) { + shouldExtract &= filter.filterData(dataContext, batch, table, nextData); + } + if (shouldExtract) { + break; + } else { + nextData = nextDataFromSource(); + shouldExtract = nextData != null; + } + } + } + return nextData; + } + + protected CsvData nextDataFromSource() { if (this.table != null) { if (this.data == null) { this.data = this.currentSource.next(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java new file mode 100644 index 0000000000..9652fdf6dc --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/IExtractDataFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.io.data.reader; + +import org.jumpmind.db.model.Table; +import org.jumpmind.extension.IExtensionPoint; +import org.jumpmind.symmetric.io.data.Batch; +import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.DataContext; + +public interface IExtractDataFilter extends IExtensionPoint { + /** + * Called during extraction of sym_data or source table + * + * @return true if the row should be extracted + */ + public boolean filterData(DataContext context, Batch batch, Table table, CsvData data); +}