diff --git a/docker/server/config/config-local.properties b/docker/server/config/config-local.properties index b72d893d56..8916e8b594 100755 --- a/docker/server/config/config-local.properties +++ b/docker/server/config/config-local.properties @@ -35,7 +35,7 @@ queues.dynomite.nonQuorum.port=22122 # memory: The instance is created in memory and lost when the server dies. Useful for development and testing. # external: Elastic search instance runs outside of the server. Data is persisted and does not get lost when # the server dies. Useful for more stable environments like staging or production. -workflow.elasticsearch.instanceType=external +workflow.elasticsearch.instanceType=memory # Transport address to elasticsearch workflow.elasticsearch.url=localhost:9300 diff --git a/es5-persistence/README.md b/es5-persistence/README.md index 272dd6d516..fed42832df 100644 --- a/es5-persistence/README.md +++ b/es5-persistence/README.md @@ -1,2 +1,67 @@ +# ES5 Persistence + +This module provides ES5 persistence when indexing workflows and tasks. + ## Usage +This module uses the following configuration options: + +* `workflow.elasticsearch.instanceType` - This determines the type of ES instance we are using with conductor. +The two values are either `MEMORY` or `EXTERNAL`. +If `MEMORY`, then an embedded server will be run. +Default is `MEMORY`. +* `workflow.elasticsearch.url` - A comma separated list of schema/host/port of the ES nodes to communicate with. +Schema can be ignored when using `tcp` transport; otherwise, you must specify `http` or `https`. +If using the `http` or `https`, then conductor will use the REST transport protocol. +* `workflow.elasticsearch.index.name` - The name of the workflow and task index. +Defaults to `conductor` +* `workflow.elasticsearch.tasklog.index.name` - The name of the task log index. +Defaults to `task_log` + +### Embedded Configuration + +If `workflow.elasticsearch.instanceType=MEMORY`, then you can configure the embedded server using the following configurations: + +* `workflow.elasticsearch.embedded.port` - The starting port of the embedded server. +This is the port used for the TCP transport. +It will also use this + 100 in order to setup the http transport. +Default is `9200` +* `workflow.elasticsearch.embedded.cluster.name` - The name of the embedded cluster name. +Default is `elasticsearch_test` +* `workflow.elasticsearch.embedded.host` - The host of the embedded server. +Default is `127.0.0.1` + +### REST Transport + +If you are using AWS ElasticSearch, you should use the `rest` transport as that's the only version transport that they support. +However, this module currently only works with open IAM, VPC version of ElasticSearch. +Eventually, we should create ES modules that can be loaded in to support authentication and request signing, but this currently does not support that. + +### Example Configurations + +**In-memory ES with TCP transport** + +``` +workflow.elasticsearch.instanceType=MEMORY +``` + +**In-memory ES with REST transport** + +``` +workflow.elasticsearch.instanceType=MEMORY +workflow.elasticsearch.url=http://localhost:9300 +``` + +**ES with TCP transport** + +``` +workflow.elasticsearch.instanceType=EXTERNAL +workflow.elasticsearch.url=127.0.0.1:9300 +``` + +**ES with REST transport** + +``` +workflow.elasticsearch.instanceType=EXTERNAL +workflow.elasticsearch.url=http://127.0.0.1:9200 +``` diff --git a/es5-persistence/build.gradle b/es5-persistence/build.gradle index a36128e0b4..6206ca2edf 100644 --- a/es5-persistence/build.gradle +++ b/es5-persistence/build.gradle @@ -5,9 +5,13 @@ dependencies { compile "org.elasticsearch:elasticsearch:${revElasticSearch5}" compile "org.elasticsearch.client:transport:${revElasticSearch5}" + compile "org.elasticsearch.client:elasticsearch-rest-client:${revElasticSearch5}" compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:${revElasticSearch5}" //ES5 Dependency compile "org.apache.logging.log4j:log4j-api:${revLog4jApi}" compile "org.apache.logging.log4j:log4j-core:${revLog4jCore}" + + testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}" + testCompile "org.awaitility:awaitility:${revAwaitility}" } diff --git a/es5-persistence/dependencies.lock b/es5-persistence/dependencies.lock index 88f9de49a4..bc75e63283 100644 --- a/es5-persistence/dependencies.lock +++ b/es5-persistence/dependencies.lock @@ -113,6 +113,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -246,6 +250,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -379,6 +387,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -522,6 +534,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -655,6 +671,10 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -792,6 +812,14 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -812,7 +840,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testCompileClasspath": { @@ -933,6 +965,14 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -953,7 +993,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testRuntime": { @@ -1074,6 +1118,14 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1094,7 +1146,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } }, "testRuntimeClasspath": { @@ -1215,6 +1271,14 @@ "locked": "2.9.1", "requested": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "locked": "5.6.8", + "requested": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "locked": "5.6.8", "requested": "5.6.8" @@ -1235,7 +1299,11 @@ "firstLevelTransitive": [ "com.netflix.conductor:conductor-common" ], - "locked": "1.7.25" + "locked": "1.8.0-alpha1" + }, + "org.slf4j:slf4j-log4j12": { + "locked": "1.8.0-alpha1", + "requested": "1.8.0-alpha1" } } } \ No newline at end of file diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java index 2e8a184c54..816406aea2 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchDAOV5.java @@ -1,20 +1,15 @@ /** * Copyright 2016 Netflix, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ package com.netflix.conductor.dao.es5.index; @@ -64,23 +59,24 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import javax.inject.Singleton; +import java.io.IOException; import java.io.InputStream; import java.text.SimpleDateFormat; import java.time.LocalDate; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -90,6 +86,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; +import javax.inject.Inject; +import javax.inject.Singleton; /** * @author Viren @@ -101,34 +100,22 @@ public class ElasticSearchDAOV5 implements IndexDAO { private static Logger logger = LoggerFactory.getLogger(ElasticSearchDAOV5.class); private static final String WORKFLOW_DOC_TYPE = "workflow"; - private static final String TASK_DOC_TYPE = "task"; - private static final String LOG_DOC_TYPE = "task_log"; - private static final String EVENT_DOC_TYPE = "event"; - private static final String MSG_DOC_TYPE = "message"; private static final String className = ElasticSearchDAOV5.class.getSimpleName(); + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); private static final int RETRY_COUNT = 3; - private String indexName; - + private final String indexName; private String logIndexName; - - private String logIndexPrefix; - - private ObjectMapper objectMapper; - - private Client elasticSearchClient; - - - private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); - - private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); - + private final String logIndexPrefix; + private final ObjectMapper objectMapper; + private final Client elasticSearchClient; private final ExecutorService executorService; static { @@ -136,7 +123,8 @@ public class ElasticSearchDAOV5 implements IndexDAO { } @Inject - public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration config, ObjectMapper objectMapper) { + public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration config, + ObjectMapper objectMapper) { this.objectMapper = objectMapper; this.elasticSearchClient = elasticSearchClient; this.indexName = config.getIndexName(); @@ -146,74 +134,108 @@ public ElasticSearchDAOV5(Client elasticSearchClient, ElasticSearchConfiguration int maximumPoolSize = 12; long keepAliveTime = 1L; this.executorService = new ThreadPoolExecutor(corePoolSize, - maximumPoolSize, - keepAliveTime, - TimeUnit.MINUTES, - new LinkedBlockingQueue<>()); + maximumPoolSize, + keepAliveTime, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); } @Override public void setup() throws Exception { - elasticSearchClient.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().get(); + elasticSearchClient.admin() + .cluster() + .prepareHealth() + .setWaitForGreenStatus() + .execute() + .get(); try { - initIndex(); - updateIndexName(); - Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> updateIndexName(), 0, 1, TimeUnit.HOURS); - + updateLogIndexName(); + Executors.newScheduledThreadPool(1) + .scheduleAtFixedRate(() -> updateLogIndexName(), 0, 1, TimeUnit.HOURS); } catch (Exception e) { logger.error(e.getMessage(), e); } //1. Create the required index - try { - elasticSearchClient.admin().indices().prepareGetIndex().addIndices(indexName).execute().actionGet(); - }catch(IndexNotFoundException infe) { - try { - elasticSearchClient.admin().indices().prepareCreate(indexName).execute().actionGet(); - }catch(ResourceAlreadyExistsException done) {} - } + addIndex(indexName); //2. Add Mappings for the workflow document type - GetMappingsResponse getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(WORKFLOW_DOC_TYPE).execute().actionGet(); - if(getMappingsResponse.mappings().isEmpty()) { - logger.info("Adding the workflow type mappings"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/mappings_docType_workflow.json"); - byte[] bytes = IOUtils.toByteArray(stream); - String source = new String(bytes); + addMappingToIndex(indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json"); + + //3. Add Mappings for task document type + addMappingToIndex(indexName, TASK_DOC_TYPE, "/mappings_docType_task.json"); + } + + private void addIndex(String indexName) { + try { + elasticSearchClient.admin() + .indices() + .prepareGetIndex() + .addIndices(indexName) + .execute() + .actionGet(); + } catch (IndexNotFoundException infe) { try { - elasticSearchClient.admin().indices().preparePutMapping(indexName).setType(WORKFLOW_DOC_TYPE).setSource(source).execute().actionGet(); - }catch(Exception e) { - logger.error("Failed to init index mappings", e); + elasticSearchClient.admin() + .indices() + .prepareCreate(indexName) + .execute() + .actionGet(); + } catch (ResourceAlreadyExistsException done) { + // no-op } } + } + + private void addMappingToIndex(String indexName, String mappingType, String mappingFilename) + throws IOException { + GetMappingsResponse getMappingsResponse = elasticSearchClient.admin() + .indices() + .prepareGetMappings(indexName) + .addTypes(mappingType) + .execute() + .actionGet(); - //3. Add Mappings for task document type - getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(TASK_DOC_TYPE).execute().actionGet(); if (getMappingsResponse.mappings().isEmpty()) { - logger.info("Adding the task type mappings"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/mappings_docType_task.json"); + logger.info("Adding the workflow type mappings"); + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream(mappingFilename); byte[] bytes = IOUtils.toByteArray(stream); String source = new String(bytes); try { - elasticSearchClient.admin().indices().preparePutMapping(indexName).setType(TASK_DOC_TYPE).setSource(source).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .preparePutMapping(indexName) + .setType(mappingType) + .setSource(source) + .execute() + .actionGet(); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Failed to init index mappings", e); } } } - private void updateIndexName() { + private void updateLogIndexName() { this.logIndexName = this.logIndexPrefix + "_" + SIMPLE_DATE_FORMAT.format(new Date()); try { - elasticSearchClient.admin().indices().prepareGetIndex().addIndices(logIndexName).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .prepareGetIndex() + .addIndices(logIndexName) + .execute() + .actionGet(); } catch (IndexNotFoundException infe) { try { - elasticSearchClient.admin().indices().prepareCreate(logIndexName).execute().actionGet(); + elasticSearchClient.admin() + .indices() + .prepareCreate(logIndexName) + .execute() + .actionGet(); } catch (ResourceAlreadyExistsException ilee) { - + // no-op } catch (Exception e) { logger.error("Failed to update log index name: {}", logIndexName, e); } @@ -225,26 +247,35 @@ private void updateIndexName() { */ private void initIndex() throws Exception { - //0. Add the tasklog template - GetIndexTemplatesResponse result = elasticSearchClient.admin().indices().prepareGetTemplates("tasklog_template").execute().actionGet(); - if(result.getIndexTemplates().isEmpty()) { + // 0. Add the tasklog template + GetIndexTemplatesResponse result = elasticSearchClient.admin() + .indices() + .prepareGetTemplates("tasklog_template") + .execute() + .actionGet(); + + if (result.getIndexTemplates().isEmpty()) { logger.info("Creating the index template 'tasklog_template'"); - InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/template_tasklog.json"); + InputStream stream = ElasticSearchDAOV5.class + .getResourceAsStream("/template_tasklog.json"); byte[] templateSource = IOUtils.toByteArray(stream); try { - elasticSearchClient.admin().indices().preparePutTemplate("tasklog_template").setSource(templateSource, XContentType.JSON).execute().actionGet(); - }catch(Exception e) { + elasticSearchClient.admin() + .indices() + .preparePutTemplate("tasklog_template") + .setSource(templateSource, XContentType.JSON) + .execute() + .actionGet(); + } catch (Exception e) { logger.error("Failed to init tasklog_template", e); } } - } @Override public void indexWorkflow(Workflow workflow) { try { - String id = workflow.getWorkflowId(); WorkflowSummary summary = new WorkflowSummary(workflow); byte[] doc = objectMapper.writeValueAsBytes(summary); @@ -254,8 +285,7 @@ public void indexWorkflow(Workflow workflow) { req.upsert(doc, XContentType.JSON); req.retryOnConflict(5); updateWithRetry(req, "Index workflow into doc_type workflow"); - - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), e); } } @@ -268,7 +298,6 @@ public CompletableFuture asyncIndexWorkflow(Workflow workflow) { @Override public void indexTask(Task task) { try { - String id = task.getTaskId(); TaskSummary summary = new TaskSummary(task); byte[] doc = objectMapper.writeValueAsBytes(summary); @@ -277,8 +306,7 @@ public void indexTask(Task task) { req.doc(doc, XContentType.JSON); req.upsert(doc, XContentType.JSON); updateWithRetry(req, "Index workflow into doc_type workflow"); - - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index task: {}", task.getTaskId(), e); } } @@ -288,12 +316,12 @@ public CompletableFuture asyncIndexTask(Task task) { return CompletableFuture.runAsync(() -> indexTask(task), executorService); } - @Override public void addTaskExecutionLogs(List taskExecLogs) { if (taskExecLogs.isEmpty()) { return; } + try { BulkRequestBuilder bulkRequestBuilder = elasticSearchClient.prepareBulk(); for (TaskExecLog log : taskExecLogs) { @@ -301,10 +329,18 @@ public void addTaskExecutionLogs(List taskExecLogs) { request.source(objectMapper.writeValueAsBytes(log), XContentType.JSON); bulkRequestBuilder.add(request); } - new RetryUtil().retryOnException(() -> bulkRequestBuilder.execute().actionGet(), null , - BulkResponse::hasFailures, RETRY_COUNT, "Indexing all execution logs into doc_type task", "addTaskExecutionLogs"); - } catch (Throwable e) { - List taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList()); + new RetryUtil().retryOnException( + () -> bulkRequestBuilder.execute().actionGet(), + null, + BulkResponse::hasFailures, + RETRY_COUNT, + "Indexing all execution logs into doc_type task", + "addTaskExecutionLogs" + ); + } catch (Exception e) { + List taskIds = taskExecLogs.stream() + .map(TaskExecLog::getTaskId) + .collect(Collectors.toList()); logger.error("Failed to index task execution logs for tasks: ", taskIds, e); } } @@ -324,22 +360,28 @@ public List getTaskExecutionLogs(String taskId) { QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*"); BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("createdTime") + .order(SortOrder.ASC); final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(logIndexPrefix + "*") - .setQuery(fq).setTypes(LOG_DOC_TYPE) - .addSort(SortBuilders.fieldSort("createdTime") - .order(SortOrder.ASC)); - SearchResponse response = srb.execute().actionGet(); - SearchHit[] hits = response.getHits().getHits(); - List logs = new ArrayList<>(hits.length); - for(SearchHit hit : hits) { - String source = hit.getSourceAsString(); - TaskExecLog tel = objectMapper.readValue(source, TaskExecLog.class); - logs.add(tel); - } + .setQuery(fq) + .setTypes(LOG_DOC_TYPE) + .addSort(sortBuilder); - return logs; + SearchResponse response = srb.execute().actionGet(); - }catch(Exception e) { + return Arrays.stream(response.getHits().getHits()) + .map(hit -> { + String source = hit.getSourceAsString(); + try { + return objectMapper.readValue(source, TaskExecLog.class); + } catch (IOException e) { + logger.error("exception deserializing taskExecLog: {}", source); + } + return null; + }) + .filter(taskExecLog -> Objects.nonNull(taskExecLog)) + .collect(Collectors.toList()); + } catch (Exception e) { logger.error("Failed to get task execution logs for task: {}", taskId, e); } @@ -356,9 +398,14 @@ public void addMessage(String queue, Message message) { IndexRequest request = new IndexRequest(logIndexName, MSG_DOC_TYPE); request.source(doc); try { - new RetryUtil<>().retryOnException(() -> elasticSearchClient.index(request).actionGet(), null, - null, RETRY_COUNT, "Indexing document in for docType: message", "addMessage"); - } catch (Throwable e) { + new RetryUtil<>().retryOnException( + () -> elasticSearchClient.index(request).actionGet(), + null, + null, + RETRY_COUNT, + "Indexing document in for docType: message", "addMessage" + ); + } catch (Exception e) { logger.error("Failed to index message: {}", message.getId(), e); } } @@ -367,13 +414,15 @@ public void addMessage(String queue, Message message) { public void addEventExecution(EventExecution eventExecution) { try { byte[] doc = objectMapper.writeValueAsBytes(eventExecution); - String id = eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution.getMessageId() + "." + eventExecution.getId(); + String id = + eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution + .getMessageId() + "." + eventExecution.getId(); UpdateRequest req = new UpdateRequest(logIndexName, EVENT_DOC_TYPE, id); req.doc(doc, XContentType.JSON); req.upsert(doc, XContentType.JSON); req.retryOnConflict(5); updateWithRetry(req, "Update Event execution for doc_type event"); - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to index event execution: {}", eventExecution.getId(), e); } } @@ -385,29 +434,36 @@ public CompletableFuture asyncAddEventExecution(EventExecution eventExecut private void updateWithRetry(UpdateRequest request, String operationDescription) { try { - new RetryUtil().retryOnException(() -> elasticSearchClient.update(request).actionGet(), null, - null, RETRY_COUNT, operationDescription, "updateWithRetry"); + new RetryUtil().retryOnException( + () -> elasticSearchClient.update(request).actionGet(), + null, + null, + RETRY_COUNT, + operationDescription, + "updateWithRetry" + ); } catch (Exception e) { Monitors.error(className, "index"); - logger.error("Failed to index {} for request type: {}", request.index(), request.type(), e); + logger.error("Failed to index {} for request type: {}", request.index(), request.type(), + e); } } @Override - public SearchResult searchWorkflows(String query, String freeText, int start, int count, List sort) { + public SearchResult searchWorkflows(String query, String freeText, int start, int count, + List sort) { try { - - return search(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); - + return search(indexName, query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); } catch (ParserException e) { throw new ApplicationException(Code.BACKEND_ERROR, e.getMessage(), e); } } @Override - public SearchResult searchTasks(String query, String freeText, int start, int count, List sort) { + public SearchResult searchTasks(String query, String freeText, int start, int count, + List sort) { try { - return search(query, start, count, sort, freeText, TASK_DOC_TYPE); + return search(indexName, query, start, count, sort, freeText, TASK_DOC_TYPE); } catch (ParserException e) { throw new ApplicationException(Code.BACKEND_ERROR, e.getMessage(), e); } @@ -421,7 +477,7 @@ public void removeWorkflow(String workflowId) { if (response.getResult() == DocWriteResponse.Result.DELETED) { logger.error("Index removal failed - document not found by id: {}", workflowId); } - } catch (Throwable e) { + } catch (Exception e) { logger.error("Failed to remove workflow {} from index", workflowId, e); Monitors.error(className, "remove"); } @@ -435,43 +491,56 @@ public CompletableFuture asyncRemoveWorkflow(String workflowId) { @Override public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { if (keys.length != values.length) { - throw new ApplicationException(Code.INVALID_INPUT, "Number of keys and values do not match"); + throw new ApplicationException(Code.INVALID_INPUT, + "Number of keys and values do not match"); } UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); - Map source = IntStream.range(0, keys.length).boxed() - .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + Map source = IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); request.doc(source); logger.debug("Updating workflow {} with {}", workflowInstanceId, source); - new RetryUtil<>().retryOnException(() -> elasticSearchClient.update(request), null, null, RETRY_COUNT, - "Updating index for doc_type workflow", "updateWorkflow"); + new RetryUtil<>().retryOnException( + () -> elasticSearchClient.update(request), + null, + null, + RETRY_COUNT, + "Updating index for doc_type workflow", + "updateWorkflow" + ); } @Override - public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, + Object[] values) { return CompletableFuture.runAsync(() -> updateWorkflow(workflowInstanceId, keys, values), executorService); } @Override public String get(String workflowInstanceId, String fieldToGet) { GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId) - .fetchSourceContext(new FetchSourceContext(true, new String[]{fieldToGet}, Strings.EMPTY_ARRAY)); + .fetchSourceContext( + new FetchSourceContext(true, new String[]{fieldToGet}, Strings.EMPTY_ARRAY)); GetResponse response = elasticSearchClient.get(request).actionGet(); - if (response.isExists()){ + if (response.isExists()) { Map sourceAsMap = response.getSourceAsMap(); - if (sourceAsMap.containsKey(fieldToGet)){ + if (sourceAsMap.containsKey(fieldToGet)) { return sourceAsMap.get(fieldToGet).toString(); } } - logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, indexName); + logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, + indexName); return null; } - private SearchResult search(String structuredQuery, int start, int size, List sortOptions, String freeTextQuery, String docType) throws ParserException { + private SearchResult search(String indexName, String structuredQuery, int start, + int size, + List sortOptions, String freeTextQuery, String docType) throws ParserException { QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); - if(StringUtils.isNotEmpty(structuredQuery)) { + if (StringUtils.isNotEmpty(structuredQuery)) { Expression expression = Expression.fromString(structuredQuery); queryBuilder = expression.getFilterBuilder(); } @@ -479,72 +548,82 @@ private SearchResult search(String structuredQuery, int start, int size, BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); - final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName).setQuery(fq).setTypes(docType).storedFields("_id").setFrom(start).setSize(size); - if(sortOptions != null){ - sortOptions.forEach(sortOption -> { - SortOrder order = SortOrder.ASC; - String field = sortOption; - int indx = sortOption.indexOf(':'); - if(indx > 0){ //Can't be 0, need the field name at-least - field = sortOption.substring(0, indx); - order = SortOrder.valueOf(sortOption.substring(indx+1)); - } - srb.addSort(field, order); - }); + final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName) + .setQuery(fq) + .setTypes(docType) + .storedFields("_id") + .setFrom(start) + .setSize(size); + + if (sortOptions != null) { + sortOptions.forEach(sortOption -> addSortOptionToSearchRequest(srb, sortOption)); } - List result = new LinkedList(); + SearchResponse response = srb.get(); - response.getHits().forEach(hit -> { - result.add(hit.getId()); - }); + + LinkedList result = StreamSupport.stream(response.getHits().spliterator(), false) + .map(SearchHit::getId) + .collect(Collectors.toCollection(LinkedList::new)); long count = response.getHits().getTotalHits(); + return new SearchResult(count, result); } + private void addSortOptionToSearchRequest(SearchRequestBuilder searchRequestBuilder, + String sortOption) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int indx = sortOption.indexOf(':'); + if (indx > 0) { // Can't be 0, need the field name at-least + field = sortOption.substring(0, indx); + order = SortOrder.valueOf(sortOption.substring(indx + 1)); + } + searchRequestBuilder.addSort(field, order); + } + @Override public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays).toString())) - .should(QueryBuilders.termQuery("status", "COMPLETED")) - .should(QueryBuilders.termQuery("status", "FAILED")) - .mustNot(QueryBuilders.existsQuery("archived")) - .minimumShouldMatch(1); + .must(QueryBuilders.rangeQuery("endTime") + .lt(LocalDate.now().minusDays(archiveTtlDays).toString())) + .should(QueryBuilders.termQuery("status", "COMPLETED")) + .should(QueryBuilders.termQuery("status", "FAILED")) + .mustNot(QueryBuilders.existsQuery("archived")) + .minimumShouldMatch(1); SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName) - .setTypes("workflow") - .setQuery(q) - .setSize(1000); + .setTypes("workflow") + .setQuery(q) + .setSize(1000); + SearchResponse response = s.execute().actionGet(); + SearchHits hits = response.getHits(); - List ids = new LinkedList<>(); - for (SearchHit hit : hits.getHits()) { - ids.add(hit.getId()); - } - return ids; + return Arrays.stream(hits.getHits()) + .map(hit -> hit.getId()) + .collect(Collectors.toCollection(LinkedList::new)); } - public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { + @Override + public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, + int lastModifiedHoursAgoTo) { DateTime dateTime = new DateTime(); QueryBuilder q = QueryBuilders.boolQuery() - .must(QueryBuilders.rangeQuery("updateTime") - .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) - .must(QueryBuilders.rangeQuery("updateTime") - .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) - .must(QueryBuilders.termQuery("status", "RUNNING")); + .must(QueryBuilders.rangeQuery("updateTime") + .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) + .must(QueryBuilders.rangeQuery("updateTime") + .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) + .must(QueryBuilders.termQuery("status", "RUNNING")); SearchRequestBuilder s = elasticSearchClient.prepareSearch(indexName) - .setTypes("workflow") - .setQuery(q) - .setSize(5000) - .addSort("updateTime",SortOrder.ASC); + .setTypes("workflow") + .setQuery(q) + .setSize(5000) + .addSort("updateTime", SortOrder.ASC); SearchResponse response = s.execute().actionGet(); - SearchHits hits = response.getHits(); - List ids = new LinkedList<>(); - for (SearchHit hit : hits.getHits()) { - ids.add(hit.getId()); - } - return ids; + return StreamSupport.stream(response.getHits().spliterator(), false) + .map(hit -> hit.getId()) + .collect(Collectors.toCollection(LinkedList::new)); } - -} \ No newline at end of file +} diff --git a/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java new file mode 100644 index 0000000000..ce6b2a3df8 --- /dev/null +++ b/es5-persistence/src/main/java/com/netflix/conductor/dao/es5/index/ElasticSearchRestDAOV5.java @@ -0,0 +1,675 @@ +package com.netflix.conductor.dao.es5.index; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.annotations.Trace; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; +import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.common.run.WorkflowSummary; +import com.netflix.conductor.common.utils.RetryUtil; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.metrics.Monitors; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NByteArrayEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +@Trace +@Singleton +public class ElasticSearchRestDAOV5 implements IndexDAO { + + private static Logger logger = LoggerFactory.getLogger(ElasticSearchRestDAOV5.class); + + private static final int RETRY_COUNT = 3; + + private static final String WORKFLOW_DOC_TYPE = "workflow"; + private static final String TASK_DOC_TYPE = "task"; + private static final String LOG_DOC_TYPE = "task_log"; + private static final String EVENT_DOC_TYPE = "event"; + private static final String MSG_DOC_TYPE = "message"; + + private static final TimeZone GMT = TimeZone.getTimeZone("GMT"); + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + + private @interface HttpMethod { + String GET = "GET"; + String POST = "POST"; + String PUT = "PUT"; + String HEAD = "HEAD"; + } + + private static final String className = ElasticSearchRestDAOV5.class.getSimpleName(); + + private final String indexName; + private final String logIndexPrefix; + private String logIndexName; + private final ObjectMapper objectMapper; + private final RestHighLevelClient elasticSearchClient; + private final RestClient elasticSearchAdminClient; + private final ExecutorService executorService; + + static { + SIMPLE_DATE_FORMAT.setTimeZone(GMT); + } + + @Inject + public ElasticSearchRestDAOV5(RestClient lowLevelRestClient, ElasticSearchConfiguration config, ObjectMapper objectMapper) { + + this.objectMapper = objectMapper; + this.elasticSearchAdminClient = lowLevelRestClient; + this.elasticSearchClient = new RestHighLevelClient(lowLevelRestClient); + this.indexName = config.getIndexName(); + this.logIndexPrefix = config.getTasklogIndexName(); + + // Set up a workerpool for performing async operations. + int corePoolSize = 6; + int maximumPoolSize = 12; + long keepAliveTime = 1L; + this.executorService = new ThreadPoolExecutor(corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + + } + + @Override + public void setup() throws Exception { + waitForGreenCluster(); + + try { + initIndex(); + updateIndexName(); + Executors.newScheduledThreadPool(1).scheduleAtFixedRate(this::updateIndexName, 0, 1, TimeUnit.HOURS); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + + //1. Create the required index + try { + addIndex(indexName); + } catch (IOException e) { + logger.error("Failed to initialize index '{}'", indexName, e); + } + + //2. Add mappings for the workflow document type + try { + addMappingToIndex(indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json"); + } catch (IOException e) { + logger.error("Failed to add {} mapping", WORKFLOW_DOC_TYPE); + } + + //3. Add mappings for task document type + try { + addMappingToIndex(indexName, TASK_DOC_TYPE, "/mappings_docType_task.json"); + } catch (IOException e) { + logger.error("Failed to add {} mapping", TASK_DOC_TYPE); + } + } + + /** + * Waits for the ES cluster to become green. + * @throws Exception If there is an issue connecting with the ES cluster. + */ + private void waitForGreenCluster() throws Exception { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("timeout", "30s"); + + elasticSearchAdminClient.performRequest("GET", "/_cluster/health", params); + } + + /** + * Roll the tasklog index daily. + */ + private void updateIndexName() { + this.logIndexName = this.logIndexPrefix + "_" + SIMPLE_DATE_FORMAT.format(new Date()); + + try { + addIndex(logIndexName); + } catch (IOException e) { + logger.error("Failed to update log index name: {}", logIndexName, e); + } + } + + /** + * Initializes the index with the required templates and mappings. + */ + private void initIndex() throws Exception { + + //0. Add the tasklog template + if (doesResourceNotExist("/_template/tasklog_template")) { + logger.info("Creating the index template 'tasklog_template'"); + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream("/template_tasklog.json"); + byte[] templateSource = IOUtils.toByteArray(stream); + + HttpEntity entity = new NByteArrayEntity(templateSource, ContentType.APPLICATION_JSON); + try { + elasticSearchAdminClient.performRequest(HttpMethod.PUT, "/_template/tasklog_template", Collections.emptyMap(), entity); + } catch (IOException e) { + logger.error("Failed to initialize tasklog_template", e); + } + } + } + + /** + * Adds an index to elasticsearch if it does not exist. + * + * @param index The name of the index to create. + * @throws IOException If an error occurred during requests to ES. + */ + private void addIndex(final String index) throws IOException { + + logger.info("Adding index '{}'...", index); + + String resourcePath = "/" + index; + + if (doesResourceNotExist(resourcePath)) { + + try { + elasticSearchAdminClient.performRequest(HttpMethod.PUT, resourcePath); + + logger.info("Added '{}' index", index); + } catch (ResponseException e) { + + boolean errorCreatingIndex = true; + + Response errorResponse = e.getResponse(); + if (errorResponse.getStatusLine().getStatusCode() == HttpStatus.SC_BAD_REQUEST) { + JsonNode root = objectMapper.readTree(EntityUtils.toString(errorResponse.getEntity())); + String errorCode = root.get("error").get("type").asText(); + if ("index_already_exists_exception".equals(errorCode)) { + errorCreatingIndex = false; + } + } + + if (errorCreatingIndex) { + throw e; + } + } + } else { + logger.info("Index '{}' already exists", index); + } + } + + /** + * Adds a mapping type to an index if it does not exist. + * + * @param index The name of the index. + * @param mappingType The name of the mapping type. + * @param mappingFilename The name of the mapping file to use to add the mapping if it does not exist. + * @throws IOException If an error occurred during requests to ES. + */ + private void addMappingToIndex(final String index, final String mappingType, final String mappingFilename) throws IOException { + + logger.info("Adding '{}' mapping to index '{}'...", mappingType, index); + + String resourcePath = "/" + index + "/_mapping/" + mappingType; + + if (doesResourceNotExist(resourcePath)) { + InputStream stream = ElasticSearchDAOV5.class.getResourceAsStream(mappingFilename); + byte[] mappingSource = IOUtils.toByteArray(stream); + + HttpEntity entity = new NByteArrayEntity(mappingSource, ContentType.APPLICATION_JSON); + elasticSearchAdminClient.performRequest(HttpMethod.PUT, resourcePath, Collections.emptyMap(), entity); + logger.info("Added '{}' mapping", mappingType); + } else { + logger.info("Mapping '{}' already exists", mappingType); + } + } + + /** + * Determines whether a resource exists in ES. This will call a GET method to a particular path and + * return true if status 200; false otherwise. + * + * @param resourcePath The path of the resource to get. + * @return True if it exists; false otherwise. + * @throws IOException If an error occurred during requests to ES. + */ + public boolean doesResourceExist(final String resourcePath) throws IOException { + Response response = elasticSearchAdminClient.performRequest(HttpMethod.HEAD, resourcePath); + return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK; + } + + /** + * The inverse of doesResourceExist. + * + * @param resourcePath The path of the resource to check. + * @return True if it does not exist; false otherwise. + * @throws IOException If an error occurred during requests to ES. + */ + public boolean doesResourceNotExist(final String resourcePath) throws IOException { + return !doesResourceExist(resourcePath); + } + + @Override + public void indexWorkflow(Workflow workflow) { + + String workflowId = workflow.getWorkflowId(); + WorkflowSummary summary = new WorkflowSummary(workflow); + + indexObject(indexName, WORKFLOW_DOC_TYPE, workflowId, summary); + } + + @Override + public CompletableFuture asyncIndexWorkflow(Workflow workflow) { + return CompletableFuture.runAsync(() -> indexWorkflow(workflow), executorService); + } + + @Override + public void indexTask(Task task) { + + String taskId = task.getTaskId(); + TaskSummary summary = new TaskSummary(task); + + indexObject(indexName, TASK_DOC_TYPE, taskId, summary); + } + + @Override + public CompletableFuture asyncIndexTask(Task task) { + return CompletableFuture.runAsync(() -> indexTask(task), executorService); + } + + @Override + public void addTaskExecutionLogs(List taskExecLogs) { + if (taskExecLogs.isEmpty()) { + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + + for (TaskExecLog log : taskExecLogs) { + + byte[] docBytes; + try { + docBytes = objectMapper.writeValueAsBytes(log); + } catch (JsonProcessingException e) { + logger.error("Failed to convert task log to JSON for task {}", log.getTaskId()); + continue; + } + + IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE); + request.source(docBytes, XContentType.JSON); + bulkRequest.add(request); + } + + try { + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.bulk(bulkRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, null, BulkResponse::hasFailures, RETRY_COUNT, "Indexing all execution logs into doc_type task", "addTaskExecutionLogs"); + } catch (Exception e) { + List taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList()); + logger.error("Failed to index task execution logs for tasks: {}", taskIds, e); + } + } + + @Override + public CompletableFuture asyncAddTaskExecutionLogs(List logs) { + return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), executorService); + } + + @Override + public List getTaskExecutionLogs(String taskId) { + + try { + + // Build Query + Expression expression = Expression.fromString("taskId='" + taskId + "'"); + QueryBuilder queryBuilder = expression.getFilterBuilder(); + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*"); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + // Create the searchObjectIdsViaExpression source + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(fq); + searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.ASC)); + + // Generate the actual request to send to ES. + SearchRequest searchRequest = new SearchRequest(logIndexPrefix + "*"); + searchRequest.types(LOG_DOC_TYPE); + searchRequest.source(searchSourceBuilder); + + SearchResponse response = elasticSearchClient.search(searchRequest); + + SearchHit[] hits = response.getHits().getHits(); + List logs = new ArrayList<>(hits.length); + for(SearchHit hit : hits) { + String source = hit.getSourceAsString(); + TaskExecLog tel = objectMapper.readValue(source, TaskExecLog.class); + logs.add(tel); + } + + return logs; + + }catch(Exception e) { + logger.error("Failed to get task execution logs for task: {}", taskId, e); + } + + return null; + } + + @Override + public void addMessage(String queue, Message message) { + Map doc = new HashMap<>(); + doc.put("messageId", message.getId()); + doc.put("payload", message.getPayload()); + doc.put("queue", queue); + doc.put("created", System.currentTimeMillis()); + + indexObject(logIndexName, MSG_DOC_TYPE, doc); + } + + @Override + public void addEventExecution(EventExecution eventExecution) { + String id = eventExecution.getName() + "." + eventExecution.getEvent() + "." + eventExecution.getMessageId() + "." + eventExecution.getId(); + + indexObject(logIndexName, EVENT_DOC_TYPE, id, eventExecution); + } + + @Override + public CompletableFuture asyncAddEventExecution(EventExecution eventExecution) { + return CompletableFuture.runAsync(() -> addEventExecution(eventExecution), executorService); + } + + @Override + public SearchResult searchWorkflows(String query, String freeText, int start, int count, List sort) { + try { + return searchObjectIdsViaExpression(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE); + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e); + } + } + + @Override + public SearchResult searchTasks(String query, String freeText, int start, int count, List sort) { + try { + return searchObjectIdsViaExpression(query, start, count, sort, freeText, TASK_DOC_TYPE); + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e); + } + } + + @Override + public void removeWorkflow(String workflowId) { + + DeleteRequest request = new DeleteRequest(indexName, WORKFLOW_DOC_TYPE, workflowId); + + try { + DeleteResponse response = elasticSearchClient.delete(request); + + if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { + logger.error("Index removal failed - document not found by id: {}", workflowId); + } + + } catch (IOException e) { + logger.error("Failed to remove workflow {} from index", workflowId, e); + Monitors.error(className, "remove"); + } + } + + @Override + public CompletableFuture asyncRemoveWorkflow(String workflowId) { + return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService); + } + + @Override + public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + + if (keys.length != values.length) { + throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "Number of keys and values do not match"); + } + + UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); + Map source = IntStream.range(0, keys.length).boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + + logger.debug("Updating workflow {} with {}", workflowInstanceId, source); + + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.update(request); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, null, null, RETRY_COUNT, "Updating index for doc_type workflow", "updateWorkflow"); + } + + @Override + public CompletableFuture asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) { + return CompletableFuture.runAsync(() -> updateWorkflow(workflowInstanceId, keys, values), executorService); + } + + @Override + public String get(String workflowInstanceId, String fieldToGet) { + + GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); + + GetResponse response; + try { + response = elasticSearchClient.get(request); + } catch (IOException e) { + logger.error("Unable to get Workflow: {} from ElasticSearch index: {}", workflowInstanceId, indexName, e); + return null; + } + + if (response.isExists()){ + Map sourceAsMap = response.getSourceAsMap(); + if (sourceAsMap.containsKey(fieldToGet)){ + return sourceAsMap.get(fieldToGet).toString(); + } + } + + logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, indexName); + return null; + } + + private SearchResult searchObjectIdsViaExpression(String structuredQuery, int start, int size, List sortOptions, String freeTextQuery, String docType) throws ParserException, IOException { + + // Build query + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if(StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + return searchObjectIds(indexName, fq, start, size, sortOptions, docType); + } + + private SearchResult searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, String docType) throws IOException { + return searchObjectIds(indexName, queryBuilder, start, size, null, docType); + } + + /** + * Tries to find object ids for a given query in an index. + * + * @param indexName The name of the index. + * @param queryBuilder The query to use for searching. + * @param start The start to use. + * @param size The total return size. + * @param sortOptions A list of string options to sort in the form VALUE:ORDER; where ORDER is optional and can be either ASC OR DESC. + * @param docType The document type to searchObjectIdsViaExpression for. + * + * @return The SearchResults which includes the count and IDs that were found. + * @throws IOException If we cannot communicate with ES. + */ + private SearchResult searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, List sortOptions, String docType) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(start); + searchSourceBuilder.size(size); + + if (sortOptions != null && !sortOptions.isEmpty()) { + + for (String sortOption : sortOptions) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int index = sortOption.indexOf(":"); + if (index > 0) { + field = sortOption.substring(0, index); + order = SortOrder.valueOf(sortOption.substring(index + 1)); + } + searchSourceBuilder.sort(new FieldSortBuilder(field).order(order)); + } + } + + // Generate the actual request to send to ES. + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(docType); + searchRequest.source(searchSourceBuilder); + + SearchResponse response = elasticSearchClient.search(searchRequest); + + List result = new LinkedList<>(); + response.getHits().forEach(hit -> result.add(hit.getId())); + long count = response.getHits().getTotalHits(); + return new SearchResult<>(count, result); + } + + @Override + public List searchArchivableWorkflows(String indexName, long archiveTtlDays) { + QueryBuilder q = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("endTime").lt(LocalDate.now().minusDays(archiveTtlDays).toString())) + .should(QueryBuilders.termQuery("status", "COMPLETED")) + .should(QueryBuilders.termQuery("status", "FAILED")) + .mustNot(QueryBuilders.existsQuery("archived")) + .minimumShouldMatch(1); + + SearchResult workflowIds; + try { + workflowIds = searchObjectIds(indexName, q, 0, 1000, WORKFLOW_DOC_TYPE); + } catch (IOException e) { + logger.error("Unable to communicate with ES to find archivable workflows", e); + return Collections.emptyList(); + } + + return workflowIds.getResults(); + } + + public List searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) { + DateTime dateTime = new DateTime(); + QueryBuilder q = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("updateTime") + .gt(dateTime.minusHours(lastModifiedHoursAgoFrom))) + .must(QueryBuilders.rangeQuery("updateTime") + .lt(dateTime.minusHours(lastModifiedHoursAgoTo))) + .must(QueryBuilders.termQuery("status", "RUNNING")); + + SearchResult workflowIds; + try { + workflowIds = searchObjectIds(indexName, q, 0, 5000, Collections.singletonList("updateTime:ASC"), WORKFLOW_DOC_TYPE); + } catch (IOException e) { + logger.error("Unable to communicate with ES to find recent running workflows", e); + return Collections.emptyList(); + } + + return workflowIds.getResults(); + } + + private void indexObject(final String index, final String docType, final Object doc) { + indexObject(index, docType, null, doc); + } + + private void indexObject(final String index, final String docType, final String docId, final Object doc) { + + byte[] docBytes; + try { + docBytes = objectMapper.writeValueAsBytes(doc); + } catch (JsonProcessingException e) { + logger.error("Failed to convert {} '{}' to byte string", docType, docId); + return; + } + + IndexRequest request = new IndexRequest(index, docType, docId); + request.source(docBytes, XContentType.JSON); + + indexWithRetry(request, "Indexing " + docType + ": " + docId); + } + + /** + * Performs an index operation with a retry. + * @param request The index request that we want to perform. + * @param operationDescription The type of operation that we are performing. + */ + private void indexWithRetry(final IndexRequest request, final String operationDescription) { + + try { + new RetryUtil().retryOnException(() -> { + try { + return elasticSearchClient.index(request); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, null, null, RETRY_COUNT, operationDescription, "indexWithRetry"); + } catch (Exception e) { + Monitors.error(className, "index"); + logger.error("Failed to index {} for request type: {}", request.id(), request.type(), e); + } + } + +} \ No newline at end of file diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java index b385442b66..5a0cd00817 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchModule.java @@ -3,12 +3,20 @@ import com.google.inject.AbstractModule; import com.google.inject.Singleton; +import com.netflix.conductor.elasticsearch.es5.ElasticSearchV5Module; import org.elasticsearch.client.Client; +import org.elasticsearch.client.RestClient; public class ElasticSearchModule extends AbstractModule { @Override protected void configure() { + + ElasticSearchConfiguration esConfiguration = new SystemPropertiesElasticSearchConfiguration(); + bind(ElasticSearchConfiguration.class).to(SystemPropertiesElasticSearchConfiguration.class); bind(Client.class).toProvider(ElasticSearchTransportClientProvider.class).in(Singleton.class); + bind(RestClient.class).toProvider(ElasticSearchRestClientProvider.class).in(Singleton.class); + + install(new ElasticSearchV5Module(esConfiguration)); } } diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java index a21e0bc2a6..7bc2cc6da2 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/ElasticSearchRestClientProvider.java @@ -2,7 +2,6 @@ import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; import java.net.URI; import java.util.List; @@ -11,7 +10,7 @@ import javax.inject.Inject; import javax.inject.Provider; -public class ElasticSearchRestClientProvider implements Provider { +public class ElasticSearchRestClientProvider implements Provider { private final ElasticSearchConfiguration configuration; @Inject @@ -20,15 +19,15 @@ public ElasticSearchRestClientProvider(ElasticSearchConfiguration configuration) } @Override - public RestHighLevelClient get() { - RestClient lowLevelRestClient = RestClient.builder(convertToHttpHosts(configuration.getURIs())).build(); - return new RestHighLevelClient(lowLevelRestClient); + public RestClient get() { + return RestClient.builder(convertToHttpHosts(configuration.getURIs())).build(); } private HttpHost[] convertToHttpHosts(List hosts) { - List list = hosts.stream().map(host -> - new HttpHost(host.getHost(), host.getPort())) - .collect(Collectors.toList()); - return list.toArray(new HttpHost[0]); + List list = hosts.stream() + .map(host -> new HttpHost(host.getHost(), host.getPort(), host.getScheme())) + .collect(Collectors.toList()); + + return list.toArray(new HttpHost[list.size()]); } } diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java index 3592fd1bca..f10e2bc287 100644 --- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java +++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/es5/ElasticSearchV5Module.java @@ -19,20 +19,42 @@ import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.es5.index.ElasticSearchDAOV5; -import com.netflix.conductor.elasticsearch.ElasticSearchModule; +import com.netflix.conductor.dao.es5.index.ElasticSearchRestDAOV5; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import java.util.HashSet; +import java.util.Set; + /** * @author Viren - * Provider for the elasticsearch transport client + * Provider for the elasticsearch index DAO. */ public class ElasticSearchV5Module extends AbstractModule { + private boolean restTransport; + + public ElasticSearchV5Module(ElasticSearchConfiguration elasticSearchConfiguration) { + + Set REST_SCHEMAS = new HashSet<>(); + REST_SCHEMAS.add("http"); + REST_SCHEMAS.add("https"); + + String esTransport = elasticSearchConfiguration.getURIs().get(0).getScheme(); + + this.restTransport = REST_SCHEMAS.contains(esTransport); + } + @Override protected void configure() { - install(new ElasticSearchModule()); - bind(IndexDAO.class).to(ElasticSearchDAOV5.class); + + if (restTransport) { + bind(IndexDAO.class).to(ElasticSearchRestDAOV5.class); + } else { + bind(IndexDAO.class).to(ElasticSearchDAOV5.class); + } + bind(EmbeddedElasticSearchProvider.class).to(EmbeddedElasticSearchV5Provider.class); } } diff --git a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java new file mode 100644 index 0000000000..85b43a196d --- /dev/null +++ b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchDAOV5.java @@ -0,0 +1,402 @@ +package com.netflix.conductor.dao.es5.index; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.events.EventHandler.Action.Type; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.ElasticSearchTransportClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.es5.EmbeddedElasticSearchV5; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestElasticSearchDAOV5 { + + private static final Logger logger = LoggerFactory.getLogger(TestElasticSearchDAOV5.class); + + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + + private static final String MSG_DOC_TYPE = "message"; + private static final String EVENT_DOC_TYPE = "event"; + private static final String LOG_INDEX_PREFIX = "task_log"; + + private static ElasticSearchConfiguration configuration; + private static Client elasticSearchClient; + private static ElasticSearchDAOV5 indexDAO; + private static EmbeddedElasticSearch embeddedElasticSearch; + + private Workflow workflow; + + @BeforeClass + public static void startServer() throws Exception { + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9203"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9303"); + + configuration = new SystemPropertiesElasticSearchConfiguration(); + String host = configuration.getEmbeddedHost(); + int port = configuration.getEmbeddedPort(); + String clusterName = configuration.getEmbeddedClusterName(); + + embeddedElasticSearch = new EmbeddedElasticSearchV5(clusterName, host, port); + embeddedElasticSearch.start(); + + ElasticSearchTransportClientProvider transportClientProvider = + new ElasticSearchTransportClientProvider(configuration); + elasticSearchClient = transportClientProvider.get(); + + elasticSearchClient.admin() + .cluster() + .prepareHealth() + .setWaitForGreenStatus() + .execute() + .get(); + + ObjectMapper objectMapper = new ObjectMapper(); + indexDAO = new ElasticSearchDAOV5(elasticSearchClient, configuration, objectMapper); + } + + @AfterClass + public static void closeClient() throws Exception { + if (elasticSearchClient != null) { + elasticSearchClient.close(); + } + + embeddedElasticSearch.stop(); + } + + @Before + public void createTestWorkflow() throws Exception { + // define indices + indexDAO.setup(); + + // initialize workflow + workflow = new Workflow(); + workflow.getInput().put("requestId", "request id 001"); + workflow.getInput().put("hasAwards", true); + workflow.getInput().put("channelMapping", 5); + Map name = new HashMap<>(); + name.put("name", "The Who"); + name.put("year", 1970); + Map name2 = new HashMap<>(); + name2.put("name", "The Doors"); + name2.put("year", 1975); + + List names = new LinkedList<>(); + names.add(name); + names.add(name2); + + workflow.getOutput().put("name", name); + workflow.getOutput().put("names", names); + workflow.getOutput().put("awards", 200); + + Task task = new Task(); + task.setReferenceTaskName("task2"); + task.getOutputData().put("location", "http://location"); + task.setStatus(Task.Status.COMPLETED); + + Task task2 = new Task(); + task2.setReferenceTaskName("task3"); + task2.getOutputData().put("refId", "abcddef_1234_7890_aaffcc"); + task2.setStatus(Task.Status.SCHEDULED); + + workflow.getTasks().add(task); + workflow.getTasks().add(task2); + } + + @After + public void tearDown() { + deleteAllIndices(); + } + + private void deleteAllIndices() { + + ImmutableOpenMap indices = elasticSearchClient.admin().cluster() + .prepareState().get().getState() + .getMetaData().getIndices(); + + indices.forEach(cursor -> { + try { + elasticSearchClient.admin() + .indices() + .delete(new DeleteIndexRequest(cursor.value.getIndex().getName())) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + private boolean indexExists(final String index) { + IndicesExistsRequest request = new IndicesExistsRequest(index); + try { + return elasticSearchClient.admin().indices().exists(request).get().isExists(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private boolean doesMappingExist(final String index, final String mappingName) { + GetMappingsRequest request = new GetMappingsRequest() + .indices(index); + try { + GetMappingsResponse response = elasticSearchClient.admin() + .indices() + .getMappings(request) + .get(); + + return response.getMappings() + .get(index) + .containsKey(mappingName); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Test + public void assertInitialSetup() throws Exception { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMww"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + + String taskLogIndex = "task_log_" + dateFormat.format(new Date()); + + assertTrue("Index 'conductor' should exist", indexExists("conductor")); + assertTrue("Index '" + taskLogIndex + "' should exist", indexExists(taskLogIndex)); + + assertTrue("Mapping 'workflow' for index 'conductor' should exist", doesMappingExist("conductor", "workflow")); + assertTrue("Mapping 'task' for index 'conductor' should exist", doesMappingExist("conductor", "task")); + } + + @Test + public void testWorkflowCRUD() throws Exception { + String testWorkflowType = "testworkflow"; + String testId = "1"; + + workflow.setWorkflowId(testId); + workflow.setWorkflowType(testWorkflowType); + + // Create + String workflowType = indexDAO.get(testId, "workflowType"); + assertNull("Workflow should not exist", workflowType); + + // Get + indexDAO.indexWorkflow(workflow); + + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have found our workflow type", testWorkflowType, workflowType); + + // Update + String newWorkflowType = "newworkflowtype"; + String[] keyChanges = {"workflowType"}; + String[] valueChanges = {newWorkflowType}; + + indexDAO.updateWorkflow(testId, keyChanges, valueChanges); + + await() + .atMost(3, TimeUnit.SECONDS) + .untilAsserted( + () -> { + String actualWorkflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have updated our new workflow type", newWorkflowType, actualWorkflowType); + } + ); + + // Delete + indexDAO.removeWorkflow(testId); + + workflowType = indexDAO.get(testId, "workflowType"); + assertNull("We should no longer have our workflow in the system", workflowType); + } + + @Test + public void taskExecutionLogs() throws Exception { + TaskExecLog taskExecLog1 = new TaskExecLog(); + taskExecLog1.setTaskId("some-task-id"); + long createdTime1 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog1.setCreatedTime(createdTime1); + taskExecLog1.setLog("some-log"); + TaskExecLog taskExecLog2 = new TaskExecLog(); + taskExecLog2.setTaskId("some-task-id"); + long createdTime2 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog2.setCreatedTime(createdTime2); + taskExecLog2.setLog("some-log"); + List logsToAdd = Arrays.asList(taskExecLog1, taskExecLog2); + indexDAO.addTaskExecutionLogs(logsToAdd); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List taskExecutionLogs = indexDAO.getTaskExecutionLogs("some-task-id"); + assertEquals(2, taskExecutionLogs.size()); + }); + } + + @Test + public void indexTask() throws Exception { + String correlationId = "some-correlation-id"; + + Task task = new Task(); + task.setTaskId("some-task-id"); + task.setWorkflowInstanceId("some-workflow-instance-id"); + task.setTaskType("some-task-type"); + task.setStatus(Status.FAILED); + task.setInputData(new HashMap() {{ put("input_key", "input_value"); }}); + task.setCorrelationId(correlationId); + task.setTaskDefName("some-task-def-name"); + task.setReasonForIncompletion("some-failure-reason"); + + indexDAO.indexTask(task); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResult result = indexDAO + .searchTasks("correlationId='" + correlationId + "'", "*", 0, 10000, null); + + assertTrue("should return 1 or more search results", result.getResults().size() > 0); + assertEquals("taskId should match the indexed task", "some-task-id", result.getResults().get(0)); + }); + } + + @Test + public void addMessage() { + String messageId = "some-message-id"; + + Message message = new Message(); + message.setId(messageId); + message.setPayload("some-payload"); + message.setReceipt("some-receipt"); + + indexDAO.addMessage("some-queue", message); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = search( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + "*", + MSG_DOC_TYPE + ); + assertEquals("search results should be length 1", searchResponse.getHits().getTotalHits(), 1); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + GetResponse response = elasticSearchClient + .prepareGet(searchHit.getIndex(), MSG_DOC_TYPE, searchHit.getId()) + .get(); + assertEquals("indexed message id should match", messageId, response.getSource().get("messageId")); + assertEquals("indexed payload should match", "some-payload", response.getSource().get("payload")); + }); + } + + @Test + public void addEventExecution() { + String messageId = "some-message-id"; + + EventExecution eventExecution = new EventExecution(); + eventExecution.setId("some-id"); + eventExecution.setMessageId(messageId); + eventExecution.setAction(Type.complete_task); + eventExecution.setEvent("some-event"); + eventExecution.setStatus(EventExecution.Status.COMPLETED); + + indexDAO.addEventExecution(eventExecution); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = search( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + "*", + EVENT_DOC_TYPE + ); + + assertEquals("search results should be length 1", searchResponse.getHits().getTotalHits(), 1); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + GetResponse response = elasticSearchClient + .prepareGet(searchHit.getIndex(), EVENT_DOC_TYPE, searchHit.getId()) + .get(); + + assertEquals("indexed message id should match", messageId, response.getSource().get("messageId")); + assertEquals("indexed id should match", "some-id", response.getSource().get("id")); + assertEquals("indexed status should match", EventExecution.Status.COMPLETED.name(), response.getSource().get("status")); + }); + } + + + private SearchResponse search(String indexName, String structuredQuery, int start, + int size, String freeTextQuery, String docType) throws ParserException { + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if (StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + final SearchRequestBuilder srb = elasticSearchClient.prepareSearch(indexName) + .setQuery(fq) + .setTypes(docType) + .storedFields("_id") + .setFrom(start) + .setSize(size); + + return srb.get(); + } + +} diff --git a/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java new file mode 100644 index 0000000000..267323741b --- /dev/null +++ b/es5-persistence/src/test/java/com/netflix/conductor/dao/es5/index/TestElasticSearchRestDAOV5.java @@ -0,0 +1,448 @@ +package com.netflix.conductor.dao.es5.index; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.amazonaws.util.IOUtils; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.events.EventHandler.Action.Type; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.dao.es5.index.query.parser.Expression; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.ElasticSearchRestClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.es5.EmbeddedElasticSearchV5; +import com.netflix.conductor.elasticsearch.query.parser.ParserException; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestElasticSearchRestDAOV5 { + + private static final Logger logger = LoggerFactory.getLogger(TestElasticSearchRestDAOV5.class); + + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMww"); + + private static final String INDEX_NAME = "conductor"; + private static final String LOG_INDEX_PREFIX = "task_log"; + + private static final String MSG_DOC_TYPE = "message"; + private static final String EVENT_DOC_TYPE = "event"; + + private static ElasticSearchConfiguration configuration; + private static RestClient restClient; + private static RestHighLevelClient elasticSearchClient; + private static ElasticSearchRestDAOV5 indexDAO; + private static EmbeddedElasticSearch embeddedElasticSearch; + private static ObjectMapper objectMapper; + + private Workflow workflow; + + private @interface HttpMethod { + String GET = "GET"; + String POST = "POST"; + String PUT = "PUT"; + String HEAD = "HEAD"; + String DELETE = "DELETE"; + } + + @BeforeClass + public static void startServer() throws Exception { + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9204"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "http://localhost:9204"); + + configuration = new SystemPropertiesElasticSearchConfiguration(); + + String host = configuration.getEmbeddedHost(); + int port = configuration.getEmbeddedPort(); + String clusterName = configuration.getEmbeddedClusterName(); + + embeddedElasticSearch = new EmbeddedElasticSearchV5(clusterName, host, port); + embeddedElasticSearch.start(); + + ElasticSearchRestClientProvider restClientProvider = + new ElasticSearchRestClientProvider(configuration); + restClient = restClientProvider.get(); + elasticSearchClient = new RestHighLevelClient(restClient); + + Map params = new HashMap<>(); + params.put("wait_for_status", "yellow"); + params.put("timeout", "30s"); + + restClient.performRequest("GET", "/_cluster/health", params); + + objectMapper = new ObjectMapper(); + indexDAO = new ElasticSearchRestDAOV5(restClient, configuration, objectMapper); + } + + @AfterClass + public static void closeClient() throws Exception { + if (restClient != null) { + restClient.close(); + } + + embeddedElasticSearch.stop(); + } + + @Before + public void createTestWorkflow() throws Exception { + // define indices + indexDAO.setup(); + + // initialize workflow + workflow = new Workflow(); + workflow.getInput().put("requestId", "request id 001"); + workflow.getInput().put("hasAwards", true); + workflow.getInput().put("channelMapping", 5); + Map name = new HashMap<>(); + name.put("name", "The Who"); + name.put("year", 1970); + Map name2 = new HashMap<>(); + name2.put("name", "The Doors"); + name2.put("year", 1975); + + List names = new LinkedList<>(); + names.add(name); + names.add(name2); + + workflow.getOutput().put("name", name); + workflow.getOutput().put("names", names); + workflow.getOutput().put("awards", 200); + + Task task = new Task(); + task.setReferenceTaskName("task2"); + task.getOutputData().put("location", "http://location"); + task.setStatus(Task.Status.COMPLETED); + + Task task2 = new Task(); + task2.setReferenceTaskName("task3"); + task2.getOutputData().put("refId", "abcddef_1234_7890_aaffcc"); + task2.setStatus(Task.Status.SCHEDULED); + + workflow.getTasks().add(task); + workflow.getTasks().add(task2); + } + + @After + public void tearDown() throws IOException { + deleteAllIndices(); + } + + private void deleteAllIndices() throws IOException { + Response beforeResponse = restClient.performRequest(HttpMethod.GET, "/_cat/indices"); + + Reader streamReader = new InputStreamReader(beforeResponse.getEntity().getContent()); + BufferedReader bufferedReader = new BufferedReader(streamReader); + + String line; + while ((line = bufferedReader.readLine()) != null) { + String[] fields = line.split("\\s"); + String endpoint = String.format("/%s", fields[2]); + + restClient.performRequest(HttpMethod.DELETE, endpoint); + } + } + + private boolean indexExists(final String index) throws IOException { + return indexDAO.doesResourceExist("/" + index); + } + + private boolean doesMappingExist(final String index, final String mappingName) throws IOException { + return indexDAO.doesResourceExist("/" + index + "/_mapping/" + mappingName); + } + + @Test + public void assertInitialSetup() throws Exception { + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMww"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + + String taskLogIndex = "task_log_" + dateFormat.format(new Date()); + + assertTrue("Index 'conductor' should exist", indexExists("conductor")); + assertTrue("Index '" + taskLogIndex + "' should exist", indexExists(taskLogIndex)); + + assertTrue("Mapping 'workflow' for index 'conductor' should exist", doesMappingExist("conductor", "workflow")); + assertTrue("Mapping 'task' for inndex 'conductor' should exist", doesMappingExist("conductor", "task")); + } + + @Test + public void testWorkflowCRUD() { + + String testWorkflowType = "testworkflow"; + String testId = "1"; + + workflow.setWorkflowId(testId); + workflow.setWorkflowType(testWorkflowType); + + // Create + String workflowType = indexDAO.get(testId, "workflowType"); + assertNull("Workflow should not exist", workflowType); + + // Get + indexDAO.indexWorkflow(workflow); + + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have found our workflow type", testWorkflowType, workflowType); + + // Update + String newWorkflowType = "newworkflowtype"; + String[] keyChanges = {"workflowType"}; + String[] valueChanges = {newWorkflowType}; + + indexDAO.updateWorkflow(testId, keyChanges, valueChanges); + + workflowType = indexDAO.get(testId, "workflowType"); + assertEquals("Should have updated our new workflow type", newWorkflowType, workflowType); + + // Delete + indexDAO.removeWorkflow(testId); + + workflowType = indexDAO.get(testId, "workflowType"); + assertNull("We should no longer have our workflow in the system", workflowType); + } + + @Test + public void taskExecutionLogs() throws Exception { + TaskExecLog taskExecLog1 = new TaskExecLog(); + taskExecLog1.setTaskId("some-task-id"); + long createdTime1 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog1.setCreatedTime(createdTime1); + taskExecLog1.setLog("some-log"); + TaskExecLog taskExecLog2 = new TaskExecLog(); + taskExecLog2.setTaskId("some-task-id"); + long createdTime2 = LocalDateTime.of(2018, 11, 01, 06, 33, 22) + .toEpochSecond(ZoneOffset.UTC); + taskExecLog2.setCreatedTime(createdTime2); + taskExecLog2.setLog("some-log"); + List logsToAdd = Arrays.asList(taskExecLog1, taskExecLog2); + indexDAO.addTaskExecutionLogs(logsToAdd); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List taskExecutionLogs = indexDAO.getTaskExecutionLogs("some-task-id"); + assertEquals(2, taskExecutionLogs.size()); + }); + } + + @Test + public void indexTask() throws Exception { + String correlationId = "some-correlation-id"; + + Task task = new Task(); + task.setTaskId("some-task-id"); + task.setWorkflowInstanceId("some-workflow-instance-id"); + task.setTaskType("some-task-type"); + task.setStatus(Status.FAILED); + task.setInputData(new HashMap() {{ put("input_key", "input_value"); }}); + task.setCorrelationId(correlationId); + task.setTaskDefName("some-task-def-name"); + task.setReasonForIncompletion("some-failure-reason"); + + indexDAO.indexTask(task); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResult result = indexDAO + .searchTasks("correlationId='" + correlationId + "'", "*", 0, 10000, null); + + assertTrue("should return 1 or more search results", result.getResults().size() > 0); + assertEquals("taskId should match the indexed task", "some-task-id", result.getResults().get(0)); + }); + } + + @Test + public void addMessage() { + String messageId = "some-message-id"; + + Message message = new Message(); + message.setId(messageId); + message.setPayload("some-payload"); + message.setReceipt("some-receipt"); + + indexDAO.addMessage("some-queue", message); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = searchObjectIdsViaExpression( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + null, + "*", + MSG_DOC_TYPE + ); + assertTrue("should return 1 or more search results", searchResponse.getHits().getTotalHits() > 0); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + String resourcePath = + String.format("/%s/%s/%s", searchHit.getIndex(), MSG_DOC_TYPE, searchHit.getId()); + Response response = restClient.performRequest(HttpMethod.GET, resourcePath); + + String responseBody = IOUtils.toString(response.getEntity().getContent()); + logger.info("responseBody: {}", responseBody); + + TypeReference> typeRef = + new TypeReference>() {}; + Map responseMap = objectMapper.readValue(responseBody, typeRef); + Map source = (Map) responseMap.get("_source"); + assertEquals("indexed message id should match", messageId, source.get("messageId")); + assertEquals("indexed payload should match", "some-payload", source.get("payload")); + }); + } + + @Test + public void addEventExecution() { + String messageId = "some-message-id"; + + EventExecution eventExecution = new EventExecution(); + eventExecution.setId("some-id"); + eventExecution.setMessageId(messageId); + eventExecution.setAction(Type.complete_task); + eventExecution.setEvent("some-event"); + eventExecution.setStatus(EventExecution.Status.COMPLETED); + + indexDAO.addEventExecution(eventExecution); + + await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + SearchResponse searchResponse = searchObjectIdsViaExpression( + LOG_INDEX_PREFIX + "*", + "messageId='" + messageId + "'", + 0, + 10000, + null, + "*", + EVENT_DOC_TYPE + ); + assertTrue("should return 1 or more search results", searchResponse.getHits().getTotalHits() > 0); + + SearchHit searchHit = searchResponse.getHits().getAt(0); + String resourcePath = + String.format("/%s/%s/%s", searchHit.getIndex(), EVENT_DOC_TYPE, searchHit.getId()); + Response response = restClient.performRequest(HttpMethod.GET, resourcePath); + + String responseBody = IOUtils.toString(response.getEntity().getContent()); + TypeReference> typeRef = + new TypeReference>() { + }; + Map responseMap = objectMapper.readValue(responseBody, typeRef); + + Map sourceMap = (Map) responseMap.get("_source"); + assertEquals("indexed id should match", "some-id", sourceMap.get("id")); + assertEquals("indexed message id should match", messageId, sourceMap.get("messageId")); + assertEquals("indexed action should match", Type.complete_task.name(), sourceMap.get("action")); + assertEquals("indexed event should match", "some-event", sourceMap.get("event")); + assertEquals("indexed status should match", EventExecution.Status.COMPLETED.name(), sourceMap.get("status")); + }); + } + + private SearchResponse searchObjectIdsViaExpression(String indexName, String structuredQuery, int start, int size, + List sortOptions, String freeTextQuery, String docType) throws ParserException, IOException { + + // Build query + QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); + if(StringUtils.isNotEmpty(structuredQuery)) { + Expression expression = Expression.fromString(structuredQuery); + queryBuilder = expression.getFilterBuilder(); + } + + BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder); + QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(freeTextQuery); + BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery); + + return searchObjectIds(indexName, fq, start, size, sortOptions, docType); + } + + /** + * Tries to find object ids for a given query in an index. + * + * @param indexName The name of the index. + * @param queryBuilder The query to use for searching. + * @param start The start to use. + * @param size The total return size. + * @param sortOptions A list of string options to sort in the form VALUE:ORDER; where ORDER is optional and can be either ASC OR DESC. + * @param docType The document type to searchObjectIdsViaExpression for. + * + * @return The SearchResults which includes the count and IDs that were found. + * @throws IOException If we cannot communicate with ES. + */ + private SearchResponse searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, List sortOptions, String docType) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(start); + searchSourceBuilder.size(size); + + if (sortOptions != null && !sortOptions.isEmpty()) { + + for (String sortOption : sortOptions) { + SortOrder order = SortOrder.ASC; + String field = sortOption; + int index = sortOption.indexOf(":"); + if (index > 0) { + field = sortOption.substring(0, index); + order = SortOrder.valueOf(sortOption.substring(index + 1)); + } + searchSourceBuilder.sort(new FieldSortBuilder(field).order(order)); + } + } + + // Generate the actual request to send to ES. + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(docType); + searchRequest.source(searchSourceBuilder); + + return elasticSearchClient.search(searchRequest); + } + +} diff --git a/es5-persistence/src/test/resources/log4j.properties b/es5-persistence/src/test/resources/log4j.properties new file mode 100644 index 0000000000..6db8d4b142 --- /dev/null +++ b/es5-persistence/src/test/resources/log4j.properties @@ -0,0 +1,11 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.appender.org.apache.http=info diff --git a/server/dependencies.lock b/server/dependencies.lock index d2604f8178..402a136b5c 100644 --- a/server/dependencies.lock +++ b/server/dependencies.lock @@ -299,6 +299,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -642,6 +648,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -985,6 +997,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1328,6 +1346,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1741,6 +1765,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2084,6 +2114,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2427,6 +2463,12 @@ "project": true, "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -2774,6 +2816,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3125,6 +3173,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3476,6 +3530,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -3827,6 +3887,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java index 350665a9b9..403765ce93 100644 --- a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java +++ b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java @@ -11,7 +11,7 @@ import com.netflix.conductor.core.utils.DummyPayloadStorage; import com.netflix.conductor.core.utils.S3PayloadStorage; import com.netflix.conductor.dao.RedisWorkflowModule; -import com.netflix.conductor.elasticsearch.es5.ElasticSearchV5Module; +import com.netflix.conductor.elasticsearch.ElasticSearchModule; import com.netflix.conductor.mysql.MySQLWorkflowModule; import com.netflix.conductor.server.DynomiteClusterModule; import com.netflix.conductor.server.JerseyModule; @@ -90,7 +90,7 @@ private List selectModulesToLoad() { break; } - modules.add(new ElasticSearchV5Module()); + modules.add(new ElasticSearchModule()); modules.add(new WorkflowExecutorModule()); diff --git a/test-harness/build.gradle b/test-harness/build.gradle index 6b98298e3f..7d103cdc70 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -23,6 +23,7 @@ dependencies { testCompile "io.swagger:swagger-jersey-jaxrs:${revSwaggerJersey}" testCompile "ch.vorburger.mariaDB4j:mariaDB4j:${revMariaDB4j}" + testCompile "org.awaitility:awaitility:${revAwaitility}" } test { diff --git a/test-harness/dependencies.lock b/test-harness/dependencies.lock index d85895eead..55fbb94872 100644 --- a/test-harness/dependencies.lock +++ b/test-harness/dependencies.lock @@ -387,6 +387,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -401,6 +405,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -828,6 +838,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -842,6 +856,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1269,6 +1289,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -1283,6 +1307,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" @@ -1710,6 +1740,10 @@ ], "locked": "2.9.1" }, + "org.awaitility:awaitility": { + "locked": "3.1.2", + "requested": "3.1.2" + }, "org.eclipse.jetty:jetty-server": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-server" @@ -1724,6 +1758,12 @@ "locked": "9.3.9.v20160517", "requested": "9.3.9.v20160517" }, + "org.elasticsearch.client:elasticsearch-rest-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-es5-persistence" + ], + "locked": "5.6.8" + }, "org.elasticsearch.client:elasticsearch-rest-high-level-client": { "firstLevelTransitive": [ "com.netflix.conductor:conductor-es5-persistence" diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java similarity index 77% rename from test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java rename to test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java index cddd44c0d9..535ba0fb08 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndGrpcTests.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractGrpcEndToEndTest.java @@ -15,10 +15,10 @@ */ package com.netflix.conductor.tests.integration; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.netflix.conductor.bootstrap.BootstrapModule; -import com.netflix.conductor.bootstrap.ModulesProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import com.netflix.conductor.client.grpc.MetadataClient; import com.netflix.conductor.client.grpc.TaskClient; import com.netflix.conductor.client.grpc.WorkflowClient; @@ -34,60 +34,42 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; -import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; -import com.netflix.conductor.grpc.server.GRPCServer; -import com.netflix.conductor.grpc.server.GRPCServerConfiguration; -import com.netflix.conductor.grpc.server.GRPCServerProvider; -import com.netflix.conductor.tests.utils.TestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import java.util.LinkedList; import java.util.List; -import java.util.Optional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.junit.Test; /** * @author Viren */ -public class End2EndGrpcTests extends AbstractEndToEndTest { - private static TaskClient taskClient; - private static WorkflowClient workflowClient; - private static MetadataClient metadataClient; - private static EmbeddedElasticSearch search; - - @BeforeClass - public static void setup() throws Exception { - TestEnvironment.setup(); - System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); - System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9202"); - System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9302"); - - Injector bootInjector = Guice.createInjector(new BootstrapModule()); - Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); - - search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); - search.start(); - - Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); - assertTrue("failed to instantiate GRPCServer", server.isPresent()); - server.get().start(); - - taskClient = new TaskClient("localhost", 8090); - workflowClient = new WorkflowClient("localhost", 8090); - metadataClient = new MetadataClient("localhost", 8090); +public abstract class AbstractGrpcEndToEndTest extends AbstractEndToEndTest { + + protected static TaskClient taskClient; + protected static WorkflowClient workflowClient; + protected static MetadataClient metadataClient; + protected static EmbeddedElasticSearch search; + + @Override + protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) { + StartWorkflowRequest workflowRequest = new StartWorkflowRequest() + .withName(workflowExecutionName) + .withWorkflowDef(workflowDefinition); + return workflowClient.startWorkflow(workflowRequest); } - @AfterClass - public static void teardown() throws Exception { - TestEnvironment.teardown(); - search.stop(); + @Override + protected Workflow getWorkflow(String workflowId, boolean includeTasks) { + return workflowClient.getWorkflow(workflowId, includeTasks); + } + + @Override + protected TaskDef getTaskDefinition(String taskName) { + return metadataClient.getTaskDef(taskName); + } + + @Override + protected void registerTaskDefinitions(List taskDefinitionList) { + metadataClient.registerTaskDefs(taskDefinitionList); } @Test @@ -213,27 +195,4 @@ public void testAll() throws Exception { assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); assertEquals(1, wf.getTasks().size()); } - - @Override - protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) { - StartWorkflowRequest workflowRequest = new StartWorkflowRequest() - .withName(workflowExecutionName) - .withWorkflowDef(workflowDefinition); - return workflowClient.startWorkflow(workflowRequest); - } - - @Override - protected Workflow getWorkflow(String workflowId, boolean includeTasks) { - return workflowClient.getWorkflow(workflowId, includeTasks); - } - - @Override - protected TaskDef getTaskDefinition(String taskName) { - return metadataClient.getTaskDef(taskName); - } - - @Override - protected void registerTaskDefinitions(List taskDefinitionList) { - metadataClient.registerTaskDefs(taskDefinitionList); - } } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java similarity index 85% rename from test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java rename to test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java index c17c7c727f..4f4c6def5a 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractHttpEndToEndTest.java @@ -12,10 +12,11 @@ */ package com.netflix.conductor.tests.integration; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.netflix.conductor.bootstrap.BootstrapModule; -import com.netflix.conductor.bootstrap.ModulesProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import com.netflix.conductor.client.exceptions.ConductorClientException; import com.netflix.conductor.client.http.MetadataClient; import com.netflix.conductor.client.http.TaskClient; @@ -32,67 +33,25 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch; -import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; -import com.netflix.conductor.jetty.server.JettyServer; -import com.netflix.conductor.tests.utils.TestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.Test; /** * @author Viren */ +public abstract class AbstractHttpEndToEndTest extends AbstractEndToEndTest { -public class End2EndTests extends AbstractEndToEndTest { - - private static TaskClient taskClient; - private static WorkflowClient workflowClient; - private static EmbeddedElasticSearch search; - private static MetadataClient metadataClient; - - private static final int SERVER_PORT = 8080; - - @BeforeClass - public static void setup() throws Exception { - TestEnvironment.setup(); - System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9201"); - System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9301"); - - Injector bootInjector = Guice.createInjector(new BootstrapModule()); - Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); - - search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); - search.start(); + protected static String apiRoot; - JettyServer server = new JettyServer(SERVER_PORT, false); - server.start(); - - taskClient = new TaskClient(); - taskClient.setRootURI("http://localhost:8080/api/"); - - workflowClient = new WorkflowClient(); - workflowClient.setRootURI("http://localhost:8080/api/"); - - metadataClient = new MetadataClient(); - metadataClient.setRootURI("http://localhost:8080/api/"); - } - - @AfterClass - public static void teardown() throws Exception { - TestEnvironment.teardown(); - search.stop(); - } + protected static TaskClient taskClient; + protected static WorkflowClient workflowClient; + protected static EmbeddedElasticSearch search; + protected static MetadataClient metadataClient; @Override protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) { @@ -154,9 +113,13 @@ public void testAll() throws Exception { assertEquals(0, workflow.getTasks().size()); assertEquals(workflowId, workflow.getWorkflowId()); - List workflowList = workflowClient.getWorkflows(def.getName(), correlationId, false, false); - assertEquals(1, workflowList.size()); - assertEquals(workflowId, workflowList.get(0).getWorkflowId()); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + List workflowList = workflowClient.getWorkflows(def.getName(), correlationId, false, false); + assertEquals(1, workflowList.size()); + assertEquals(workflowId, workflowList.get(0).getWorkflowId()); + }); workflow = workflowClient.getWorkflow(workflowId, true); assertNotNull(workflow); @@ -274,7 +237,7 @@ public void testMetadataWorkflowDefinition() { @Test public void testInvalidResource() { MetadataClient metadataClient = new MetadataClient(); - metadataClient.setRootURI("http://localhost:8080/api/invalid"); + metadataClient.setRootURI(String.format("%sinvalid", apiRoot)); WorkflowDef def = new WorkflowDef(); def.setName("testWorkflowDel"); def.setVersion(1); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java new file mode 100644 index 0000000000..e4024dd360 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/ESRestClientHttpEndToEndTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.tests.integration; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.ElasticSearchRestClientProvider; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.elasticsearch.SystemPropertiesElasticSearchConfiguration; +import com.netflix.conductor.jetty.server.JettyServer; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Viren + */ +public class ESRestClientHttpEndToEndTest extends AbstractHttpEndToEndTest { + + private static final Logger logger = + LoggerFactory.getLogger(ESRestClientHttpEndToEndTest.class); + + private static final int SERVER_PORT = 8083; + + private static RestClient elasticSearchAdminClient; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9203"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "http://localhost:9203"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + SystemPropertiesElasticSearchConfiguration configuration = new SystemPropertiesElasticSearchConfiguration(); + ElasticSearchRestClientProvider restClientProvider = new ElasticSearchRestClientProvider(configuration); + elasticSearchAdminClient = restClientProvider.get(); + + waitForGreenCluster(); + + JettyServer server = new JettyServer(SERVER_PORT, false); + server.start(); + + apiRoot = String.format("http://localhost:%d/api/", SERVER_PORT); + + taskClient = new TaskClient(); + taskClient.setRootURI(apiRoot); + + workflowClient = new WorkflowClient(); + workflowClient.setRootURI(apiRoot); + + metadataClient = new MetadataClient(); + metadataClient.setRootURI(apiRoot); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + + private static void waitForGreenCluster() throws Exception { + long startTime = System.currentTimeMillis(); + + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("timeout", "30s"); + + elasticSearchAdminClient.performRequest("GET", "/_cluster/health", params); + logger.info("Elasticsearch Cluster ready in {} ms", System.currentTimeMillis() - startTime); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java new file mode 100644 index 0000000000..9f30f41a01 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/GrpcEndToEndTest.java @@ -0,0 +1,75 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +/** + * + */ +package com.netflix.conductor.tests.integration; + +import static org.junit.Assert.assertTrue; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.grpc.MetadataClient; +import com.netflix.conductor.client.grpc.TaskClient; +import com.netflix.conductor.client.grpc.WorkflowClient; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.grpc.server.GRPCServer; +import com.netflix.conductor.grpc.server.GRPCServerConfiguration; +import com.netflix.conductor.grpc.server.GRPCServerProvider; +import com.netflix.conductor.mysql.MySQLConfiguration; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.util.Optional; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * @author Viren + */ +public class GrpcEndToEndTest extends AbstractGrpcEndToEndTest { + + private static final int SERVER_PORT = 8092; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); + System.setProperty(GRPCServerConfiguration.PORT_PROPERTY_NAME, "8092"); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9202"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9302"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); + assertTrue("failed to instantiate GRPCServer", server.isPresent()); + server.get().start(); + + taskClient = new TaskClient("localhost", SERVER_PORT); + workflowClient = new WorkflowClient("localhost", SERVER_PORT); + metadataClient = new MetadataClient("localhost", SERVER_PORT); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java new file mode 100644 index 0000000000..456070aa2d --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/HttpEndToEndTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.tests.integration; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.jetty.server.JettyServer; +import com.netflix.conductor.tests.utils.TestEnvironment; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * @author Viren + */ +public class HttpEndToEndTest extends AbstractHttpEndToEndTest { + + private static final int SERVER_PORT = 8080; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9201"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9301"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + JettyServer server = new JettyServer(SERVER_PORT, false); + server.start(); + + apiRoot = String.format("http://localhost:%d/api/", SERVER_PORT); + + taskClient = new TaskClient(); + taskClient.setRootURI(apiRoot); + + workflowClient = new WorkflowClient(); + workflowClient.setRootURI(apiRoot); + + metadataClient = new MetadataClient(); + metadataClient.setRootURI(apiRoot); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java new file mode 100644 index 0000000000..852828e8d6 --- /dev/null +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLGrpcEndToEndTest.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +/** + * + */ +package com.netflix.conductor.tests.integration; + +import static org.junit.Assert.assertTrue; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.netflix.conductor.bootstrap.BootstrapModule; +import com.netflix.conductor.bootstrap.ModulesProvider; +import com.netflix.conductor.client.grpc.MetadataClient; +import com.netflix.conductor.client.grpc.TaskClient; +import com.netflix.conductor.client.grpc.WorkflowClient; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration; +import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider; +import com.netflix.conductor.grpc.server.GRPCServer; +import com.netflix.conductor.grpc.server.GRPCServerConfiguration; +import com.netflix.conductor.grpc.server.GRPCServerProvider; +import com.netflix.conductor.mysql.MySQLConfiguration; +import com.netflix.conductor.tests.utils.MySQLTestModule; +import com.netflix.conductor.tests.utils.MySQLTestRunner; +import com.netflix.conductor.tests.utils.TestEnvironment; +import java.util.Optional; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; + +/** + * @author Viren + */ +@RunWith(MySQLTestRunner.class) +public class MySQLGrpcEndToEndTest extends AbstractGrpcEndToEndTest { + + private static final int SERVER_PORT = 8094; + + @BeforeClass + public static void setup() throws Exception { + TestEnvironment.setup(); + System.setProperty(GRPCServerConfiguration.ENABLED_PROPERTY_NAME, "true"); + System.setProperty(GRPCServerConfiguration.PORT_PROPERTY_NAME, "8094"); + System.setProperty(ElasticSearchConfiguration.EMBEDDED_PORT_PROPERTY_NAME, "9204"); + System.setProperty(ElasticSearchConfiguration.ELASTIC_SEARCH_URL_PROPERTY_NAME, "localhost:9304"); + + Injector bootInjector = Guice.createInjector(new BootstrapModule()); + Injector serverInjector = Guice.createInjector(bootInjector.getInstance(ModulesProvider.class).get()); + + search = serverInjector.getInstance(EmbeddedElasticSearchProvider.class).get().get(); + search.start(); + + Optional server = serverInjector.getInstance(GRPCServerProvider.class).get(); + assertTrue("failed to instantiate GRPCServer", server.isPresent()); + server.get().start(); + + taskClient = new TaskClient("localhost", SERVER_PORT); + workflowClient = new WorkflowClient("localhost", SERVER_PORT); + metadataClient = new MetadataClient("localhost", SERVER_PORT); + } + + @AfterClass + public static void teardown() throws Exception { + TestEnvironment.teardown(); + search.stop(); + } + +} diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java index a623551bcd..54830373a2 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java @@ -9,6 +9,8 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.execution.WorkflowStatusListener; +import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.MetadataDAO; @@ -50,6 +52,7 @@ protected void configure() { bind(ExecutionDAO.class).to(MySQLExecutionDAO.class); bind(QueueDAO.class).to(MySQLQueueDAO.class); bind(IndexDAO.class).to(MockIndexDAO.class); + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class); install(new CoreModule()); bind(UserTask.class).asEagerSingleton(); diff --git a/versionsOfDependencies.gradle b/versionsOfDependencies.gradle index 41bfda0028..19f62dc839 100644 --- a/versionsOfDependencies.gradle +++ b/versionsOfDependencies.gradle @@ -2,6 +2,7 @@ * Common place to define all the version dependencies */ ext { + revAwaitility = '3.1.2' revAwsSdk = '1.11.86' revArchaius = '0.7.5' revCommonsLang3 = '3.0'