From 62f8244720f69d3e35d71b61b22e333eaa390e99 Mon Sep 17 00:00:00 2001
From: Jeffrey Starker
Date: Fri, 26 Oct 2018 16:52:26 -0500
Subject: [PATCH 1/3] Adds ES5 REST index DAO for conductor

+ With ES5 and later, there is a big push to use the REST client rather than the TCP transport client. Moreover, some services, such as AWS ElasticSearch, only support the HTTP REST client. This is an attempt to add the ES5 REST client for conductor.
+ Fix logger in es5-persistence tests
---
 es5-persistence/README.md | 65 ++
 es5-persistence/build.gradle | 3 +
 es5-persistence/dependencies.lock | 60 +-
 .../dao/es5/index/ElasticSearchRestDAOV5.java | 680 ++++++++++++++++++
 .../elasticsearch/ElasticSearchModule.java | 8 +
 .../ElasticSearchRestClientProvider.java | 12 +-
 .../es5/ElasticSearchV5Module.java | 30 +-
 .../TestElasticSearchRestConfiguration.java | 11 +
 .../es5/index/TestElasticSearchRestDAOV5.java | 179 +++++
 .../src/test/resources/log4j.properties | 9 +
 server/dependencies.lock | 66 ++
 .../conductor/bootstrap/ModulesProvider.java | 4 +-
 test-harness/dependencies.lock | 24 +
 13 files changed, 1134 insertions(+), 17 deletions(-)
 create mode 100644 es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java
 create mode 100644 es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java
 create mode 100644 es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java
 create mode 100644 es5-persistence/src/test/resources/log4j.properties

diff --git a/es5-persistence/README.md b/es5-persistence/README.md
index 272dd6d516..fed42832df 100644
--- a/es5-persistence/README.md
+++ b/es5-persistence/README.md
@@ -1,2 +1,67 @@
+# ES5 Persistence
+
+This module provides ES5 persistence for indexing workflows and tasks.
+
 ## Usage
+
+This module uses the following configuration options:
+
+* `workflow.elasticsearch.instanceType` - The type of ES instance conductor is using.
+Valid values are `MEMORY` and `EXTERNAL`.
+If `MEMORY`, an embedded server will be run.
+Default is `MEMORY`.
+* `workflow.elasticsearch.url` - A comma-separated list of scheme/host/port URIs of the ES nodes to communicate with.
+The scheme can be omitted when using the `tcp` transport; otherwise, you must specify `http` or `https`.
+If `http` or `https` is used, conductor will use the REST transport protocol.
+* `workflow.elasticsearch.index.name` - The name of the workflow and task index.
+Defaults to `conductor`.
+* `workflow.elasticsearch.tasklog.index.name` - The name of the task log index.
+Defaults to `task_log`.
+
+### Embedded Configuration
+
+If `workflow.elasticsearch.instanceType=MEMORY`, you can configure the embedded server using the following options:
+
+* `workflow.elasticsearch.embedded.port` - The starting port of the embedded server.
+This is the port used for the TCP transport.
+The HTTP transport is set up on this port + 100.
+Default is `9200`.
+* `workflow.elasticsearch.embedded.cluster.name` - The name of the embedded cluster.
+Default is `elasticsearch_test`.
+* `workflow.elasticsearch.embedded.host` - The host of the embedded server.
+Default is `127.0.0.1`.
+
+### REST Transport
+
+If you are using AWS ElasticSearch, you should use the `rest` transport, as that is the only transport it supports.
+However, this module currently only works with the open-IAM and VPC versions of ElasticSearch, i.e. clusters that do not require request signing.
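+
+As a rough sketch of what the `rest` transport amounts to, this is how the ES 5.6 low-level and high-level REST clients are wired up (the endpoint below is a placeholder assumption; `ElasticSearchRestClientProvider` builds the same thing from `workflow.elasticsearch.url`):
+
+```java
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+
+// Assumed endpoint; substitute your cluster's scheme/host/port.
+RestClient lowLevelClient = RestClient.builder(
+        new HttpHost("es.example.com", 9200, "https")).build();
+// The ES 5.x high-level client wraps the low-level client directly.
+RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
+```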
+Eventually, we should create ES modules that can be loaded in to support authentication and request signing, but this currently does not support that. + +### Example Configurations + +**In-memory ES with TCP transport** + +``` +workflow.elasticsearch.instanceType=MEMORY +``` + +**In-memory ES with REST transport** + +``` +workflow.elasticsearch.instanceType=MEMORY +workflow.elasticsearch.url=http://localhost:9300 +``` + +**ES with TCP transport** + +``` +workflow.elasticsearch.instanceType=EXTERNAL +workflow.elasticsearch.url=127.0.0.1:9300 +``` + +**ES with REST transport** + +``` +workflow.elasticsearch.instanceType=EXTERNAL +workflow.elasticsearch.url=http://127.0.0.1:9200 +``` diff --git a/es5-persistence/build.gradle b/es5-persistence/build.gradle index a36128e0b4..944ba7e051 100644 --- a/es5-persistence/build.gradle +++ b/es5-persistence/build.gradle @@ -5,9 +5,12 @@ dependencies { compile "org.elasticsearch:elasticsearch:${revElasticSearch5}" compile "org.elasticsearch.client:transport:${revElasticSearch5}" + compile "org.elasticsearch.client:elasticsearch-rest-client:${revElasticSearch5}" compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:${revElasticSearch5}" //ES5 Dependency compile "org.apache.logging.log4j:log4j-api:${revLog4jApi}" compile "org.apache.logging.log4j:log4j-core:${revLog4jCore}" + + testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}" } diff --git a/es5-persistence/dependencies.lock b/es5-persistence/dependencies.lock index 88f9de49a4..6085cb8c6f 100644 --- a/es5-persistence/dependencies.lock +++ b/es5-persistence/dependencies.lock @@ -113,6 +113,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -246,6 +250,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -379,6 +387,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -522,6 +534,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -655,6 +671,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -792,6 +812,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -812,7 +836,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testCompileClasspath": { @@ -933,6 
+961,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -953,7 +985,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testRuntime": { @@ -1074,6 +1110,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1094,7 +1134,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testRuntimeClasspath": { @@ -1215,6 +1259,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1235,7 +1283,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } } } \ No newline at end of file diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java new file mode 100644 index 0000000000..0b7c19671e --- /dev/null +++ b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java @@ -0,0 +1,680 @@ +package com.netflix.conductor.dao.es5.index; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.annotations.Trace; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; +import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.common.run.WorkflowSummary; +import com.netflix.conductor.common.utils.RetryUtil; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.metrics.Monitors; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NByteArrayEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkRequest; 
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+@Trace
+@Singleton
+public class ElasticSearchRestDAOV5 implements IndexDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDAOV5.class);
+
+    private static final int RETRY_COUNT = 3;
+
+    private static final String WORKFLOW_DOC_TYPE = "workflow";
+
+    private static final String TASK_DOC_TYPE = "task";
+
+    private static final String LOG_DOC_TYPE = "task_log";
+
+    private static final String EVENT_DOC_TYPE = "event";
+
+    private static final String MSG_DOC_TYPE = "message";
+
+    private @interface HttpMethod {
+        String GET = "GET";
+        String POST = "POST";
+        String PUT = "PUT";
+        String HEAD = "HEAD";
+    }
+
+    private static final String className = ElasticSearchRestDAOV5.class.getSimpleName();
+
+    private String indexName;
+    private String logIndexPrefix;
+    private String logIndexName;
+
+    private ObjectMapper objectMapper;
+    private RestHighLevelClient elasticSearchClient;
+    private RestClient elasticSearchAdminClient;
+
+    private final ExecutorService executorService;
+
+    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
+    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww");
+    static {
+        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
+    }
+
+    @Inject
+    public ElasticSearchRestDAOV5(RestClient lowLevelRestClient, ElasticSearchConfiguration config, ObjectMapper objectMapper) {
+
+        this.objectMapper = objectMapper;
+        this.elasticSearchAdminClient = lowLevelRestClient;
+        this.elasticSearchClient = new RestHighLevelClient(lowLevelRestClient);
+        this.indexName = config.getIndexName();
+        this.logIndexPrefix = config.getTasklogIndexName();
+
+        // Set up a worker pool for performing async operations.
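+        // Note: with an unbounded LinkedBlockingQueue, a ThreadPoolExecutor never
+        // grows past corePoolSize; excess async requests simply queue up rather
+        // than spawning threads toward maximumPoolSize.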
+ int corePoolSize = 6; + int maximumPoolSize = 12; + long keepAliveTime = 1L; + this.executorService = new ThreadPoolExecutor(corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + + } + + @Override + public void setup() throws Exception { + waitForGreenCluster(); + + try { + initIndex(); + updateIndexName(); + Executors.newScheduledThreadPool(1).scheduleAtFixedRate(this::updateIndexName, 0, 1, TimeUnit.HOURS); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + + //1. Create the required index + try { + addIndex(indexName); + } catch (IOException e) { + logger.error("Failed to initialize index '{}'", indexName, e); + } + + //2. Add mappings for the workflow document type + try { + addMappingToIndex(indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json"); + } catch (IOException e) { + logger.error("Failed to add {} mapping", WORKFLOW_DOC_TYPE); + } + + //3. Add mappings for task document type + try { + addMappingToIndex(indexName, TASK_DOC_TYPE, "/mappings_docType_task.json"); + } catch (IOException e) { + logger.error("Failed to add {} mapping", TASK_DOC_TYPE); + } + } + + /** + * Waits for the ES cluster to become green. + * @throws Exception If there is an issue connecting with the ES cluster. + */ + private void waitForGreenCluster() throws Exception { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("timeout", "30s"); + + elasticSearchAdminClient.performRequest("GET", "/_cluster/health", params); + } + + /** + * Roll the tasklog index daily. + */ + private void updateIndexName() { + this.logIndexName = this.logIndexPrefix + "_" + SIMPLE_DATE_FORMAT.format(new Date()); + + try { + addIndex(logIndexName); + } catch (IOException e) { + logger.error("Failed to update log index name: {}", logIndexName, e); + } + } + + /** + * Initializes the index with the required templates and mappings. + */ + private void initIndex() throws Exception { + + //0. Add the tasklog template + if (doesResourceNotExist("/_template/tasklog_template")) { + logger.info("Creating the index template 'tasklog_template'"); + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/template_tasklog.json"); + byte[] templateSource = IOUtils.toByteArray(stream); + + HttpEntity entity = new NByteArrayEntity(templateSource, ContentType.APPLICATION_JSON); + try { + elasticSearchAdminClient.performRequest(HttpMethod.PUT, "/_template/tasklog_template", Collections.emptyMap(), entity); + } catch (IOException e) { + logger.error("Failed to initialize tasklog_template", e); + } + } + } + + /** + * Adds an index to elasticsearch if it does not exist. + * + * @param index The name of the index to create. + * @throws IOException If an error occurred during requests to ES. 
+ */ + private void addIndex(final String index) throws IOException { + + logger.info("Adding index '{}'...", index); + + String resourcePath = "/" + index; + + if (doesResourceNotExist(resourcePath)) { + + try { + elasticSearchAdminClient.performRequest(HttpMethod.PUT, resourcePath); + + logger.info("Added '{}' index", index); + } catch (ResponseException e) { + + boolean errorCreatingIndex = true; + + Response errorResponse = e.getResponse(); + if (errorResponse.getStatusLine().getStatusCode() == HttpStatus.SC_BAD_REQUEST) { + JsonNode root = objectMapper.readTree(EntityUtils.toString(errorResponse.getEntity())); + String errorCode = root.get("error").get("type").asText(); + if ("index_already_exists_exception".equals(errorCode)) { + errorCreatingIndex = false; + } + } + + if (errorCreatingIndex) { + throw e; + } + } + } else { + logger.info("Index '{}' already exists", index); + } + } + + /** + * Adds a mapping type to an index if it does not exist. + * + * @param index The name of the index. + * @param mappingType The name of the mapping type. + * @param mappingFilename The name of the mapping file to use to add the mapping if it does not exist. + * @throws IOException If an error occurred during requests to ES. + */ + private void addMappingToIndex(final String index, final String mappingType, final String mappingFilename) throws IOException { + + logger.info("Adding '{}' mapping to index '{}'...", mappingType, index); + + String resourcePath = "/" + index + "/_mapping/" + mappingType; + + if (doesResourceNotExist(resourcePath)) { + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream(mappingFilename); + byte[] mappingSource = IOUtils.toByteArray(stream); + + HttpEntity entity = new NByteArrayEntity(mappingSource, ContentType.APPLICATION_JSON); + elasticSearchAdminClient.performRequest(HttpMethod.PUT, resourcePath, Collections.emptyMap(), entity); + logger.info("Added '{}' mapping", mappingType); + } else { + logger.info("Mapping '{}' already exists", mappingType); + } + } + + /** + * Determines whether a resource exists in ES. This will call a GET method to a particular path and + * return true if status 200; false otherwise. + * + * @param resourcePath The path of the resource to get. + * @return True if it exists; false otherwise. + * @throws IOException If an error occurred during requests to ES. + */ + public boolean doesResourceExist(final String resourcePath) throws IOException { + Response response = elasticSearchAdminClient.performRequest(HttpMethod.HEAD, resourcePath); + return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; + } + + /** + * The inverse of doesResourceExist. + * + * @param resourcePath The path of the resource to check. + * @return True if it does not exist; false otherwise. + * @throws IOException If an error occurred during requests to ES. 
+ */ + public boolean doesResourceNotExist(final String resourcePath) throws IOException { + return !doesResourceExist(resourcePath); + } + + @Override + public void indexWorkflow(Workflow workflow) { + + String workflowId = workflow.getWorkflowId(); + WorkflowSummary summary = new WorkflowSummary(workflow); + + indexObject(indexName, WORKFLOW_DOC_TYPE, workflowId, summary); + } + + @Override + public CompletableFuture asyncIndexWorkflow(Workflow workflow) { + return CompletableFuture.runAsync(() -> indexWorkflow(workflow), executorService); + } + + @Override + public void indexTask(Task task) { + + String taskId = task.getTaskId(); + TaskSummary summary = new TaskSummary(task); + + indexObject(indexName, TASK_DOC_TYPE, taskId, summary); + } + + @Override + public CompletableFuture asyncIndexTask(Task task) { + return CompletableFuture.runAsync(() -> indexTask(task), executorService); + } + + @Override + public void addTaskExecutionLogs(List taskExecLogs) { + if (taskExecLogs.isEmpty()) { + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + + for (TaskExecLog log : taskExecLogs) { + + byte[] docBytes; + try { + docBytes = objectMapper.writeValueAsBytes(log); + } catch (JsonProcessingException e) { + logger.error("Failed to convert task log to JSON for task {}", log.getTaskId()); + continue; + } + + IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE); + request.source(docBytes, XContentType.JSON); + bulkRequest.add(request); + } + + try { + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.bulk(bulkRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, null, BulkResponse::hasFailures, RETRY_COUNT, "Indexing all execution logs into doc_type task", "addTaskExecutionLogs"); + } catch (Exception e) { + List taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList()); + logger.error("Failed to index task execution logs for tasks: {}", taskIds, e); + } + } + + @Override + public CompletableFuture asyncAddTaskExecutionLogs(List logs) { + return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), executorService); + } + + @Override + public List getTaskExecutionLogs(String taskId) { + + try { + + // Build Query + Expression expression = Expression.fromString("taskId='" + taskId + "'"); + QueryBuilder queryBuilder = expression.getFilterBuilder(); + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*"); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + // Create the searchObjectIdsViaExpression source + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(fq); + searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.ASC)); + + // Generate the actual request to send to ES. 
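+            // logIndexPrefix + "*" searches across every rolled task-log index,
+            // not just the one currently being written to.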
+ SearchRequest searchRequest = new SearchRequest(logIndexPrefix + "*"); + searchRequest.types(LOG_DOC_TYPE); + searchRequest.source(searchSourceBuilder); + + SearchResponse response = elasticSearchClient.search(searchRequest); + + SearchHit[] hits = response.getHits().getHits(); + List logs = new ArrayList<>(hits.length); + for(SearchHit hit : hits) { + String source = hit.getSourceAsString(); + TaskExecLog tel = objectMapper.readValue(source, TaskExecLog.class); + logs.add(tel); + } + + return logs; + + }catch(Exception e) { + logger.error("Failed to get task execution logs for task: {}", taskId, e); + } + + return null; + } + + @Override + public void addMessage(String queue, Message message) { + Map doc = new HashMap<>(); + doc.put("messageId", message.getId()); + doc.put("payload", message.getPayload()); + doc.put("queue", queue); + doc.put("created", System.currentTimeMillis()); + + indexObject(logIndexName, MSG_DOC_TYPE, doc); + } + + @Override + public void addEventExecution(EventExecution eventExecution) { + String id = eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution.getMessageId() + "." + eventExecution.getId(); + + indexObject(logIndexName, EVENT_DOC_TYPE, id, eventExecution); + } + + @Override + public CompletableFuture asyncAddEventExecution(EventExecution eventExecution) { + return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), executorService); + } + + @Override + public SearchResult searchWorkflows(String query, String freeText, int start, int count, List sort) { + try { + return searchObjectIdsViaExpression(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e); + } + } + + @Override + public SearchResult searchTasks(String query, String freeText, int start, int count, List sort) { + try { + return searchObjectIdsViaExpression(query, start, count, sort, freeText, TASK_DOC_TYPE); + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e); + } + } + + @Override + public void removeWorkflow(String workflowId) { + + DeleteRequest request = new DeleteRequest(indexName, WORKFLOW_DOC_TYPE, workflowId); + + try { + DeleteResponse response = elasticSearchClient.delete(request); + + if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { + logger.error("Index removal failed - document not found by id: {}", workflowId); + } + + } catch (IOException e) { + logger.error("Failed to remove workflow {} from index", workflowId, e); + Monitors.error(className, "remove"); + } + } + + @Override + public CompletableFuture asyncRemoveWorkflow(String workflowId) { + return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService); + } + + @Override + public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + + if (keys.length != values.length) { + throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "Number of keys and values do not match"); + } + + UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); + Map source = IntStream.range(0, keys.length).boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + + logger.debug("Updating workflow {} with {}", workflowInstanceId, source); + + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.update(request); + } catch (IOException e) { + throw 
new RuntimeException(e); + } + }, null, null, RETRY_COUNT, "Updating index for doc_type workflow", "updateWorkflow"); + } + + @Override + public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + return CompletableFuture.runAsync(() -> updateWorkflow(workflowInstanceId, keys, values), executorService); + } + + @Override + public String get(String workflowInstanceId, String fieldToGet) { + + GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); + + GetResponse response; + try { + response = elasticSearchClient.get(request); + } catch (IOException e) { + logger.error("Unable to get Workflow: {} from ElasticSearch index: {}", workflowInstanceId, indexName, e); + return null; + } + + if (response.isExists()){ + Map sourceAsMap = response.getSourceAsMap(); + if (sourceAsMap.containsKey(fieldToGet)){ + return sourceAsMap.get(fieldToGet).toString(); + } + } + + logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, indexName); + return null; + } + + private SearchResult searchObjectIdsViaExpression(String structuredQuery, int start, int size, List sortOptions, String freeTextQuery, String docType) throws ParserException, IOException { + + // Build query + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if(StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + return searchObjectIds(indexName, fq, start, size, sortOptions, docType); + } + + private SearchResult searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, String docType) throws IOException { + return searchObjectIds(indexName, queryBuilder, start, size, null, docType); + } + + /** + * Tries to find object ids for a given query in an index. + * + * @param indexName The name of the index. + * @param queryBuilder The query to use for searching. + * @param start The start to use. + * @param size The total return size. + * @param sortOptions A list of string options to sort in the form VALUE:ORDER; where ORDER is optional and can be either ASC OR DESC. + * @param docType The document type to searchObjectIdsViaExpression for. + * + * @return The SearchResults which includes the count and IDs that were found. + * @throws IOException If we cannot communicate with ES. + */ + private SearchResult searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, List sortOptions, String docType) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(start); + searchSourceBuilder.size(size); + + if (sortOptions != null && !sortOptions.isEmpty()) { + + for (String sortOption : sortOptions) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int index = sortOption.indexOf(":"); + if (index > 0) { + field = sortOption.substring(0, index); + order = SortOrder.valueOf(sortOption.substring(index + 1)); + } + searchSourceBuilder.sort(new FieldSortBuilder(field).order(order)); + } + } + + // Generate the actual request to send to ES. 
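+        // Any "FIELD:ORDER" sort options (e.g. "updateTime:DESC") have been applied
+        // above; a bare field name defaults to ascending order.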
+ SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(docType); + searchRequest.source(searchSourceBuilder); + + SearchResponse response = elasticSearchClient.search(searchRequest); + + List result = new LinkedList<>(); + response.getHits().forEach(hit -> result.add(hit.getId())); + long count = response.getHits().getTotalHits(); + return new SearchResult<>(count, result); + } + + @Override + public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { + QueryBuilder q = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays).toString())) + .should(QueryBuilders.termQuery("status", "COMPLETED")) + .should(QueryBuilders.termQuery("status", "FAILED")) + .mustNot(QueryBuilders.existsQuery("archived")) + .minimumShouldMatch(1); + + SearchResult workflowIds; + try { + workflowIds = searchObjectIds(indexName, q, 0, 1000, WORKFLOW_DOC_TYPE); + } catch (IOException e) { + logger.error("Unable to communicate with ES to find archivable workflows", e); + return Collections.emptyList(); + } + + return workflowIds.getResults(); + } + + public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { + DateTime dateTime = new DateTime(); + QueryBuilder q = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("updateTime") + .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) + .must(QueryBuilders.rangeQuery("updateTime") + .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) + .must(QueryBuilders.termQuery("status", "RUNNING")); + + SearchResult workflowIds; + try { + workflowIds = searchObjectIds(indexName, q, 0, 5000, Collections.singletonList("updateTime:ASC"), WORKFLOW_DOC_TYPE); + } catch (IOException e) { + logger.error("Unable to communicate with ES to find recent running workflows", e); + return Collections.emptyList(); + } + + return workflowIds.getResults(); + } + + private void indexObject(final String index, final String docType, final Object doc) { + indexObject(index, docType, null, doc); + } + + private void indexObject(final String index, final String docType, final String docId, final Object doc) { + + byte[] docBytes; + try { + docBytes = objectMapper.writeValueAsBytes(doc); + } catch (JsonProcessingException e) { + logger.error("Failed to convert {} '{}' to byte string", docType, docId); + return; + } + + IndexRequest request = new IndexRequest(index, docType, docId); + request.source(docBytes, XContentType.JSON); + + indexWithRetry(request, "Indexing " + docType + ": " + docId); + } + + /** + * Performs an index operation with a retry. + * @param request The index request that we want to perform. + * @param operationDescription The type of operation that we are performing. 
+ */ + private void indexWithRetry(final IndexRequest request, final String operationDescription) { + + try { + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.index(request); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, null, null, RETRY_COUNT, operationDescription, "indexWithRetry"); + } catch (Exception e) { + Monitors.error(className, "index"); + logger.error("Failed to index {} for request type: {}", request.id(), request.type(), e); + } + } + +} \ No newline at end of file diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java index b385442b66..5a0cd00817 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java @@ -3,12 +3,20 @@ import com.google.inject.AbstractModule; import com.google.inject.Singleton; +import com.netflix.conductor.elasticsearch.es5.ElasticSearchV5Module; import org.elasticsearch.client.Client; +import org.elasticsearch.client.RestClient; public class ElasticSearchModule extends AbstractModule { @Override protected void configure() { + + ElasticSearchConfiguration esConfiguration = new SystemPropertiesElasticSearchConfiguration(); + bind(ElasticSearchConfiguration.class).to(SystemPropertiesElasticSearchConfiguration.class); bind(Client.class).toProvider(ElasticSearchTransportClientProvider.class).in(Singleton.class); + bind(RestClient.class).toProvider(ElasticSearchRestClientProvider.class).in(Singleton.class); + + install(new ElasticSearchV5Module(esConfiguration)); } } diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java index a21e0bc2a6..c51207af8d 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java @@ -2,7 +2,6 @@ import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; import java.net.URI; import java.util.List; @@ -11,7 +10,7 @@ import javax.inject.Inject; import javax.inject.Provider; -public class ElasticSearchRestClientProvider implements Provider { +public class ElasticSearchRestClientProvider implements Provider { private final ElasticSearchConfiguration configuration; @Inject @@ -20,15 +19,14 @@ public ElasticSearchRestClientProvider(ElasticSearchConfiguration configuration) } @Override - public RestHighLevelClient get() { - RestClient lowLevelRestClient = RestClient.builder(convertToHttpHosts(configuration.getURIs())).build(); - return new RestHighLevelClient(lowLevelRestClient); + public RestClient get() { + return RestClient.builder(convertToHttpHosts(configuration.getURIs())).build(); } private HttpHost[] convertToHttpHosts(List hosts) { List list = hosts.stream().map(host -> - new HttpHost(host.getHost(), host.getPort())) + new HttpHost(host.getHost(), host.getPort(), host.getScheme())) .collect(Collectors.toList()); - return list.toArray(new HttpHost[0]); + return list.toArray(new HttpHost[list.size()]); } } diff --git 
a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java index 3592fd1bca..f10e2bc287 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java @@ -19,20 +19,42 @@ import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.es5.index.ElasticSearchDAOV5; -import com.netflix.conductor.elasticsearch.ElasticSearchModule; +import com.netflix.conductor.dao.es5.index.ElasticSearchRestDAOV5; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import java.util.HashSet; +import java.util.Set; + /** * @author Viren - * Provider for the elasticsearch transport client + * Provider for the elasticsearch index DAO. */ public class ElasticSearchV5Module extends AbstractModule { + private boolean restTransport; + + public ElasticSearchV5Module(ElasticSearchConfiguration elasticSearchConfiguration) { + + Set REST_SCHEMAS = new HashSet<>(); + REST_SCHEMAS.add("http"); + REST_SCHEMAS.add("https"); + + String esTransport = elasticSearchConfiguration.getURIs().get(0).getScheme(); + + this.restTransport = REST_SCHEMAS.contains(esTransport); + } + @Override protected void configure() { - install(new ElasticSearchModule()); - bind(IndexDAO.class).to(ElasticSearchDAOV5.class); + + if (restTransport) { + bind(IndexDAO.class).to(ElasticSearchRestDAOV5.class); + } else { + bind(IndexDAO.class).to(ElasticSearchDAOV5.class); + } + bind(EmbeddedElasticSearchProvider.class).to(EmbeddedElasticSearchV5Provider.class); } } diff --git a/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java b/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java new file mode 100644 index 0000000000..a216d363cc --- /dev/null +++ b/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java @@ -0,0 +1,11 @@ +package com.netflix.conductor.com.netflix.conductor.elasticsearch; + +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; + +public class TestElasticSearchRestConfiguration extends SystemPropertiesElasticSearchConfiguration implements ElasticSearchConfiguration { + + public String getURL() { + return "http://localhost:9200"; + } +} diff --git a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java new file mode 100644 index 0000000000..7669910c90 --- /dev/null +++ b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java @@ -0,0 +1,179 @@ +package com.netflix.conductor.dao.es5.index; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.com.netflix.conductor.elasticsearch.TestElasticSearchRestConfiguration; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import 
com.netflix.conductor.elasticsearch.ElasticSearchRestClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; +import com.netflix.conductor.elasticsearch.es5.EmbeddedElasticSearchV5; +import org.elasticsearch.client.RestClient; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.junit.Assert.*; + + +public class TestElasticSearchRestDAOV5 { + + private static Logger logger = LoggerFactory.getLogger(TestElasticSearchRestDAOV5.class); + + private static RestClient restClient; + private static ElasticSearchRestDAOV5 indexDAO; + private static EmbeddedElasticSearch embeddedElasticSearch; + + private Workflow workflow; + + @BeforeClass + public static void setup() throws Exception { + + ElasticSearchConfiguration configuration = new TestElasticSearchRestConfiguration(); + + String host = configuration.getEmbeddedHost(); + int port = configuration.getEmbeddedPort(); + String clusterName = configuration.getEmbeddedClusterName(); + + embeddedElasticSearch = new EmbeddedElasticSearchV5(clusterName, host, port); + embeddedElasticSearch.start(); + + ElasticSearchRestClientProvider restClientProvider = new ElasticSearchRestClientProvider(configuration); + restClient = restClientProvider.get(); + + long startTime = System.currentTimeMillis(); + + Map params = new HashMap<>(); + params.put("wait_for_status", "yellow"); + params.put("timeout", "30s"); + + while (true) { + try { + restClient.performRequest("GET", "/_cluster/health", params); + break; + } catch (IOException e) { + logger.info("No ES nodes available yet."); + } + Thread.sleep(10000); + + if (System.currentTimeMillis() - startTime > 60000) { + logger.error("Unable to connect to the ES cluster in time."); + throw new RuntimeException("Unable to connect to ES cluster in time."); + } + } + + ObjectMapper objectMapper = new ObjectMapper(); + indexDAO = new ElasticSearchRestDAOV5(restClient, configuration, objectMapper); + indexDAO.setup(); + + } + + @AfterClass + public static void closeClient() throws Exception { + if (restClient != null) { + restClient.close(); + } + + embeddedElasticSearch.stop(); + } + + @Before + public void createTestWorkflow() { + + workflow = new Workflow(); + workflow.getInput().put("requestId", "request id 001"); + workflow.getInput().put("hasAwards", true); + workflow.getInput().put("channelMapping", 5); + Map name = new HashMap<>(); + name.put("name", "The Who"); + name.put("year", 1970); + Map name2 = new HashMap<>(); + name2.put("name", "The Doors"); + name2.put("year", 1975); + + List names = new LinkedList<>(); + names.add(name); + names.add(name2); + + workflow.getOutput().put("name", name); + workflow.getOutput().put("names", names); + workflow.getOutput().put("awards", 200); + + Task task = new Task(); + task.setReferenceTaskName("task2"); + task.getOutputData().put("location", "http://location"); + task.setStatus(Task.Status.COMPLETED); + + Task task2 = new Task(); + task2.setReferenceTaskName("task3"); + task2.getOutputData().put("refId", "abcddef_1234_7890_aaffcc"); + task2.setStatus(Task.Status.SCHEDULED); + + workflow.getTasks().add(task); + workflow.getTasks().add(task2); + } + + private boolean indexExists(final String index) throws IOException { + return indexDAO.doesResourceExist("/" + index); + } + + private boolean doesMappingExist(final String index, final String mappingName) throws IOException { + return indexDAO.doesResourceExist("/" + 
index + "/_mapping/" + mappingName); + } + + @Test + public void assertInitialSetup() throws Exception { + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMww"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + + String taskLogIndex = "task_log_" + dateFormat.format(new Date()); + + assertTrue("Index 'conductor' should exist", indexExists("conductor")); + assertTrue("Index '" + taskLogIndex + "' should exist", indexExists(taskLogIndex)); + + assertTrue("Mapping 'workflow' for index 'conductor' should exist", doesMappingExist("conductor", "workflow")); + assertTrue("Mapping 'task' for inndex 'conductor' should exist", doesMappingExist("conductor", "task")); + } + + @Test + public void testWorkflowCRUD() { + + String testWorkflowType = "testworkflow"; + String testId = "1"; + + workflow.setWorkflowId(testId); + workflow.setWorkflowType(testWorkflowType); + + // Create + String workflowType = indexDAO.get(testId, "workflowType"); + assertNull("Workflow should not exist", workflowType); + + // Get + indexDAO.indexWorkflow(workflow); + + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have found our workflow type", testWorkflowType, workflowType); + + // Update + String newWorkflowType = "newworkflowtype"; + String[] keyChanges = {"workflowType"}; + String[] valueChanges = {newWorkflowType}; + + indexDAO.updateWorkflow(testId, keyChanges, valueChanges); + + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have updated our new workflow type", newWorkflowType, workflowType); + + // Delete + indexDAO.removeWorkflow(testId); + + workflowType = indexDAO.get(testId, "workflowType"); + assertNull("We should no longer have our workflow in the system", workflowType); + } + +} \ No newline at end of file diff --git a/es5-persistence/src/test/resources/log4j.properties b/es5-persistence/src/test/resources/log4j.properties new file mode 100644 index 0000000000..e4fcb01308 --- /dev/null +++ b/es5-persistence/src/test/resources/log4j.properties @@ -0,0 +1,9 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file diff --git a/server/dependencies.lock b/server/dependencies.lock index d2604f8178..402a136b5c 100644 --- a/server/dependencies.lock +++ b/server/dependencies.lock @@ -299,6 +299,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -642,6 +648,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -985,6 +997,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1328,6 +1346,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1741,6 +1765,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2084,6 +2114,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2427,6 +2463,12 @@ "project": true, "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2774,6 +2816,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3125,6 +3173,12 @@ "locked": "9.3.9.v20160517", "requested": 
"9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3476,6 +3530,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3827,6 +3887,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java index 350665a9b9..403765ce93 100644 --- a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java +++ b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java @@ -11,7 +11,7 @@ import com.netflix.conductor.core.utils.DummyPayloadStorage; import com.netflix.conductor.core.utils.S3PayloadStorage; import com.netflix.conductor.dao.RedisWorkflowModule; -import com.netflix.conductor.elasticsearch.es5.ElasticSearchV5Module; +import com.netflix.conductor.elasticsearch.ElasticSearchModule; import com.netflix.conductor.mysql.MySQLWorkflowModule; import com.netflix.conductor.server.DynomiteClusterModule; import com.netflix.conductor.server.JerseyModule; @@ -90,7 +90,7 @@ private List selectModulesToLoad() { break; } - modules.add(new ElasticSearchV5Module()); + modules.add(new ElasticSearchModule()); modules.add(new WorkflowExecutorModule()); diff --git a/test-harness/dependencies.lock b/test-harness/dependencies.lock index d85895eead..af43b5ecf9 100644 --- a/test-harness/dependencies.lock +++ b/test-harness/dependencies.lock @@ -401,6 +401,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -842,6 +848,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1283,6 +1295,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1724,6 
+1742,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" From 6fed6f453581da2fe4a152d62916d6ea90f7af43 Mon Sep 17 00:00:00 2001 From: Samuel Sanders Date: Sun, 9 Dec 2018 19:31:48 -0800 Subject: [PATCH 2/3] Creates end-to-end tests for MySQL and ES REST Client + Extracts end-to-end tests into base classes + Deconflicts ports between end-to-end test setups Signed-off-by: Adam Neumann --- es5-persistence/dependencies.lock | 16 +++ test-harness/build.gradle | 1 + test-harness/dependencies.lock | 16 +++ ...sts.java => AbstractGrpcEndToEndTest.java} | 103 ++++++------------ ...sts.java => AbstractHttpEndToEndTest.java} | 81 ++++---------- .../ESRestClientHttpEndToEndTest.java | 99 +++++++++++++++++ .../tests/integration/GrpcEndToEndTest.java | 75 +++++++++++++ .../tests/integration/HttpEndToEndTest.java | 69 ++++++++++++ .../integration/MySQLGrpcEndToEndTest.java | 79 ++++++++++++++ versionsOfDependencies.gradle | 1 + 10 files changed, 409 insertions(+), 131 deletions(-) rename test-harness/src/test/java/com/netflix/conductor/tests/integration/{End2EndGrpcTests.java => AbstractGrpcEndToEndTest.java} (77%) rename test-harness/src/test/java/com/netflix/conductor/tests/integration/{End2EndTests.java => AbstractHttpEndToEndTest.java} (85%) create mode 100644 test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java create mode 100644 test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java create mode 100644 test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java create mode 100644 test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java diff --git a/es5-persistence/dependencies.lock b/es5-persistence/dependencies.lock index 6085cb8c6f..bc75e63283 100644 --- a/es5-persistence/dependencies.lock +++ b/es5-persistence/dependencies.lock @@ -812,6 +812,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.elasticsearch.client:elasticsearch-rest-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -961,6 +965,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.elasticsearch.client:elasticsearch-rest-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1110,6 +1118,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.elasticsearch.client:elasticsearch-rest-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1259,6 +1271,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.elasticsearch.client:elasticsearch-rest-client": { "locked": "5.6.8", "requested": "5.6.8" diff --git a/test-harness/build.gradle b/test-harness/build.gradle index 6b98298e3f..7d103cdc70 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -23,6 +23,7 @@ dependencies { testCompile "io.swagger:swagger-jersey-jaxrs:${revSwaggerJersey}" testCompile "ch.vorburger.mariaDB4j:mariaDB4j:${revMariaDB4j}" + testCompile 
"org.awaitility:awaitility:${revAwaitility}" } test { diff --git a/test-harness/dependencies.lock b/test-harness/dependencies.lock index af43b5ecf9..55fbb94872 100644 --- a/test-harness/dependencies.lock +++ b/test-harness/dependencies.lock @@ -387,6 +387,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -834,6 +838,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -1281,6 +1289,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -1728,6 +1740,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java similarity index 77% rename from test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java rename to test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java index cddd44c0d9..535ba0fb08 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java @@ -15,10 +15,10 @@ */ package com.netflix.conductor.tests.integration; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.netflix.conductor.bootstrap.BootstrapModule; -import com.netflix.conductor.bootstrap.ModulesProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import com.netflix.conductor.client.grpc.MetadataClient; import com.netflix.conductor.client.grpc.TaskClient; import com.netflix.conductor.client.grpc.WorkflowClient; @@ -34,60 +34,42 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; -import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; -import com.netflix.conductor.grpc.server.GRPCServer; -import com.netflix.conductor.grpc.server.GRPCServerConfiguration; -import com.netflix.conductor.grpc.server.GRPCServerProvider; -import com.netflix.conductor.tests.utils.TestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import java.util.LinkedList; import java.util.List; -import java.util.Optional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.junit.Test; /** * @author Viren */ -public class End2EndGrpcTests extends AbstractEndToEndTest { - private static TaskClient taskClient; - private static WorkflowClient workflowClient; - private 
static MetadataClient metadataClient; - private static EmbeddedElasticSearch search; - - @BeforeClass - public static void setup() throws Exception { - TestEnvironment.setup(); - System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); - System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9202"); - System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9302"); - - Injector bootInjector = Guice.createInjector(new BootstrapModule()); - Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); - - search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); - search.start(); - - Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); - assertTrue("failed to instantiate GRPCServer", server.isPresent()); - server.get().start(); - - taskClient = new TaskClient("localhost", 8090); - workflowClient = new WorkflowClient("localhost", 8090); - metadataClient = new MetadataClient("localhost", 8090); +public abstract class AbstractGrpcEndToEndTest extends AbstractEndToEndTest { + + protected static TaskClient taskClient; + protected static WorkflowClient workflowClient; + protected static MetadataClient metadataClient; + protected static EmbeddedElasticSearch search; + + @Override + protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) { + StartWorkflowRequest workflowRequest = new StartWorkflowRequest() + .withName(workflowExecutionName) + .withWorkflowDef(workflowDefinition); + return workflowClient.startWorkflow(workflowRequest); } - @AfterClass - public static void teardown() throws Exception { - TestEnvironment.teardown(); - search.stop(); + @Override + protected Workflow getWorkflow(String workflowId, boolean includeTasks) { + return workflowClient.getWorkflow(workflowId, includeTasks); + } + + @Override + protected TaskDef getTaskDefinition(String taskName) { + return metadataClient.getTaskDef(taskName); + } + + @Override + protected void registerTaskDefinitions(List taskDefinitionList) { + metadataClient.registerTaskDefs(taskDefinitionList); } @Test @@ -213,27 +195,4 @@ public void testAll() throws Exception { assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); assertEquals(1, wf.getTasks().size()); } - - @Override - protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) { - StartWorkflowRequest workflowRequest = new StartWorkflowRequest() - .withName(workflowExecutionName) - .withWorkflowDef(workflowDefinition); - return workflowClient.startWorkflow(workflowRequest); - } - - @Override - protected Workflow getWorkflow(String workflowId, boolean includeTasks) { - return workflowClient.getWorkflow(workflowId, includeTasks); - } - - @Override - protected TaskDef getTaskDefinition(String taskName) { - return metadataClient.getTaskDef(taskName); - } - - @Override - protected void registerTaskDefinitions(List taskDefinitionList) { - metadataClient.registerTaskDefs(taskDefinitionList); - } } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java similarity index 85% rename from test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java rename to test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java index c17c7c727f..4f4c6def5a 100644 --- 
a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java @@ -12,10 +12,11 @@ */ package com.netflix.conductor.tests.integration; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.netflix.conductor.bootstrap.BootstrapModule; -import com.netflix.conductor.bootstrap.ModulesProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import com.netflix.conductor.client.exceptions.ConductorClientException; import com.netflix.conductor.client.http.MetadataClient; import com.netflix.conductor.client.http.TaskClient; @@ -32,67 +33,25 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; -import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; -import com.netflix.conductor.jetty.server.JettyServer; -import com.netflix.conductor.tests.utils.TestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.Test; /** * @author Viren */ +public abstract class AbstractHttpEndToEndTest extends AbstractEndToEndTest { -public class End2EndTests extends AbstractEndToEndTest { - - private static TaskClient taskClient; - private static WorkflowClient workflowClient; - private static EmbeddedElasticSearch search; - private static MetadataClient metadataClient; - - private static final int SERVER_PORT = 8080; - - @BeforeClass - public static void setup() throws Exception { - TestEnvironment.setup(); - System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9201"); - System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9301"); - - Injector bootInjector = Guice.createInjector(new BootstrapModule()); - Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); - - search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); - search.start(); + protected static String apiRoot; - JettyServer server = new JettyServer(SERVER_PORT, false); - server.start(); - - taskClient = new TaskClient(); - taskClient.setRootURI("http://localhost:8080/api/"); - - workflowClient = new WorkflowClient(); - workflowClient.setRootURI("http://localhost:8080/api/"); - - metadataClient = new MetadataClient(); - metadataClient.setRootURI("http://localhost:8080/api/"); - } - - @AfterClass - public static void teardown() throws Exception { - TestEnvironment.teardown(); - search.stop(); - } + protected static TaskClient taskClient; + protected static WorkflowClient workflowClient; + protected static EmbeddedElasticSearch search; + protected static MetadataClient metadataClient; @Override protected String startWorkflow(String workflowExecutionName, WorkflowDef 
workflowDefinition) { @@ -154,9 +113,13 @@ public void testAll() throws Exception { assertEquals(0, workflow.getTasks().size()); assertEquals(workflowId, workflow.getWorkflowId()); - List workflowList = workflowClient.getWorkflows(def.getName(), correlationId, false, false); - assertEquals(1, workflowList.size()); - assertEquals(workflowId, workflowList.get(0).getWorkflowId()); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List workflowList = workflowClient.getWorkflows(def.getName(), correlationId, false, false); + assertEquals(1, workflowList.size()); + assertEquals(workflowId, workflowList.get(0).getWorkflowId()); + }); workflow = workflowClient.getWorkflow(workflowId, true); assertNotNull(workflow); @@ -274,7 +237,7 @@ public void testMetadataWorkflowDefinition() { @Test public void testInvalidResource() { MetadataClient metadataClient = new MetadataClient(); - metadataClient.setRootURI("http://localhost:8080/api/invalid"); + metadataClient.setRootURI(String.format("%sinvalid", apiRoot)); WorkflowDef def = new WorkflowDef(); def.setName("testWorkflowDel"); def.setVersion(1); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java new file mode 100644 index 0000000000..e4024dd360 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.tests.integration; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.ElasticSearchRestClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; +import com.netflix.conductor.jetty.server.JettyServer; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Viren + */ +public class ESRestClientHttpEndToEndTest extends AbstractHttpEndToEndTest { + + private static final Logger logger = + LoggerFactory.getLogger(ESRestClientHttpEndToEndTest.class); + + private static final int SERVER_PORT = 8083; + + private static RestClient elasticSearchAdminClient; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9203"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "http://localhost:9203"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + SystemPropertiesElasticSearchConfiguration configuration = new SystemPropertiesElasticSearchConfiguration(); + ElasticSearchRestClientProvider restClientProvider = new ElasticSearchRestClientProvider(configuration); + elasticSearchAdminClient = restClientProvider.get(); + + waitForGreenCluster(); + + JettyServer server = new JettyServer(SERVER_PORT, false); + server.start(); + + apiRoot = String.format("http://localhost:%d/api/", SERVER_PORT); + + taskClient = new TaskClient(); + taskClient.setRootURI(apiRoot); + + workflowClient = new WorkflowClient(); + workflowClient.setRootURI(apiRoot); + + metadataClient = new MetadataClient(); + metadataClient.setRootURI(apiRoot); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + + private static void waitForGreenCluster() throws Exception { + long startTime = System.currentTimeMillis(); + + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("timeout", "30s"); + + elasticSearchAdminClient.performRequest("GET", "/_cluster/health", params); + logger.info("Elasticsearch Cluster ready in {} ms", System.currentTimeMillis() - startTime); + } + +} diff --git 
a/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java new file mode 100644 index 0000000000..9f30f41a01 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java @@ -0,0 +1,75 @@ +/** + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +/** + * + */ +package com.netflix.conductor.tests.integration; + +import static org.junit.Assert.assertTrue; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.grpc.MetadataClient; +import com.netflix.conductor.client.grpc.TaskClient; +import com.netflix.conductor.client.grpc.WorkflowClient; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.grpc.server.GRPCServer; +import com.netflix.conductor.grpc.server.GRPCServerConfiguration; +import com.netflix.conductor.grpc.server.GRPCServerProvider; +import com.netflix.conductor.mysql.MySQLConfiguration; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.util.Optional; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * @author Viren + */ +public class GrpcEndToEndTest extends AbstractGrpcEndToEndTest { + + private static final int SERVER_PORT = 8092; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); + System.setProperty(GRPCServerConfiguration.PORT_PROPERTY_NAME, "8092"); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9202"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9302"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); + assertTrue("failed to instantiate GRPCServer", server.isPresent()); + server.get().start(); + + taskClient = new TaskClient("localhost", SERVER_PORT); + workflowClient = new WorkflowClient("localhost", SERVER_PORT); + metadataClient = new MetadataClient("localhost", SERVER_PORT); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java new file mode 100644 index 0000000000..456070aa2d --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.tests.integration; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.jetty.server.JettyServer; +import com.netflix.conductor.tests.utils.TestEnvironment; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * @author Viren + */ +public class HttpEndToEndTest extends AbstractHttpEndToEndTest { + + private static final int SERVER_PORT = 8080; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9201"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9301"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + JettyServer server = new JettyServer(SERVER_PORT, false); + server.start(); + + apiRoot = String.format("http://localhost:%d/api/", SERVER_PORT); + + taskClient = new TaskClient(); + taskClient.setRootURI(apiRoot); + + workflowClient = new WorkflowClient(); + workflowClient.setRootURI(apiRoot); + + metadataClient = new MetadataClient(); + metadataClient.setRootURI(apiRoot); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java new file mode 100644 index 0000000000..852828e8d6 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +/** + * + */ +package com.netflix.conductor.tests.integration; + +import static org.junit.Assert.assertTrue; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.grpc.MetadataClient; +import com.netflix.conductor.client.grpc.TaskClient; +import com.netflix.conductor.client.grpc.WorkflowClient; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.grpc.server.GRPCServer; +import com.netflix.conductor.grpc.server.GRPCServerConfiguration; +import com.netflix.conductor.grpc.server.GRPCServerProvider; +import com.netflix.conductor.mysql.MySQLConfiguration; +import com.netflix.conductor.tests.utils.MySQLTestModule; +import com.netflix.conductor.tests.utils.MySQLTestRunner; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.util.Optional; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; + +/** + * @author Viren + */ +@RunWith(MySQLTestRunner.class) +public class MySQLGrpcEndToEndTest extends AbstractGrpcEndToEndTest { + + private static final int SERVER_PORT = 8094; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); + System.setProperty(GRPCServerConfiguration.PORT_PROPERTY_NAME, "8094"); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9204"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9304"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); + assertTrue("failed to instantiate GRPCServer", server.isPresent()); + server.get().start(); + + taskClient = new TaskClient("localhost", SERVER_PORT); + workflowClient = new WorkflowClient("localhost", SERVER_PORT); + metadataClient = new MetadataClient("localhost", SERVER_PORT); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/versionsOfDependencies.gradle b/versionsOfDependencies.gradle index 41bfda0028..19f62dc839 100644 --- a/versionsOfDependencies.gradle +++ b/versionsOfDependencies.gradle @@ -2,6 +2,7 @@ * Common place to define all the version dependencies */ ext { + revAwaitility = '3.1.2' revAwsSdk = '1.11.86' revArchaius = '0.7.5' revCommonsLang3 = '3.0' From 3601347bdad965364c90387b16c0a379e0d2a6d1 Mon Sep 17 00:00:00 2001 From: David Wadden Date: Sun, 9 Dec 2018 19:36:56 -0800 Subject: [PATCH 3/3] Improves test coverage of Elasticsearch clients + Backfills Java client tests and augments REST tests + Re-creates indexes for 
every REST ES test method + Adds Awaitility dependency for async testing + Runs in-memory ES instance in config-local.props Signed-off-by: Nick Wade --- docker/server/config/config-local.properties | 2 +- es5-persistence/build.gradle | 1 + .../dao/es5/index/ElasticSearchDAOV5.java | 427 +++++++++++------- .../dao/es5/index/ElasticSearchRestDAOV5.java | 21 +- .../ElasticSearchRestClientProvider.java | 7 +- .../TestElasticSearchRestConfiguration.java | 11 - .../dao/es5/index/TestElasticSearchDAOV5.java | 402 +++++++++++++++++ .../es5/index/TestElasticSearchRestDAOV5.java | 333 ++++++++++++-- .../src/test/resources/log4j.properties | 4 +- .../tests/utils/MySQLTestModule.java | 3 + 10 files changed, 976 insertions(+), 235 deletions(-) delete mode 100644 es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java create mode 100644 es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java diff --git a/docker/server/config/config-local.properties b/docker/server/config/config-local.properties index b72d893d56..8916e8b594 100755 --- a/docker/server/config/config-local.properties +++ b/docker/server/config/config-local.properties @@ -35,7 +35,7 @@ queues.dynomite.nonQuorum.port=22122 # memory: The instance is created in memory and lost when the server dies. Useful for development and testing. # external: Elastic search instance runs outside of the server. Data is persisted and does not get lost when # the server dies. Useful for more stable environments like staging or production. -workflow.elasticsearch.instanceType=external +workflow.elasticsearch.instanceType=memory # Transport address to elasticsearch workflow.elasticsearch.url=localhost:9300 diff --git a/es5-persistence/build.gradle b/es5-persistence/build.gradle index 944ba7e051..6206ca2edf 100644 --- a/es5-persistence/build.gradle +++ b/es5-persistence/build.gradle @@ -13,4 +13,5 @@ dependencies { compile "org.apache.logging.log4j:log4j-core:${revLog4jCore}" testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}" + testCompile "org.awaitility:awaitility:${revAwaitility}" } diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java index 2e8a184c54..816406aea2 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java @@ -1,20 +1,15 @@ /** * Copyright 2016 Netflix, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/** + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ package com.netflix.conductor.dao.es5.index; @@ -64,23 +59,24 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import javax.inject.Singleton; +import java.io.IOException; import java.io.InputStream; import java.text.SimpleDateFormat; import java.time.LocalDate; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -90,6 +86,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; +import javax.inject.Inject; +import javax.inject.Singleton; /** * @author Viren @@ -101,34 +100,22 @@ public class ElasticSearchDAOV5 implements IndexDAO { private static Logger logger = LoggerFactory.getLogger(ElasticSearchDAOV5.class); private static final String WORKFLOW_DOC_TYPE = "workflow"; - private static final String TASK_DOC_TYPE = "task"; - private static final String LOG_DOC_TYPE = "task_log"; - private static final String EVENT_DOC_TYPE = "event"; - private static final String MSG_DOC_TYPE = "message"; private static final String className = ElasticSearchDAOV5.class.getSimpleName(); + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); private static final int RETRY_COUNT = 3; - private String indexName; - + private final String indexName; private String logIndexName; - - private String logIndexPrefix; - - private ObjectMapper objectMapper; - - private Client elasticSearchClient; - - - private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); - - private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); - + private final String logIndexPrefix; + private final ObjectMapper objectMapper; + private final Client elasticSearchClient; private final ExecutorService executorService; static { @@ -136,7 +123,8 @@ public class ElasticSearchDAOV5 implements IndexDAO { } @Inject - public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration config, ObjectMapper objectMapper) { + public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration config, + ObjectMapper objectMapper) { this.objectMapper = objectMapper; this.elasticSearchClient = elasticSearchClient; this.indexName = config.getIndexName(); @@ -146,74 +134,108 @@ public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration int maximumPoolSize = 12; long keepAliveTime = 1L; this.executorService = new ThreadPoolExecutor(corePoolSize, - 
maximumPoolSize, - keepAliveTime, - TimeUnit.MINUTES, - new LinkedBlockingQueue<>()); + maximumPoolSize, + keepAliveTime, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); } @Override public void setup() throws Exception { - elasticSearchClient.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().get(); + elasticSearchClient.admin() + .cluster() + .prepareHealth() + .setWaitForGreenStatus() + .execute() + .get(); try { - initIndex(); - updateIndexName(); - Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> updateIndexName(), 0, 1, TimeUnit.HOURS); - + updateLogIndexName(); + Executors.newScheduledThreadPool(1) + .scheduleAtFixedRate(() -> updateLogIndexName(), 0, 1, TimeUnit.HOURS); } catch (Exception e) { logger.error(e.getMessage(), e); } //1. Create the required index - try { - elasticSearchClient.admin().indices().prepareGetIndex().addIndices(indexName).execute().actionGet(); - }catch(IndexNotFoundException infe) { - try { - elasticSearchClient.admin().indices().prepareCreate(indexName).execute().actionGet(); - }catch(ResourceAlreadyExistsException done) {} - } + addIndex(indexName); //2. Add Mappings for the workflow document type - GetMappingsResponse getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(WORKFLOW_DOC_TYPE).execute().actionGet(); - if(getMappingsResponse.mappings().isEmpty()) { - logger.info("Adding the workflow type mappings"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/mappings_docType_workflow.json"); - byte[] bytes = IOUtils.toByteArray(stream); - String source = new String(bytes); + addMappingToIndex(indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json"); + + //3. Add Mappings for task document type + addMappingToIndex(indexName, TASK_DOC_TYPE, "/mappings_docType_task.json"); + } + + private void addIndex(String indexName) { + try { + elasticSearchClient.admin() + .indices() + .prepareGetIndex() + .addIndices(indexName) + .execute() + .actionGet(); + } catch (IndexNotFoundException infe) { try { - elasticSearchClient.admin().indices().preparePutMapping(indexName).setType(WORKFLOW_DOC_TYPE).setSource(source).execute().actionGet(); - }catch(Exception e) { - logger.error("Failed to init index mappings", e); + elasticSearchClient.admin() + .indices() + .prepareCreate(indexName) + .execute() + .actionGet(); + } catch (ResourceAlreadyExistsException done) { + // no-op } } + } + + private void addMappingToIndex(String indexName, String mappingType, String mappingFilename) + throws IOException { + GetMappingsResponse getMappingsResponse = elasticSearchClient.admin() + .indices() + .prepareGetMappings(indexName) + .addTypes(mappingType) + .execute() + .actionGet(); - //3. 
Add Mappings for task document type - getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(TASK_DOC_TYPE).execute().actionGet(); if (getMappingsResponse.mappings().isEmpty()) { - logger.info("Adding the task type mappings"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/mappings_docType_task.json"); + logger.info("Adding the workflow type mappings"); + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream(mappingFilename); byte[] bytes = IOUtils.toByteArray(stream); String source = new String(bytes); try { - elasticSearchClient.admin().indices().preparePutMapping(indexName).setType(TASK_DOC_TYPE).setSource(source).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .preparePutMapping(indexName) + .setType(mappingType) + .setSource(source) + .execute() + .actionGet(); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Failed to init index mappings", e); } } } - private void updateIndexName() { + private void updateLogIndexName() { this.logIndexName = this.logIndexPrefix + "_" + SIMPLE_DATE_FORMAT.format(new Date()); try { - elasticSearchClient.admin().indices().prepareGetIndex().addIndices(logIndexName).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .prepareGetIndex() + .addIndices(logIndexName) + .execute() + .actionGet(); } catch (IndexNotFoundException infe) { try { - elasticSearchClient.admin().indices().prepareCreate(logIndexName).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .prepareCreate(logIndexName) + .execute() + .actionGet(); } catch (ResourceAlreadyExistsException ilee) { - + // no-op } catch (Exception e) { logger.error("Failed to update log index name: {}", logIndexName, e); } @@ -225,26 +247,35 @@ private void updateIndexName() { */ private void initIndex() throws Exception { - //0. Add the tasklog template - GetIndexTemplatesResponse result = elasticSearchClient.admin().indices().prepareGetTemplates("tasklog_template").execute().actionGet(); - if(result.getIndexTemplates().isEmpty()) { + // 0. 
Add the tasklog template + GetIndexTemplatesResponse result = elasticSearchClient.admin() + .indices() + .prepareGetTemplates("tasklog_template") + .execute() + .actionGet(); + + if (result.getIndexTemplates().isEmpty()) { logger.info("Creating the index template 'tasklog_template'"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/template_tasklog.json"); + InputStream stream = ElasticSearchDAOV5.class + .getResourceAsStream("/template_tasklog.json"); byte[] templateSource = IOUtils.toByteArray(stream); try { - elasticSearchClient.admin().indices().preparePutTemplate("tasklog_template").setSource(templateSource, XContentType.JSON).execute().actionGet(); - }catch(Exception e) { + elasticSearchClient.admin() + .indices() + .preparePutTemplate("tasklog_template") + .setSource(templateSource, XContentType.JSON) + .execute() + .actionGet(); + } catch (Exception e) { logger.error("Failed to init tasklog_template", e); } } - } @Override public void indexWorkflow(Workflow workflow) { try { - String id = workflow.getWorkflowId(); WorkflowSummary summary = new WorkflowSummary(workflow); byte[] doc = objectMapper.writeValueAsBytes(summary); @@ -254,8 +285,7 @@ public void indexWorkflow(Workflow workflow) { req.upsert(doc, XContentType.JSON); req.retryOnConflict(5); updateWithRetry(req, "Index workflow into doc_type workflow"); - - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), e); } } @@ -268,7 +298,6 @@ public CompletableFuture asyncIndexWorkflow(Workflow workflow) { @Override public void indexTask(Task task) { try { - String id = task.getTaskId(); TaskSummary summary = new TaskSummary(task); byte[] doc = objectMapper.writeValueAsBytes(summary); @@ -277,8 +306,7 @@ public void indexTask(Task task) { req.doc(doc, XContentType.JSON); req.upsert(doc, XContentType.JSON); updateWithRetry(req, "Index workflow into doc_type workflow"); - - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index task: {}", task.getTaskId(), e); } } @@ -288,12 +316,12 @@ public CompletableFuture asyncIndexTask(Task task) { return CompletableFuture.runAsync(() -> indexTask(task), executorService); } - @Override public void addTaskExecutionLogs(List taskExecLogs) { if (taskExecLogs.isEmpty()) { return; } + try { BulkRequestBuilder bulkRequestBuilder = elasticSearchClient.prepareBulk(); for (TaskExecLog log : taskExecLogs) { @@ -301,10 +329,18 @@ public void addTaskExecutionLogs(List taskExecLogs) { request.source(objectMapper.writeValueAsBytes(log), XContentType.JSON); bulkRequestBuilder.add(request); } - new RetryUtil().retryOnException(() -> bulkRequestBuilder.execute().actionGet(), null , - BulkResponse::hasFailures, RETRY_COUNT, "Indexing all execution logs into doc_type task", "addTaskExecutionLogs"); - } catch (Throwable e) { - List taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList()); + new RetryUtil().retryOnException( + () -> bulkRequestBuilder.execute().actionGet(), + null, + BulkResponse::hasFailures, + RETRY_COUNT, + "Indexing all execution logs into doc_type task", + "addTaskExecutionLogs" + ); + } catch (Exception e) { + List taskIds = taskExecLogs.stream() + .map(TaskExecLog::getTaskId) + .collect(Collectors.toList()); logger.error("Failed to index task execution logs for tasks: ", taskIds, e); } } @@ -324,22 +360,28 @@ public List getTaskExecutionLogs(String taskId) { QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*"); 
BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("createdTime") + .order(SortOrder.ASC); final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(logIndexPrefix + "*") - .setQuery(fq).setTypes(LOG_DOC_TYPE) - .addSort(SortBuilders.fieldSort("createdTime") - .order(SortOrder.ASC)); - SearchResponse response = srb.execute().actionGet(); - SearchHit[] hits = response.getHits().getHits(); - List logs = new ArrayList<>(hits.length); - for(SearchHit hit : hits) { - String source = hit.getSourceAsString(); - TaskExecLog tel = objectMapper.readValue(source, TaskExecLog.class); - logs.add(tel); - } + .setQuery(fq) + .setTypes(LOG_DOC_TYPE) + .addSort(sortBuilder); - return logs; + SearchResponse response = srb.execute().actionGet(); - }catch(Exception e) { + return Arrays.stream(response.getHits().getHits()) + .map(hit -> { + String source = hit.getSourceAsString(); + try { + return objectMapper.readValue(source, TaskExecLog.class); + } catch (IOException e) { + logger.error("exception deserializing taskExecLog: {}", source); + } + return null; + }) + .filter(taskExecLog -> Objects.nonNull(taskExecLog)) + .collect(Collectors.toList()); + } catch (Exception e) { logger.error("Failed to get task execution logs for task: {}", taskId, e); } @@ -356,9 +398,14 @@ public void addMessage(String queue, Message message) { IndexRequest request = new IndexRequest(logIndexName, MSG_DOC_TYPE); request.source(doc); try { - new RetryUtil<>().retryOnException(() -> elasticSearchClient.index(request).actionGet(), null, - null, RETRY_COUNT, "Indexing document in for docType: message", "addMessage"); - } catch (Throwable e) { + new RetryUtil<>().retryOnException( + () -> elasticSearchClient.index(request).actionGet(), + null, + null, + RETRY_COUNT, + "Indexing document in for docType: message", "addMessage" + ); + } catch (Exception e) { logger.error("Failed to index message: {}", message.getId(), e); } } @@ -367,13 +414,15 @@ public void addMessage(String queue, Message message) { public void addEventExecution(EventExecution eventExecution) { try { byte[] doc = objectMapper.writeValueAsBytes(eventExecution); - String id = eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution.getMessageId() + "." + eventExecution.getId(); + String id = + eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution + .getMessageId() + "." 
+ eventExecution.getId(); UpdateRequest req = new UpdateRequest(logIndexName, EVENT_DOC_TYPE, id); req.doc(doc, XContentType.JSON); req.upsert(doc, XContentType.JSON); req.retryOnConflict(5); updateWithRetry(req, "Update Event execution for doc_type event"); - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index event execution: {}", eventExecution.getId(), e); } } @@ -385,29 +434,36 @@ public CompletableFuture asyncAddEventExecution(EventExecution eventExecut private void updateWithRetry(UpdateRequest request, String operationDescription) { try { - new RetryUtil().retryOnException(() -> elasticSearchClient.update(request).actionGet(), null, - null, RETRY_COUNT, operationDescription, "updateWithRetry"); + new RetryUtil().retryOnException( + () -> elasticSearchClient.update(request).actionGet(), + null, + null, + RETRY_COUNT, + operationDescription, + "updateWithRetry" + ); } catch (Exception e) { Monitors.error(className, "index"); - logger.error("Failed to index {} for request type: {}", request.index(), request.type(), e); + logger.error("Failed to index {} for request type: {}", request.index(), request.type(), + e); } } @Override - public SearchResult searchWorkflows(String query, String freeText, int start, int count, List sort) { + public SearchResult searchWorkflows(String query, String freeText, int start, int count, + List sort) { try { - - return search(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); - + return search(indexName, query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); } catch (ParserException e) { throw new ApplicationException(Code.BACKEND_ERROR, e.getMessage(), e); } } @Override - public SearchResult searchTasks(String query, String freeText, int start, int count, List sort) { + public SearchResult searchTasks(String query, String freeText, int start, int count, + List sort) { try { - return search(query, start, count, sort, freeText, TASK_DOC_TYPE); + return search(indexName, query, start, count, sort, freeText, TASK_DOC_TYPE); } catch (ParserException e) { throw new ApplicationException(Code.BACKEND_ERROR, e.getMessage(), e); } @@ -421,7 +477,7 @@ public void removeWorkflow(String workflowId) { if (response.getResult() == DocWriteResponse.Result.DELETED) { logger.error("Index removal failed - document not found by id: {}", workflowId); } - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to remove workflow {} from index", workflowId, e); Monitors.error(className, "remove"); } @@ -435,43 +491,56 @@ public CompletableFuture asyncRemoveWorkflow(String workflowId) { @Override public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { if (keys.length != values.length) { - throw new ApplicationException(Code.INVALID_INPUT, "Number of keys and values do not match"); + throw new ApplicationException(Code.INVALID_INPUT, + "Number of keys and values do not match"); } UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); - Map source = IntStream.range(0, keys.length).boxed() - .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + Map source = IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); request.doc(source); logger.debug("Updating workflow {} with {}", workflowInstanceId, source); - new RetryUtil<>().retryOnException(() -> elasticSearchClient.update(request), null, null, RETRY_COUNT, - "Updating index for doc_type workflow", "updateWorkflow"); + new RetryUtil<>().retryOnException( + 
() -> elasticSearchClient.update(request), + null, + null, + RETRY_COUNT, + "Updating index for doc_type workflow", + "updateWorkflow" + ); } @Override - public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, + Object[] values) { return CompletableFuture.runAsync(() -> updateWorkflow(workflowInstanceId, keys, values), executorService); } @Override public String get(String workflowInstanceId, String fieldToGet) { GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId) - .fetchSourceContext(new FetchSourceContext(true, new String[]{fieldToGet}, Strings.EMPTY_ARRAY)); + .fetchSourceContext( + new FetchSourceContext(true, new String[]{fieldToGet}, Strings.EMPTY_ARRAY)); GetResponse response = elasticSearchClient.get(request).actionGet(); - if (response.isExists()){ + if (response.isExists()) { Map sourceAsMap = response.getSourceAsMap(); - if (sourceAsMap.containsKey(fieldToGet)){ + if (sourceAsMap.containsKey(fieldToGet)) { return sourceAsMap.get(fieldToGet).toString(); } } - logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, indexName); + logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, + indexName); return null; } - private SearchResult search(String structuredQuery, int start, int size, List sortOptions, String freeTextQuery, String docType) throws ParserException { + private SearchResult search(String indexName, String structuredQuery, int start, + int size, + List sortOptions, String freeTextQuery, String docType) throws ParserException { QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); - if(StringUtils.isNotEmpty(structuredQuery)) { + if (StringUtils.isNotEmpty(structuredQuery)) { Expression expression = Expression.fromString(structuredQuery); queryBuilder = expression.getFilterBuilder(); } @@ -479,72 +548,82 @@ private SearchResult search(String structuredQuery, int start, int size, BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); - final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName).setQuery(fq).setTypes(docType).storedFields("_id").setFrom(start).setSize(size); - if(sortOptions != null){ - sortOptions.forEach(sortOption -> { - SortOrder order = SortOrder.ASC; - String field = sortOption; - int indx = sortOption.indexOf(':'); - if(indx > 0){ //Can't be 0, need the field name at-least - field = sortOption.substring(0, indx); - order = SortOrder.valueOf(sortOption.substring(indx+1)); - } - srb.addSort(field, order); - }); + final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName) + .setQuery(fq) + .setTypes(docType) + .storedFields("_id") + .setFrom(start) + .setSize(size); + + if (sortOptions != null) { + sortOptions.forEach(sortOption -> addSortOptionToSearchRequest(srb, sortOption)); } - List result = new LinkedList(); + SearchResponse response = srb.get(); - response.getHits().forEach(hit -> { - result.add(hit.getId()); - }); + + LinkedList result = StreamSupport.stream(response.getHits().spliterator(), false) + .map(SearchHit::getId) + .collect(Collectors.toCollection(LinkedList::new)); long count = response.getHits().getTotalHits(); + return new SearchResult(count, result); } + 
private void addSortOptionToSearchRequest(SearchRequestBuilder searchRequestBuilder, + String sortOption) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int indx = sortOption.indexOf(':'); + if (indx > 0) { // Can't be 0, need the field name at-least + field = sortOption.substring(0, indx); + order = SortOrder.valueOf(sortOption.substring(indx + 1)); + } + searchRequestBuilder.addSort(field, order); + } + @Override public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays).toString())) - .should(QueryBuilders.termQuery("status", "COMPLETED")) - .should(QueryBuilders.termQuery("status", "FAILED")) - .mustNot(QueryBuilders.existsQuery("archived")) - .minimumShouldMatch(1); + .must(QueryBuilders.rangeQuery("endTime") + .lt(LocalDate.now().minusDays(archiveTtlDays).toString())) + .should(QueryBuilders.termQuery("status", "COMPLETED")) + .should(QueryBuilders.termQuery("status", "FAILED")) + .mustNot(QueryBuilders.existsQuery("archived")) + .minimumShouldMatch(1); SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName) - .setTypes("workflow") - .setQuery(q) - .setSize(1000); + .setTypes("workflow") + .setQuery(q) + .setSize(1000); + SearchResponse response = s.execute().actionGet(); + SearchHits hits = response.getHits(); - List ids = new LinkedList<>(); - for (SearchHit hit : hits.getHits()) { - ids.add(hit.getId()); - } - return ids; + return Arrays.stream(hits.getHits()) + .map(hit -> hit.getId()) + .collect(Collectors.toCollection(LinkedList::new)); } - public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { + @Override + public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, + int lastModifiedHoursAgoTo) { DateTime dateTime = new DateTime(); QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("updateTime") - .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) - .must(QueryBuilders.rangeQuery("updateTime") - .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) - .must(QueryBuilders.termQuery("status", "RUNNING")); + .must(QueryBuilders.rangeQuery("updateTime") + .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) + .must(QueryBuilders.rangeQuery("updateTime") + .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) + .must(QueryBuilders.termQuery("status", "RUNNING")); SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName) - .setTypes("workflow") - .setQuery(q) - .setSize(5000) - .addSort("updateTime",SortOrder.ASC); + .setTypes("workflow") + .setQuery(q) + .setSize(5000) + .addSort("updateTime", SortOrder.ASC); SearchResponse response = s.execute().actionGet(); - SearchHits hits = response.getHits(); - List ids = new LinkedList<>(); - for (SearchHit hit : hits.getHits()) { - ids.add(hit.getId()); - } - return ids; + return StreamSupport.stream(response.getHits().spliterator(), false) + .map(hit -> hit.getId()) + .collect(Collectors.toCollection(LinkedList::new)); } - -} \ No newline at end of file +} diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java index 0b7c19671e..ce6b2a3df8 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java +++ 
b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java @@ -77,15 +77,14 @@ public class ElasticSearchRestDAOV5 implements IndexDAO { private static final int RETRY_COUNT = 3; private static final String WORKFLOW_DOC_TYPE = "workflow"; - private static final String TASK_DOC_TYPE = "task"; - private static final String LOG_DOC_TYPE = "task_log"; - private static final String EVENT_DOC_TYPE = "event"; - private static final String MSG_DOC_TYPE = "message"; + private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + private @interface HttpMethod { String GET = "GET"; String POST = "POST"; @@ -95,18 +94,14 @@ public class ElasticSearchRestDAOV5 implements IndexDAO { private static final String className = ElasticSearchRestDAOV5.class.getSimpleName(); - private String indexName; - private String logIndexPrefix; + private final String indexName; + private final String logIndexPrefix; private String logIndexName; - - private ObjectMapper objectMapper; - private RestHighLevelClient elasticSearchClient; - private RestClient elasticSearchAdminClient; - + private final ObjectMapper objectMapper; + private final RestHighLevelClient elasticSearchClient; + private final RestClient elasticSearchAdminClient; private final ExecutorService executorService; - private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); - private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); static { SIMPLE_DATE_FORMAT.setTimeZone(GMT); } diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java index c51207af8d..7bc2cc6da2 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java @@ -24,9 +24,10 @@ public RestClient get() { } private HttpHost[] convertToHttpHosts(List hosts) { - List list = hosts.stream().map(host -> - new HttpHost(host.getHost(), host.getPort(), host.getScheme())) - .collect(Collectors.toList()); + List list = hosts.stream() + .map(host -> new HttpHost(host.getHost(), host.getPort(), host.getScheme())) + .collect(Collectors.toList()); + return list.toArray(new HttpHost[list.size()]); } } diff --git a/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java b/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java deleted file mode 100644 index a216d363cc..0000000000 --- a/es5-persistence/src/test/java/com/netflix/conductor/com/netflix/conductor/elasticsearch/TestElasticSearchRestConfiguration.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.netflix.conductor.com.netflix.conductor.elasticsearch; - -import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; -import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; - -public class TestElasticSearchRestConfiguration extends SystemPropertiesElasticSearchConfiguration implements ElasticSearchConfiguration { - - public String getURL() { - return "http://localhost:9200"; - } -} diff --git 
a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java new file mode 100644 index 0000000000..85b43a196d --- /dev/null +++ b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java @@ -0,0 +1,402 @@ +package com.netflix.conductor.dao.es5.index; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.events.EventHandler.Action.Type; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.ElasticSearchTransportClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.es5.EmbeddedElasticSearchV5; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestElasticSearchDAOV5 { + + private static final Logger logger = LoggerFactory.getLogger(TestElasticSearchDAOV5.class); + + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + + private static final String MSG_DOC_TYPE = "message"; + private static final String EVENT_DOC_TYPE = "event"; + 
private static final String LOG_INDEX_PREFIX = "task_log"; + + private static ElasticSearchConfiguration configuration; + private static Client elasticSearchClient; + private static ElasticSearchDAOV5 indexDAO; + private static EmbeddedElasticSearch embeddedElasticSearch; + + private Workflow workflow; + + @BeforeClass + public static void startServer() throws Exception { + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9203"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9303"); + + configuration = new SystemPropertiesElasticSearchConfiguration(); + String host = configuration.getEmbeddedHost(); + int port = configuration.getEmbeddedPort(); + String clusterName = configuration.getEmbeddedClusterName(); + + embeddedElasticSearch = new EmbeddedElasticSearchV5(clusterName, host, port); + embeddedElasticSearch.start(); + + ElasticSearchTransportClientProvider transportClientProvider = + new ElasticSearchTransportClientProvider(configuration); + elasticSearchClient = transportClientProvider.get(); + + elasticSearchClient.admin() + .cluster() + .prepareHealth() + .setWaitForGreenStatus() + .execute() + .get(); + + ObjectMapper objectMapper = new ObjectMapper(); + indexDAO = new ElasticSearchDAOV5(elasticSearchClient, configuration, objectMapper); + } + + @AfterClass + public static void closeClient() throws Exception { + if (elasticSearchClient != null) { + elasticSearchClient.close(); + } + + embeddedElasticSearch.stop(); + } + + @Before + public void createTestWorkflow() throws Exception { + // define indices + indexDAO.setup(); + + // initialize workflow + workflow = new Workflow(); + workflow.getInput().put("requestId", "request id 001"); + workflow.getInput().put("hasAwards", true); + workflow.getInput().put("channelMapping", 5); + Map<String, Object> name = new HashMap<>(); + name.put("name", "The Who"); + name.put("year", 1970); + Map<String, Object> name2 = new HashMap<>(); + name2.put("name", "The Doors"); + name2.put("year", 1975); + + List<Object> names = new LinkedList<>(); + names.add(name); + names.add(name2); + + workflow.getOutput().put("name", name); + workflow.getOutput().put("names", names); + workflow.getOutput().put("awards", 200); + + Task task = new Task(); + task.setReferenceTaskName("task2"); + task.getOutputData().put("location", "http://location"); + task.setStatus(Task.Status.COMPLETED); + + Task task2 = new Task(); + task2.setReferenceTaskName("task3"); + task2.getOutputData().put("refId", "abcddef_1234_7890_aaffcc"); + task2.setStatus(Task.Status.SCHEDULED); + + workflow.getTasks().add(task); + workflow.getTasks().add(task2); + } + + @After + public void tearDown() { + deleteAllIndices(); + } + + private void deleteAllIndices() { + + ImmutableOpenMap<String, IndexMetaData> indices = elasticSearchClient.admin().cluster() + .prepareState().get().getState() + .getMetaData().getIndices(); + + indices.forEach(cursor -> { + try { + elasticSearchClient.admin() + .indices() + .delete(new DeleteIndexRequest(cursor.value.getIndex().getName())) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + private boolean indexExists(final String index) { + IndicesExistsRequest request = new IndicesExistsRequest(index); + try { + return elasticSearchClient.admin().indices().exists(request).get().isExists(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private boolean doesMappingExist(final String index, final String mappingName) { + 
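// Fetch the index's mappings and check whether the given type name is present. +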
GetMappingsRequest request = new GetMappingsRequest() + .indices(index); + try { + GetMappingsResponse response = elasticSearchClient.admin() + .indices() + .getMappings(request) + .get(); + + return response.getMappings() + .get(index) + .containsKey(mappingName); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Test + public void assertInitialSetup() throws Exception { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMww"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + + String taskLogIndex = "task_log_" + dateFormat.format(new Date()); + + assertTrue("Index 'conductor' should exist", indexExists("conductor")); + assertTrue("Index '" + taskLogIndex + "' should exist", indexExists(taskLogIndex)); + + assertTrue("Mapping 'workflow' for index 'conductor' should exist", doesMappingExist("conductor", "workflow")); + assertTrue("Mapping 'task' for index 'conductor' should exist", doesMappingExist("conductor", "task")); + } + + @Test + public void testWorkflowCRUD() throws Exception { + String testWorkflowType = "testworkflow"; + String testId = "1"; + + workflow.setWorkflowId(testId); + workflow.setWorkflowType(testWorkflowType); + + // Confirm the workflow does not exist yet + String workflowType = indexDAO.get(testId, "workflowType"); + assertNull("Workflow should not exist", workflowType); + + // Create + indexDAO.indexWorkflow(workflow); + + // Read + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have found our workflow type", testWorkflowType, workflowType); + + // Update + String newWorkflowType = "newworkflowtype"; + String[] keyChanges = {"workflowType"}; + String[] valueChanges = {newWorkflowType}; + + indexDAO.updateWorkflow(testId, keyChanges, valueChanges); + + await() + .atMost(3, TimeUnit.SECONDS) + .untilAsserted( + () -> { + String actualWorkflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have updated our new workflow type", newWorkflowType, actualWorkflowType); + } + ); + + // Delete + indexDAO.removeWorkflow(testId); + + workflowType = indexDAO.get(testId, "workflowType"); + assertNull("We should no longer have our workflow in the system", workflowType); + } + + @Test + public void taskExecutionLogs() throws Exception { + TaskExecLog taskExecLog1 = new TaskExecLog(); + taskExecLog1.setTaskId("some-task-id"); + long createdTime1 = LocalDateTime.of(2018, 11, 1, 6, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog1.setCreatedTime(createdTime1); + taskExecLog1.setLog("some-log"); + TaskExecLog taskExecLog2 = new TaskExecLog(); + taskExecLog2.setTaskId("some-task-id"); + long createdTime2 = LocalDateTime.of(2018, 11, 1, 6, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog2.setCreatedTime(createdTime2); + taskExecLog2.setLog("some-log"); + List<TaskExecLog> logsToAdd = Arrays.asList(taskExecLog1, taskExecLog2); + indexDAO.addTaskExecutionLogs(logsToAdd); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List<TaskExecLog> taskExecutionLogs = indexDAO.getTaskExecutionLogs("some-task-id"); + assertEquals(2, taskExecutionLogs.size()); + }); + } + + @Test + public void indexTask() throws Exception { + String correlationId = "some-correlation-id"; + + Task task = new Task(); + task.setTaskId("some-task-id"); + task.setWorkflowInstanceId("some-workflow-instance-id"); + task.setTaskType("some-task-type"); + task.setStatus(Status.FAILED); + task.setInputData(new HashMap<String, Object>() {{ put("input_key", "input_value"); }}); + task.setCorrelationId(correlationId); + task.setTaskDefName("some-task-def-name"); + 
task.setReasonForIncompletion("some-failure-reason"); + + indexDAO.indexTask(task); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResult<String> result = indexDAO + .searchTasks("correlationId='" + correlationId + "'", "*", 0, 10000, null); + + assertTrue("should return 1 or more search results", result.getResults().size() > 0); + assertEquals("taskId should match the indexed task", "some-task-id", result.getResults().get(0)); + }); + } + + @Test + public void addMessage() { + String messageId = "some-message-id"; + + Message message = new Message(); + message.setId(messageId); + message.setPayload("some-payload"); + message.setReceipt("some-receipt"); + + indexDAO.addMessage("some-queue", message); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = search( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + "*", + MSG_DOC_TYPE + ); + assertEquals("search results should be length 1", 1, searchResponse.getHits().getTotalHits()); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + GetResponse response = elasticSearchClient + .prepareGet(searchHit.getIndex(), MSG_DOC_TYPE, searchHit.getId()) + .get(); + assertEquals("indexed message id should match", messageId, response.getSource().get("messageId")); + assertEquals("indexed payload should match", "some-payload", response.getSource().get("payload")); + }); + } + + @Test + public void addEventExecution() { + String messageId = "some-message-id"; + + EventExecution eventExecution = new EventExecution(); + eventExecution.setId("some-id"); + eventExecution.setMessageId(messageId); + eventExecution.setAction(Type.complete_task); + eventExecution.setEvent("some-event"); + eventExecution.setStatus(EventExecution.Status.COMPLETED); + + indexDAO.addEventExecution(eventExecution); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = search( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + "*", + EVENT_DOC_TYPE + ); + + assertEquals("search results should be length 1", 1, searchResponse.getHits().getTotalHits()); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + GetResponse response = elasticSearchClient + .prepareGet(searchHit.getIndex(), EVENT_DOC_TYPE, searchHit.getId()) + .get(); + + assertEquals("indexed message id should match", messageId, response.getSource().get("messageId")); + assertEquals("indexed id should match", "some-id", response.getSource().get("id")); + assertEquals("indexed status should match", EventExecution.Status.COMPLETED.name(), response.getSource().get("status")); + }); + } + + + private SearchResponse search(String indexName, String structuredQuery, int start, + int size, String freeTextQuery, String docType) throws ParserException { + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if (StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName) + .setQuery(fq) + .setTypes(docType) + .storedFields("_id") + .setFrom(start) + .setSize(size); + + return srb.get(); + } + +} diff --git 
a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java index 7669910c90..267323741b 100644 --- a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java +++ b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java @@ -1,39 +1,100 @@ package com.netflix.conductor.dao.es5.index; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.amazonaws.util.IOUtils; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.netflix.conductor.com.netflix.conductor.elasticsearch.TestElasticSearchRestConfiguration; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.events.EventHandler.Action.Type; import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.ElasticSearchRestClientProvider; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.es5.EmbeddedElasticSearchV5; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; -import org.junit.*; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.*; +public class TestElasticSearchRestDAOV5 { -import static org.junit.Assert.*; + private static final Logger logger = 
LoggerFactory.getLogger(TestElasticSearchRestDAOV5.class); + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); -public class TestElasticSearchRestDAOV5 { + private static final String INDEX_NAME = "conductor"; + private static final String LOG_INDEX_PREFIX = "task_log"; - private static Logger logger = LoggerFactory.getLogger(TestElasticSearchRestDAOV5.class); + private static final String MSG_DOC_TYPE = "message"; + private static final String EVENT_DOC_TYPE = "event"; + private static ElasticSearchConfiguration configuration; private static RestClient restClient; + private static RestHighLevelClient elasticSearchClient; private static ElasticSearchRestDAOV5 indexDAO; private static EmbeddedElasticSearch embeddedElasticSearch; + private static ObjectMapper objectMapper; private Workflow workflow; + private @interface HttpMethod { + String GET = "GET"; + String POST = "POST"; + String PUT = "PUT"; + String HEAD = "HEAD"; + String DELETE = "DELETE"; + } + @BeforeClass - public static void setup() throws Exception { + public static void startServer() throws Exception { + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9204"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "http://localhost:9204"); - ElasticSearchConfiguration configuration = new TestElasticSearchRestConfiguration(); + configuration = new SystemPropertiesElasticSearchConfiguration(); String host = configuration.getEmbeddedHost(); int port = configuration.getEmbeddedPort(); @@ -42,34 +103,19 @@ public static void setup() throws Exception { embeddedElasticSearch = new EmbeddedElasticSearchV5(clusterName, host, port); embeddedElasticSearch.start(); - ElasticSearchRestClientProvider restClientProvider = new ElasticSearchRestClientProvider(configuration); + ElasticSearchRestClientProvider restClientProvider = + new ElasticSearchRestClientProvider(configuration); restClient = restClientProvider.get(); - - long startTime = System.currentTimeMillis(); + elasticSearchClient = new RestHighLevelClient(restClient); Map<String, String> params = new HashMap<>(); params.put("wait_for_status", "yellow"); params.put("timeout", "30s"); - while (true) { - try { - restClient.performRequest("GET", "/_cluster/health", params); - break; - } catch (IOException e) { - logger.info("No ES nodes available yet."); - } - Thread.sleep(10000); + restClient.performRequest("GET", "/_cluster/health", params); - if (System.currentTimeMillis() - startTime > 60000) { - logger.error("Unable to connect to the ES cluster in time."); - throw new RuntimeException("Unable to connect to ES cluster in time."); - } - } - - ObjectMapper objectMapper = new ObjectMapper(); + objectMapper = new ObjectMapper(); indexDAO = new ElasticSearchRestDAOV5(restClient, configuration, objectMapper); - indexDAO.setup(); - } @AfterClass @@ -82,8 +128,11 @@ public static void closeClient() throws Exception { } @Before - public void createTestWorkflow() { + public void createTestWorkflow() throws Exception { + // define indices + indexDAO.setup(); + + // initialize workflow workflow = new Workflow(); workflow.getInput().put("requestId", "request id 001"); workflow.getInput().put("hasAwards", true); @@ -117,6 +166,26 @@ public void createTestWorkflow() { workflow.getTasks().add(task2); } + @After + public void tearDown() throws IOException { + deleteAllIndices(); + } + + private void deleteAllIndices() throws IOException { + Response beforeResponse = restClient.performRequest(HttpMethod.GET, 
"/_cat/indices"); + + Reader streamReader = new InputStreamReader(beforeResponse.getEntity().getContent()); + BufferedReader bufferedReader = new BufferedReader(streamReader); + + String line; + while ((line = bufferedReader.readLine()) != null) { + String[] fields = line.split("\\s"); + String endpoint = String.format("/%s", fields[2]); + + restClient.performRequest(HttpMethod.DELETE, endpoint); + } + } + private boolean indexExists(final String index) throws IOException { return indexDAO.doesResourceExist("/" + index); } @@ -176,4 +245,204 @@ public void testWorkflowCRUD() { assertNull("We should no longer have our workflow in the system", workflowType); } -} \ No newline at end of file + @Test + public void taskExecutionLogs() throws Exception { + TaskExecLog taskExecLog1 = new TaskExecLog(); + taskExecLog1.setTaskId("some-task-id"); + long createdTime1 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog1.setCreatedTime(createdTime1); + taskExecLog1.setLog("some-log"); + TaskExecLog taskExecLog2 = new TaskExecLog(); + taskExecLog2.setTaskId("some-task-id"); + long createdTime2 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog2.setCreatedTime(createdTime2); + taskExecLog2.setLog("some-log"); + List logsToAdd = Arrays.asList(taskExecLog1, taskExecLog2); + indexDAO.addTaskExecutionLogs(logsToAdd); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List taskExecutionLogs = indexDAO.getTaskExecutionLogs("some-task-id"); + assertEquals(2, taskExecutionLogs.size()); + }); + } + + @Test + public void indexTask() throws Exception { + String correlationId = "some-correlation-id"; + + Task task = new Task(); + task.setTaskId("some-task-id"); + task.setWorkflowInstanceId("some-workflow-instance-id"); + task.setTaskType("some-task-type"); + task.setStatus(Status.FAILED); + task.setInputData(new HashMap() {{ put("input_key", "input_value"); }}); + task.setCorrelationId(correlationId); + task.setTaskDefName("some-task-def-name"); + task.setReasonForIncompletion("some-failure-reason"); + + indexDAO.indexTask(task); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResult result = indexDAO + .searchTasks("correlationId='" + correlationId + "'", "*", 0, 10000, null); + + assertTrue("should return 1 or more search results", result.getResults().size() > 0); + assertEquals("taskId should match the indexed task", "some-task-id", result.getResults().get(0)); + }); + } + + @Test + public void addMessage() { + String messageId = "some-message-id"; + + Message message = new Message(); + message.setId(messageId); + message.setPayload("some-payload"); + message.setReceipt("some-receipt"); + + indexDAO.addMessage("some-queue", message); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = searchObjectIdsViaExpression( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + null, + "*", + MSG_DOC_TYPE + ); + assertTrue("should return 1 or more search results", searchResponse.getHits().getTotalHits() > 0); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + String resourcePath = + String.format("/%s/%s/%s", searchHit.getIndex(), MSG_DOC_TYPE, searchHit.getId()); + Response response = restClient.performRequest(HttpMethod.GET, resourcePath); + + String responseBody = IOUtils.toString(response.getEntity().getContent()); + logger.info("responseBody: {}", responseBody); + + TypeReference> typeRef = + new 
TypeReference<HashMap<String, Object>>() {}; + Map<String, Object> responseMap = objectMapper.readValue(responseBody, typeRef); + Map<String, Object> source = (Map<String, Object>) responseMap.get("_source"); + assertEquals("indexed message id should match", messageId, source.get("messageId")); + assertEquals("indexed payload should match", "some-payload", source.get("payload")); + }); + } + + @Test + public void addEventExecution() { + String messageId = "some-message-id"; + + EventExecution eventExecution = new EventExecution(); + eventExecution.setId("some-id"); + eventExecution.setMessageId(messageId); + eventExecution.setAction(Type.complete_task); + eventExecution.setEvent("some-event"); + eventExecution.setStatus(EventExecution.Status.COMPLETED); + + indexDAO.addEventExecution(eventExecution); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = searchObjectIdsViaExpression( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + null, + "*", + EVENT_DOC_TYPE + ); + assertTrue("should return 1 or more search results", searchResponse.getHits().getTotalHits() > 0); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + String resourcePath = + String.format("/%s/%s/%s", searchHit.getIndex(), EVENT_DOC_TYPE, searchHit.getId()); + Response response = restClient.performRequest(HttpMethod.GET, resourcePath); + + String responseBody = IOUtils.toString(response.getEntity().getContent()); + TypeReference<HashMap<String, Object>> typeRef = + new TypeReference<HashMap<String, Object>>() { + }; + Map<String, Object> responseMap = objectMapper.readValue(responseBody, typeRef); + + Map<String, Object> sourceMap = (Map<String, Object>) responseMap.get("_source"); + assertEquals("indexed id should match", "some-id", sourceMap.get("id")); + assertEquals("indexed message id should match", messageId, sourceMap.get("messageId")); + assertEquals("indexed action should match", Type.complete_task.name(), sourceMap.get("action")); + assertEquals("indexed event should match", "some-event", sourceMap.get("event")); + assertEquals("indexed status should match", EventExecution.Status.COMPLETED.name(), sourceMap.get("status")); + }); + } + + private SearchResponse searchObjectIdsViaExpression(String indexName, String structuredQuery, int start, int size, + List<String> sortOptions, String freeTextQuery, String docType) throws ParserException, IOException { + + // Build query + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if (StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + return searchObjectIds(indexName, fq, start, size, sortOptions, docType); + } + + /** + * Tries to find object ids for a given query in an index. + * + * @param indexName The name of the index. + * @param queryBuilder The query to use for searching. + * @param start The start to use. + * @param size The total return size. + * @param sortOptions A list of string options to sort in the form VALUE:ORDER; where ORDER is optional and can be either ASC OR DESC. + * @param docType The document type to search for. + * + * @return The SearchResults which includes the count and IDs that were found. + * @throws IOException If we cannot communicate with ES. 
+ */ + private SearchResponse searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, List<String> sortOptions, String docType) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(start); + searchSourceBuilder.size(size); + + if (sortOptions != null && !sortOptions.isEmpty()) { + + for (String sortOption : sortOptions) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int index = sortOption.indexOf(":"); + if (index > 0) { + field = sortOption.substring(0, index); + order = SortOrder.valueOf(sortOption.substring(index + 1)); + } + searchSourceBuilder.sort(new FieldSortBuilder(field).order(order)); + } + } + + // Generate the actual request to send to ES. + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(docType); + searchRequest.source(searchSourceBuilder); + + return elasticSearchClient.search(searchRequest); + } + +} diff --git a/es5-persistence/src/test/resources/log4j.properties b/es5-persistence/src/test/resources/log4j.properties index e4fcb01308..6db8d4b142 100644 --- a/es5-persistence/src/test/resources/log4j.properties +++ b/es5-persistence/src/test/resources/log4j.properties @@ -6,4 +6,6 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.http=info diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java index a623551bcd..54830373a2 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java @@ -9,6 +9,8 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.execution.WorkflowStatusListener; +import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.MetadataDAO; @@ -50,6 +52,7 @@ protected void configure() { bind(ExecutionDAO.class).to(MySQLExecutionDAO.class); bind(QueueDAO.class).to(MySQLQueueDAO.class); bind(IndexDAO.class).to(MockIndexDAO.class); + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class); install(new CoreModule()); bind(UserTask.class).asEagerSingleton();