From e1696847239999ec57586e1bc272e38e4a4c4a53 Mon Sep 17 00:00:00 2001 From: Senthil Sayeebaba Date: Fri, 14 Dec 2018 11:20:25 -0800 Subject: [PATCH] Removed date range from the query to search for archivable workflows. --- .../dao/es/index/ElasticSearchDAO.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/es2-persistence/src/main/java/com/netflix/conductor/dao/es/index/ElasticSearchDAO.java b/es2-persistence/src/main/java/com/netflix/conductor/dao/es/index/ElasticSearchDAO.java index 8237839819..c37df2ad7a 100644 --- a/es2-persistence/src/main/java/com/netflix/conductor/dao/es/index/ElasticSearchDAO.java +++ b/es2-persistence/src/main/java/com/netflix/conductor/dao/es/index/ElasticSearchDAO.java @@ -19,7 +19,6 @@ package com.netflix.conductor.dao.es.index; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.tasks.Task; @@ -37,6 +36,25 @@ import com.netflix.conductor.dao.es.index.query.parser.ParserException; import com.netflix.conductor.dao.es.utils.RetryUtil; import com.netflix.conductor.metrics.Monitors; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.inject.Inject; +import javax.inject.Singleton; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -68,28 +86,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import javax.inject.Singleton; -import java.io.InputStream; -import java.text.SimpleDateFormat; -import java.time.LocalDate; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - /** * @author Viren * @@ -114,6 +110,8 @@ public class ElasticSearchDAO implements IndexDAO { private static final int RETRY_COUNT = 3; + private final int archiveSearchBatchSize; + private String indexName; private String logIndexName; @@ -140,6 +138,7 @@ public ElasticSearchDAO(Client elasticSearchClient, Configuration config, Object this.objectMapper = objectMapper; this.elasticSearchClient = elasticSearchClient; this.indexName = config.getProperty("workflow.elasticsearch.index.name", null); + this.archiveSearchBatchSize = config.getIntProperty("workflow.elasticsearch.archive.search.batchSize", 5000); try { @@ -499,7 +498,6 @@ private SearchResult search(String structuredQuery, int start, int size, @Override public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays))) .should(QueryBuilders.termQuery("status", "COMPLETED")) .should(QueryBuilders.termQuery("status", "FAILED")) .mustNot(QueryBuilders.existsQuery("archived")) @@ -507,11 +505,14 @@ public List searchArchivableWorkflows(String indexName, long archiveTtlD SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName) .setTypes("workflow") .setQuery(q) - .setSize(1000); + .addSort("endTime", SortOrder.ASC) + .setSize(archiveSearchBatchSize); SearchResponse response = s.execute().actionGet(); SearchHits hits = response.getHits(); + logger.info("Archive search totalHits - {}", hits.getTotalHits()); + List ids = new LinkedList<>(); for (SearchHit hit : hits.getHits()) { ids.add(hit.getId());