From e8d1b01ee87c21f6aab37448d073582b62e610fe Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Fri, 29 Nov 2019 17:02:44 +0800 Subject: [PATCH] Filter out feature sets that dont share the same source (#339) * Filter out feature sets that dont share the same source * Apply spotless * Change typo --- .../java/feast/core/grpc/CoreServiceImpl.java | 33 ++-- .../feast/core/grpc/CoreServiceImplTest.java | 153 ++++++++++++++++++ 2 files changed, 167 insertions(+), 19 deletions(-) create mode 100644 core/src/test/java/feast/core/grpc/CoreServiceImplTest.java diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 5861742e00..a057789df7 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -42,9 +42,7 @@ import feast.core.service.SpecService; import io.grpc.stub.StreamObserver; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcService; @@ -56,8 +54,14 @@ @GRpcService public class CoreServiceImpl extends CoreServiceImplBase { - @Autowired private SpecService specService; - @Autowired private JobCoordinatorService jobCoordinatorService; + private SpecService specService; + private JobCoordinatorService jobCoordinatorService; + + @Autowired + public CoreServiceImpl(SpecService specService, JobCoordinatorService jobCoordinatorService) { + this.specService = specService; + this.jobCoordinatorService = jobCoordinatorService; + } @Override public void getFeastCoreVersion( @@ -114,23 +118,10 @@ public void applyFeatureSet( ApplyFeatureSetRequest request, StreamObserver responseObserver) { try { ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet()); - String featureSetName = response.getFeatureSet().getName(); ListStoresResponse stores = specService.listStores(Filter.newBuilder().build()); for (Store store : stores.getStoreList()) { - List relevantSubscriptions = - store.getSubscriptionsList().stream() - .filter( - sub -> { - String subString = sub.getName(); - if (!subString.contains(".*")) { - subString = subString.replace("*", ".*"); - } - Pattern p = Pattern.compile(subString); - return p.matcher(featureSetName).matches(); - }) - .collect(Collectors.toList()); Set featureSetSpecs = new HashSet<>(); - for (Subscription subscription : relevantSubscriptions) { + for (Subscription subscription : store.getSubscriptionsList()) { featureSetSpecs.addAll( specService .listFeatureSets( @@ -141,9 +132,13 @@ public void applyFeatureSet( .getFeatureSetsList()); } if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) { - // We use the request featureSet source because it contains the information + // We use the response featureSet source because it contains the information // about whether to default to the default feature stream or not SourceProto.Source source = response.getFeatureSet().getSource(); + featureSetSpecs = + featureSetSpecs.stream() + .filter(fs -> fs.getSource().equals(source)) + .collect(Collectors.toSet()); jobCoordinatorService.startOrUpdateJob( Lists.newArrayList(featureSetSpecs), source, store); } diff --git a/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java new file mode 100644 index 0000000000..5a3794cc65 --- /dev/null +++ b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.grpc; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.CoreServiceProto.ApplyFeatureSetRequest; +import feast.core.CoreServiceProto.ApplyFeatureSetResponse; +import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status; +import feast.core.CoreServiceProto.ListFeatureSetsRequest; +import feast.core.CoreServiceProto.ListFeatureSetsResponse; +import feast.core.CoreServiceProto.ListStoresResponse; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.Source; +import feast.core.SourceProto.SourceType; +import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.RedisConfig; +import feast.core.StoreProto.Store.StoreType; +import feast.core.StoreProto.Store.Subscription; +import feast.core.service.JobCoordinatorService; +import feast.core.service.SpecService; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; +import org.mockito.Mock; + +public class CoreServiceImplTest { + + @Mock private JobCoordinatorService jobCoordinatorService; + + @Mock private SpecService specService; + + @Captor private ArgumentCaptor> fsListArgCaptor; + + @Before + public void setUp() { + initMocks(this); + } + + @Test + public void shouldPassCorrectListOfFeatureSetsToJobService() + throws InvalidProtocolBufferException { + CoreServiceImpl coreService = new CoreServiceImpl(specService, jobCoordinatorService); + Store store = + Store.newBuilder() + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build()) + .addSubscriptions(Subscription.newBuilder().setName("*").setVersion(">0")) + .build(); + FeatureSetSpec fs1Sc1 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(1) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs2Sc1 = + FeatureSetSpec.newBuilder() + .setName("feature_set_other") + .setVersion(1) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs3Sc2 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(2) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic2") + .build())) + .build(); + when(specService.applyFeatureSet(fs1Sc1)) + .thenReturn( + ApplyFeatureSetResponse.newBuilder() + .setStatus(Status.CREATED) + .setFeatureSet(fs1Sc1) + .build()); + when(specService.listStores(ArgumentMatchers.any())) + .thenReturn(ListStoresResponse.newBuilder().addStore(store).build()); + when(specService.listFeatureSets( + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName("*") + .setFeatureSetVersion(">0") + .build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addFeatureSets(fs1Sc1) + .addFeatureSets(fs3Sc2) + .addFeatureSets(fs2Sc1) + .build()); + + coreService.applyFeatureSet( + ApplyFeatureSetRequest.newBuilder().setFeatureSet(fs1Sc1).build(), + new StreamObserver() { + @Override + public void onNext(ApplyFeatureSetResponse applyFeatureSetResponse) {} + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() {} + }); + + verify(jobCoordinatorService, times(1)) + .startOrUpdateJob(fsListArgCaptor.capture(), eq(fs1Sc1.getSource()), eq(store)); + + assertThat(fsListArgCaptor.getValue(), containsInAnyOrder(fs1Sc1, fs2Sc1)); + } +}