Skip to content

Commit

Permalink
Filter out feature sets that dont share the same source (#339)
Browse files Browse the repository at this point in the history
* Filter out feature sets that dont share the same source

* Apply spotless

* Change typo
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Nov 29, 2019
1 parent 3b4dcaa commit e8d1b01
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 19 deletions.
33 changes: 14 additions & 19 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
import feast.core.service.SpecService;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
Expand All @@ -56,8 +54,14 @@
@GRpcService
public class CoreServiceImpl extends CoreServiceImplBase {

@Autowired private SpecService specService;
@Autowired private JobCoordinatorService jobCoordinatorService;
private SpecService specService;
private JobCoordinatorService jobCoordinatorService;

@Autowired
public CoreServiceImpl(SpecService specService, JobCoordinatorService jobCoordinatorService) {
this.specService = specService;
this.jobCoordinatorService = jobCoordinatorService;
}

@Override
public void getFeastCoreVersion(
Expand Down Expand Up @@ -114,23 +118,10 @@ public void applyFeatureSet(
ApplyFeatureSetRequest request, StreamObserver<ApplyFeatureSetResponse> responseObserver) {
try {
ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet());
String featureSetName = response.getFeatureSet().getName();
ListStoresResponse stores = specService.listStores(Filter.newBuilder().build());
for (Store store : stores.getStoreList()) {
List<Subscription> relevantSubscriptions =
store.getSubscriptionsList().stream()
.filter(
sub -> {
String subString = sub.getName();
if (!subString.contains(".*")) {
subString = subString.replace("*", ".*");
}
Pattern p = Pattern.compile(subString);
return p.matcher(featureSetName).matches();
})
.collect(Collectors.toList());
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
for (Subscription subscription : relevantSubscriptions) {
for (Subscription subscription : store.getSubscriptionsList()) {
featureSetSpecs.addAll(
specService
.listFeatureSets(
Expand All @@ -141,9 +132,13 @@ public void applyFeatureSet(
.getFeatureSetsList());
}
if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) {
// We use the request featureSet source because it contains the information
// We use the response featureSet source because it contains the information
// about whether to default to the default feature stream or not
SourceProto.Source source = response.getFeatureSet().getSource();
featureSetSpecs =
featureSetSpecs.stream()
.filter(fs -> fs.getSource().equals(source))
.collect(Collectors.toSet());
jobCoordinatorService.startOrUpdateJob(
Lists.newArrayList(featureSetSpecs), source, store);
}
Expand Down
153 changes: 153 additions & 0 deletions core/src/test/java/feast/core/grpc/CoreServiceImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.grpc;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto.KafkaSourceConfig;
import feast.core.SourceProto.Source;
import feast.core.SourceProto.SourceType;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.core.StoreProto.Store.Subscription;
import feast.core.service.JobCoordinatorService;
import feast.core.service.SpecService;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;

public class CoreServiceImplTest {

@Mock private JobCoordinatorService jobCoordinatorService;

@Mock private SpecService specService;

@Captor private ArgumentCaptor<ArrayList<FeatureSetSpec>> fsListArgCaptor;

@Before
public void setUp() {
initMocks(this);
}

@Test
public void shouldPassCorrectListOfFeatureSetsToJobService()
throws InvalidProtocolBufferException {
CoreServiceImpl coreService = new CoreServiceImpl(specService, jobCoordinatorService);
Store store =
Store.newBuilder()
.setType(StoreType.REDIS)
.setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build())
.addSubscriptions(Subscription.newBuilder().setName("*").setVersion(">0"))
.build();
FeatureSetSpec fs1Sc1 =
FeatureSetSpec.newBuilder()
.setName("feature_set")
.setVersion(1)
.setSource(
Source.newBuilder()
.setType(SourceType.KAFKA)
.setKafkaSourceConfig(
KafkaSourceConfig.newBuilder()
.setBootstrapServers("kafka:9092")
.setTopic("topic1")
.build()))
.build();
FeatureSetSpec fs2Sc1 =
FeatureSetSpec.newBuilder()
.setName("feature_set_other")
.setVersion(1)
.setSource(
Source.newBuilder()
.setType(SourceType.KAFKA)
.setKafkaSourceConfig(
KafkaSourceConfig.newBuilder()
.setBootstrapServers("kafka:9092")
.setTopic("topic1")
.build()))
.build();
FeatureSetSpec fs3Sc2 =
FeatureSetSpec.newBuilder()
.setName("feature_set")
.setVersion(2)
.setSource(
Source.newBuilder()
.setType(SourceType.KAFKA)
.setKafkaSourceConfig(
KafkaSourceConfig.newBuilder()
.setBootstrapServers("kafka:9092")
.setTopic("topic2")
.build()))
.build();
when(specService.applyFeatureSet(fs1Sc1))
.thenReturn(
ApplyFeatureSetResponse.newBuilder()
.setStatus(Status.CREATED)
.setFeatureSet(fs1Sc1)
.build());
when(specService.listStores(ArgumentMatchers.any()))
.thenReturn(ListStoresResponse.newBuilder().addStore(store).build());
when(specService.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName("*")
.setFeatureSetVersion(">0")
.build()))
.thenReturn(
ListFeatureSetsResponse.newBuilder()
.addFeatureSets(fs1Sc1)
.addFeatureSets(fs3Sc2)
.addFeatureSets(fs2Sc1)
.build());

coreService.applyFeatureSet(
ApplyFeatureSetRequest.newBuilder().setFeatureSet(fs1Sc1).build(),
new StreamObserver<ApplyFeatureSetResponse>() {
@Override
public void onNext(ApplyFeatureSetResponse applyFeatureSetResponse) {}

@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {}
});

verify(jobCoordinatorService, times(1))
.startOrUpdateJob(fsListArgCaptor.capture(), eq(fs1Sc1.getSource()), eq(store));

assertThat(fsListArgCaptor.getValue(), containsInAnyOrder(fs1Sc1, fs2Sc1));
}
}

0 comments on commit e8d1b01

Please sign in to comment.