This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit e169684

Removed date range from the query to search for archivable workflows.

Senthil Sayeebaba authored and s3nthil committed Dec 14, 2018
1 parent 07ec385 commit e169684
Showing 1 changed file with 26 additions and 25 deletions.
@@ -19,7 +19,6 @@
 package com.netflix.conductor.dao.es.index;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicate;
 import com.netflix.conductor.annotations.Trace;
 import com.netflix.conductor.common.metadata.events.EventExecution;
 import com.netflix.conductor.common.metadata.tasks.Task;
@@ -37,6 +36,25 @@
 import com.netflix.conductor.dao.es.index.query.parser.ParserException;
 import com.netflix.conductor.dao.es.utils.RetryUtil;
 import com.netflix.conductor.metrics.Monitors;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@@ -68,28 +86,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import java.io.InputStream;
-import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
 /**
  * @author Viren
  *
@@ -114,6 +110,8 @@ public class ElasticSearchDAO implements IndexDAO {
 
     private static final int RETRY_COUNT = 3;
 
+    private final int archiveSearchBatchSize;
+
     private String indexName;
 
     private String logIndexName;
@@ -140,6 +138,7 @@ public ElasticSearchDAO(Client elasticSearchClient, Configuration config, Object
         this.objectMapper = objectMapper;
         this.elasticSearchClient = elasticSearchClient;
         this.indexName = config.getProperty("workflow.elasticsearch.index.name", null);
+        this.archiveSearchBatchSize = config.getIntProperty("workflow.elasticsearch.archive.search.batchSize", 5000);
 
         try {
 
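The added workflow.elasticsearch.archive.search.batchSize property caps how many workflow IDs a single archive search returns, defaulting to 5000. Below is a minimal sketch of overriding it, assuming the Configuration implementation behind this constructor falls back to JVM system properties; the property name and default come from the line added above, while the class and the 10000 value are purely illustrative.

// Illustrative only, not part of this commit: one way a deployment could raise the
// archive search batch size before the DAO is constructed, assuming the Configuration
// implementation reads JVM system properties as a fallback.
public class ArchiveBatchSizeOverride {
    public static void main(String[] args) {
        // Equivalent to passing -Dworkflow.elasticsearch.archive.search.batchSize=10000
        // on the server command line; getIntProperty(...) in the constructor above would
        // then return 10000 instead of the default 5000.
        System.setProperty("workflow.elasticsearch.archive.search.batchSize", "10000");
    }
}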
@@ -499,19 +498,21 @@ private SearchResult<String> search(String structuredQuery, int start, int size,
     @Override
     public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
         QueryBuilder q = QueryBuilders.boolQuery()
-                .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays)))
                 .should(QueryBuilders.termQuery("status", "COMPLETED"))
                 .should(QueryBuilders.termQuery("status", "FAILED"))
                 .mustNot(QueryBuilders.existsQuery("archived"))
                 .minimumNumberShouldMatch(1);
         SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName)
                 .setTypes("workflow")
                 .setQuery(q)
-                .setSize(1000);
+                .addSort("endTime", SortOrder.ASC)
+                .setSize(archiveSearchBatchSize);
 
         SearchResponse response = s.execute().actionGet();
 
         SearchHits hits = response.getHits();
+        logger.info("Archive search totalHits - {}", hits.getTotalHits());
+
         List<String> ids = new LinkedList<>();
         for (SearchHit hit : hits.getHits()) {
             ids.add(hit.getId());
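With the endTime range removed, batching is governed by the sort and the batch size instead: each call returns the oldest unarchived COMPLETED or FAILED workflows first, capped at archiveSearchBatchSize, and documents that already carry an archived field are excluded. Below is a hedged sketch of a caller draining such batches; only IndexDAO.searchArchivableWorkflows comes from this diff, while the ArchiverSketch class, the WorkflowArchiver interface, and the assumed com.netflix.conductor.dao package for IndexDAO are illustrative.

import com.netflix.conductor.dao.IndexDAO; // package assumed for this sketch

import java.util.List;

// Hypothetical consumer, not part of this commit: because each call returns the oldest
// unarchived workflows first and archiving is expected to set an "archived" field on each
// document, repeated calls make progress without any endTime range filter.
public class ArchiverSketch {

    // Hypothetical callback that archives one workflow document and sets its "archived"
    // field, so the mustNot(existsQuery("archived")) clause excludes it from the next batch.
    public interface WorkflowArchiver {
        void archive(String workflowId);
    }

    private final IndexDAO indexDAO;
    private final WorkflowArchiver archiver;

    public ArchiverSketch(IndexDAO indexDAO, WorkflowArchiver archiver) {
        this.indexDAO = indexDAO;
        this.archiver = archiver;
    }

    public void drain(String indexName, long archiveTtlDays) {
        List<String> batch = indexDAO.searchArchivableWorkflows(indexName, archiveTtlDays);
        while (!batch.isEmpty()) {
            batch.forEach(archiver::archive);
            batch = indexDAO.searchArchivableWorkflows(indexName, archiveTtlDays);
        }
    }
}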
