Skip to content

Commit

Permalink
1. Made the Elasticsearch index name configurable
Browse files Browse the repository at this point in the history
2. Fixed the deletion of partition documents in Elasticsearch
  • Loading branch information
ajoymajumdar committed Jun 21, 2016
1 parent c18e1a0 commit 1b42f85
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

public class ArchaiusConfigImpl implements Config {
private final DynamicStringProperty defaultTypeConverter;
private final DynamicStringProperty elasticSearchIndexName;
private final DynamicStringProperty elasticSearchClusterName;
private final DynamicStringProperty elasticSearchClusterNodes;
private final DynamicIntProperty elasticSearchClusterPort;
Expand Down Expand Up @@ -60,6 +61,7 @@ public ArchaiusConfigImpl() {
public ArchaiusConfigImpl(DynamicPropertyFactory factory) {
this.defaultTypeConverter = factory
.getStringProperty("metacat.type.converter", "com.netflix.metacat.converters.impl.PrestoTypeConverter");
this.elasticSearchIndexName = factory.getStringProperty("metacat.elacticsearch.index.name", "metacat");
this.elasticSearchClusterName = factory.getStringProperty("metacat.elacticsearch.cluster.name", null);
this.elasticSearchClusterNodes = factory.getStringProperty("metacat.elacticsearch.cluster.nodes", null);
this.elasticSearchClusterPort = factory.getIntProperty("metacat.elacticsearch.cluster.port", 7102);
Expand Down Expand Up @@ -232,4 +234,9 @@ public int getServiceMaxNumberOfThreads() {
public List<QualifiedName> getQualifiedNamesToThrowErrorWhenNoFilterOnListPartitions() {
return qualifiedNamesToThrowErrorWhenNoFilterOnListPartitions;
}

/**
 * Returns the Elasticsearch index name to use for all index operations.
 * Backed by the dynamic property "metacat.elacticsearch.index.name"
 * (note: property key spelling is historical), defaulting to "metacat".
 */
@Override
public String getEsIndex() {
return elasticSearchIndexName.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ public interface Config {
boolean isUsePigTypes();
int getServiceMaxNumberOfThreads();
List<QualifiedName> getQualifiedNamesToThrowErrorWhenNoFilterOnListPartitions();
String getEsIndex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
*/
public class ElasticSearchUtil {
private XContentType contentType = Requests.INDEX_CONTENT_TYPE;
private static final String ES_INDEX = "metacat";
private final String esIndex;
private static final Retryer<Void> RETRY_ES_PUBLISH = RetryerBuilder.<Void>newBuilder()
.retryIfExceptionOfType(ElasticsearchException.class)
.withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.SECONDS, 30, TimeUnit.SECONDS))
Expand All @@ -81,6 +81,7 @@ public ElasticSearchUtil(@Nullable Client client, Config config, MetacatJson met
this.config = config;
this.client = client;
this.metacatJson = metacatJson;
this.esIndex = config.getEsIndex();
}

/**
Expand All @@ -91,7 +92,7 @@ public ElasticSearchUtil(@Nullable Client client, Config config, MetacatJson met
public void delete(String type, String id) {
try {
RETRY_ES_PUBLISH.call(() -> {
client.prepareDelete(ES_INDEX, type, id).execute().actionGet();
client.prepareDelete(esIndex, type, id).execute().actionGet();
return null;
});
} catch (Exception e) {
Expand All @@ -113,7 +114,7 @@ public void delete(String type, List<String> ids) {
try {
RETRY_ES_PUBLISH.call(() -> {
BulkRequestBuilder bulkRequest = client.prepareBulk();
ids.forEach(id -> bulkRequest.add( client.prepareDelete(ES_INDEX, type, id)));
ids.forEach(id -> bulkRequest.add( client.prepareDelete(esIndex, type, id)));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if(bulkResponse.hasFailures()){
for(BulkItemResponse item: bulkResponse.getItems()){
Expand Down Expand Up @@ -145,7 +146,7 @@ public void softDelete(String type, String id, MetacatContext metacatContext) {
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject().field(DELETED, true).field(USER,
metacatContext.getUserName()).endObject();
client.prepareUpdate(ES_INDEX, type, id).setDoc(builder).get();
client.prepareUpdate(esIndex, type, id).setDoc(builder).get();
return null;
});
} catch (Exception e) {
Expand Down Expand Up @@ -175,7 +176,7 @@ private void _softDelete(String type, List<String> ids, MetacatContext metacatCo
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject().field(DELETED, true).field(USER,
metacatContext.getUserName()).endObject();
ids.forEach(id -> bulkRequest.add( client.prepareUpdate(ES_INDEX, type, id).setDoc(builder)));
ids.forEach(id -> bulkRequest.add( client.prepareUpdate(esIndex, type, id).setDoc(builder)));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item : bulkResponse.getItems()) {
Expand Down Expand Up @@ -212,7 +213,7 @@ public void updates(String type, List<String> ids, MetacatContext metacatContext
BulkRequestBuilder bulkRequest = client.prepareBulk();
ids.forEach(id -> {
node.put(USER, metacatContext.getUserName());
bulkRequest.add( client.prepareUpdate(ES_INDEX, type, id).setDoc(metacatJson.toJsonAsBytes(node)));
bulkRequest.add( client.prepareUpdate(esIndex, type, id).setDoc(metacatJson.toJsonAsBytes(node)));
});
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
Expand Down Expand Up @@ -242,7 +243,7 @@ public void updates(String type, List<String> ids, MetacatContext metacatContext
public void save(String type, String id, String body) {
try {
RETRY_ES_PUBLISH.call(() -> {
client.prepareIndex(ES_INDEX, type, id).setSource(body).execute().actionGet();
client.prepareIndex(esIndex, type, id).setSource(body).execute().actionGet();
return null;
});
} catch (Exception e) {
Expand Down Expand Up @@ -274,7 +275,7 @@ private void _save(String type, List<ElasticSearchDoc> docs) {
try {
RETRY_ES_PUBLISH.call(() -> {
BulkRequestBuilder bulkRequest = client.prepareBulk();
docs.forEach(doc -> bulkRequest.add(client.prepareIndex(ES_INDEX, type, doc.getId())
docs.forEach(doc -> bulkRequest.add(client.prepareIndex(esIndex, type, doc.getId())
.setSource(doc.toJsonString())));
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
Expand Down Expand Up @@ -311,7 +312,7 @@ public List<String> getTableIdsByUri(String type, String dataUri) {
if( dataUri != null) {
//
// Run the query and get the response.
SearchRequestBuilder request = client.prepareSearch(ES_INDEX)
SearchRequestBuilder request = client.prepareSearch(esIndex)
.setTypes(type)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("serde.uri", dataUri))
Expand All @@ -329,7 +330,7 @@ public List<String> getTableIdsByCatalogs(String type, List<QualifiedName> quali
List<String> ids = Lists.newArrayList();
//
// Run the query and get the response.
SearchRequestBuilder request = client.prepareSearch(ES_INDEX)
SearchRequestBuilder request = client.prepareSearch(esIndex)
.setTypes(type)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termsQuery("name.qualifiedName.tree", qualifiedNames))
Expand All @@ -351,7 +352,7 @@ public <T> List<T> getQualifiedNamesByMarkerByNames(String type, List<QualifiedN
.must(QueryBuilders.termsQuery("name.qualifiedName.tree", names))
.must(QueryBuilders.termQuery("deleted_", false))
.mustNot(QueryBuilders.termQuery("refreshMarker_", marker));
SearchRequestBuilder request = client.prepareSearch(ES_INDEX)
SearchRequestBuilder request = client.prepareSearch(esIndex)
.setTypes(type)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(queryBuilder)
Expand All @@ -378,20 +379,20 @@ private <T> List<T> parseResponse(final SearchResponse response, final Class<T>
}

/**
 * Forces a refresh of the configured Elasticsearch index so that all
 * previously indexed documents become visible to search immediately.
 */
public void refresh() {
    // Use the configurable index name; the old hard-coded ES_INDEX constant
    // was replaced by the esIndex field initialized from Config#getEsIndex().
    client.admin().indices().refresh(new RefreshRequest(esIndex)).actionGet();
}

/**
 * Fetches a single document from the configured Elasticsearch index.
 *
 * @param type the Elasticsearch document type
 * @param id   the document id
 * @return the parsed document, or {@code null} if no document exists for the id
 */
public ElasticSearchDoc get(String type, String id) {
    ElasticSearchDoc result = null;
    // Single lookup against the configurable index name (stale duplicate
    // lookup against the removed ES_INDEX constant deleted).
    GetResponse response = client.prepareGet(esIndex, type, id).execute().actionGet();
    if (response.isExists()) {
        result = ElasticSearchDoc.parse(response);
    }
    return result;
}

public void delete(MetacatContext metacatContext, String type, boolean softDelete) {
SearchResponse response = client.prepareSearch(ES_INDEX)
SearchResponse response = client.prepareSearch(esIndex)
.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(config.getElasticSearchScrollTimeout()))
.setSize(config.getElasticSearchScrollFetchSize())
Expand Down Expand Up @@ -424,15 +425,15 @@ public void log(String method, String type, String name, String data, String log
source.put("error", error);
source.put("message", logMessage);
source.put("details", Throwables.getStackTraceAsString(ex));
client.prepareIndex(ES_INDEX, "metacat-log").setSource(source).execute().actionGet();
client.prepareIndex(esIndex, "metacat-log").setSource(source).execute().actionGet();
} catch(Exception e){
log.warn("Failed saving the log message in elastic search for method {}, name {}. Message: {}", method, name, e.getMessage());
}
}

public List<TableDto> simpleSearch(String searchString){
List<TableDto> result = Lists.newArrayList();
SearchResponse response = client.prepareSearch(ES_INDEX)
SearchResponse response = client.prepareSearch(esIndex)
.setTypes(table.name())
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("_all", searchString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,17 @@ public void metacatDeleteTablePostEventHandler(MetacatDeleteTablePostEvent event
/**
 * Soft deletes the Elasticsearch partition documents belonging to a deleted
 * materialized-view partition set.
 *
 * Partition documents are indexed under ids of the form
 * "&lt;qualified view name&gt;/&lt;partition id&gt;", so the raw partition ids from the
 * event must be prefixed before issuing the soft delete. The superseded call
 * that passed the un-prefixed ids (which matched no documents) is removed.
 */
@Subscribe
public void metacatDeleteMViewPartitionPostEventHandler(MetacatDeleteMViewPartitionPostEvent event) {
    List<String> partitionIds = event.getPartitionIds();
    // Build the document ids actually used in the index: "<name>/<partitionId>"
    List<String> esPartitionIds = partitionIds.stream()
            .map(partitionId -> event.getName().toString() + "/" + partitionId)
            .collect(Collectors.toList());
    es.softDelete(partition.name(), esPartitionIds, event.getMetacatContext());
}

/**
 * Soft deletes the Elasticsearch partition documents belonging to a deleted
 * table partition set.
 *
 * Partition documents are indexed under ids of the form
 * "&lt;qualified table name&gt;/&lt;partition id&gt;", so the raw partition ids from the
 * event must be prefixed before issuing the soft delete. The superseded call
 * that passed the un-prefixed ids (which matched no documents) is removed.
 */
@Subscribe
public void metacatDeleteTablePartitionPostEventHandler(MetacatDeleteTablePartitionPostEvent event) {
    List<String> partitionIds = event.getPartitionIds();
    // Build the document ids actually used in the index: "<name>/<partitionId>"
    List<String> esPartitionIds = partitionIds.stream()
            .map(partitionId -> event.getName().toString() + "/" + partitionId)
            .collect(Collectors.toList());
    es.softDelete(partition.name(), esPartitionIds, event.getMetacatContext());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class BaseEsSpec extends Specification {
index.source(getFile('metacat.json').getText())
client.admin().indices().create( index).actionGet()
metacatJson = MetacatJsonLocator.INSTANCE
config.getEsIndex() >> "metacat"
es = new ElasticSearchUtil(client, config, metacatJson)
}

Expand Down
44 changes: 36 additions & 8 deletions metacat-main/src/test/resources/search/mapping/metacat.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@
}
}
},
"com.netflix.dse.mds.metric.MaxStrLen": {
"type": "object",
"enabled": false
},
"lifetime": {
"properties": {
"partitionedBy": {
Expand All @@ -140,15 +144,39 @@
},
"dataMetadata":{
"properties": {
"com.netflix.dse.mds.metric.TopK": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.GenieJobId": {
"metrics": {
"properties": {
"value": {
"type": "string",
"index": "not_analyzed"
"com.netflix.dse.mds.metric.TopK": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.MaxFieldMetric": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.MaxStrLen": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.MinFieldMetric": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.NullCountFieldMetric": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.SumFieldMetric": {
"type": "object",
"enabled": false
},
"com.netflix.dse.mds.metric.GenieJobId": {
"properties": {
"value": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
Expand Down

0 comments on commit 1b42f85

Please sign in to comment.