Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2179 from Netflix/sync_2.31
Browse files Browse the repository at this point in the history
sync to 2.31 98c75e9
  • Loading branch information
apanicker-nflx authored Apr 14, 2021
2 parents 8b1c025 + dfa7955 commit 961b97b
Show file tree
Hide file tree
Showing 66 changed files with 11,908 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,8 +1236,8 @@ List<String> cancelNonTerminalTasks(Workflow workflow) {
}
if (erroredTasks.isEmpty()) {
try {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
workflowStatusListener.onWorkflowFinalizedIfEnabled(workflow);
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.util.concurrent.ThreadFactory;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.util.concurrent.ThreadFactory;

public class MySQLDataSourceProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataSourceProvider.class);
Expand Down Expand Up @@ -62,9 +63,9 @@ private HikariConfig createConfiguration() {
hikariConfig.setAutoCommit(properties.isAutoCommit());

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("hikari-mysql-%d")
.build();
.setDaemon(true)
.setNameFormat("hikari-mysql-%d")
.build();

hikariConfig.setThreadFactory(threadFactory);
return hikariConfig;
Expand All @@ -78,16 +79,15 @@ private void flywayMigrate(DataSource dataSource) {
return;
}

FluentConfiguration fluentConfiguration = Flyway.configure()
.dataSource(dataSource)
.placeholderReplacement(false);
String flywayTable = properties.getFlywayTable();
LOGGER.debug("Using Flyway migration table '{}'", flywayTable);

properties.getFlywayTable().ifPresent(tableName -> {
LOGGER.debug("Using Flyway migration table '{}'", tableName);
fluentConfiguration.table(tableName);
});
FluentConfiguration fluentConfiguration = Flyway.configure()
.table(flywayTable)
.dataSource(dataSource)
.placeholderReplacement(false);

Flyway flyway = new Flyway(fluentConfiguration);
Flyway flyway = fluentConfiguration.load();
flyway.migrate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
*/
package com.netflix.conductor.mysql.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;

import java.sql.Connection;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;

@ConfigurationProperties("conductor.mysql")
public class MySQLProperties {
Expand Down Expand Up @@ -45,7 +45,7 @@ public class MySQLProperties {
/**
* Used to override the default flyway migration table
*/
private String flywayTable = null;
private String flywayTable = "schema_version";

// The defaults are currently in line with the HikariConfig defaults, which are unfortunately private.
/**
Expand Down Expand Up @@ -125,8 +125,8 @@ public void setFlywayEnabled(boolean flywayEnabled) {
this.flywayEnabled = flywayEnabled;
}

public Optional<String> getFlywayTable() {
return Optional.ofNullable(flywayTable);
public String getFlywayTable() {
return flywayTable;
}

public void setFlywayTable(String flywayTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,25 @@
*/
package com.netflix.conductor.mysql.util;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.mysql.config.MySQLProperties;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MySQLDAOTestUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDAOTestUtil.class);
Expand All @@ -40,7 +41,7 @@ public class MySQLDAOTestUtil {
public MySQLDAOTestUtil(MySQLContainer mySQLContainer, ObjectMapper objectMapper, String dbName) {
properties = mock(MySQLProperties.class);
when(properties.getJdbcUrl()).thenReturn(mySQLContainer.getJdbcUrl()
+ "?useSSL=false&useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC");
+ "?useSSL=false&useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC");
when(properties.getJdbcUsername()).thenReturn(mySQLContainer.getUsername());
when(properties.getJdbcPassword()).thenReturn(mySQLContainer.getPassword());
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
Expand All @@ -66,10 +67,11 @@ private HikariDataSource getDataSource(MySQLProperties properties) {

private void flywayMigrate(DataSource dataSource) {
FluentConfiguration fluentConfiguration = Flyway.configure()
.dataSource(dataSource)
.placeholderReplacement(false);
.table("schema_version")
.dataSource(dataSource)
.placeholderReplacement(false);

Flyway flyway = new Flyway(fluentConfiguration);
Flyway flyway = fluentConfiguration.load();
flyway.migrate();
}

Expand All @@ -89,7 +91,7 @@ public void resetAllData() {
LOGGER.info("Resetting data for test");
try (Connection connection = dataSource.getConnection()) {
try (ResultSet rs = connection.prepareStatement("SHOW TABLES").executeQuery();
PreparedStatement keysOn = connection.prepareStatement("SET FOREIGN_KEY_CHECKS=1")) {
PreparedStatement keysOn = connection.prepareStatement("SET FOREIGN_KEY_CHECKS=1")) {
try (PreparedStatement keysOff = connection.prepareStatement("SET FOREIGN_KEY_CHECKS=0")) {
keysOff.execute();
while (rs.next()) {
Expand Down
109 changes: 109 additions & 0 deletions polyglot-clients/go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Go client for Conductor
Go client for Conductor provides two sets of functions:

1. Workflow Management APIs (start, terminate, get workflow status etc.)
2. Worker execution framework

## Prerequisites
Go must be installed and GOPATH env variable set.

## Install

```shell
go get github.com/netflix/conductor/client/go
```
This will create a Go project under $GOPATH/src and download any dependencies.

## Run

```shell
go run $GOPATH/src/netflix-conductor/client/go/startclient/startclient.go
```

## Using Workflow Management API
Go struct ```ConductorHttpClient``` provides client API calls to the conductor server to start and manage workflows and tasks.

### Example
```go
package main

import (
conductor "github.com/netflix/conductor/client/go"
)

func main() {
conductorClient := conductor.NewConductorHttpClient("http://localhost:8080")

// Example API that will print out workflow definition meta
conductorClient.GetAllWorkflowDefs()
}

```

## Task Worker Execution
Task Worker execution APIs facilitates execution of a task worker using go. The API provides necessary tools to poll for tasks at a specified interval and executing the go worker in a separate goroutine.

### Example
The following go code demonstrates workers for tasks "task_1" and "task_2".

```go
package task

import (
"fmt"
)

// Implementation for "task_1"
func Task_1_Execution_Function(t *task.Task) (taskResult *task.TaskResult, err error) {
log.Println("Executing Task_1_Execution_Function for", t.TaskType)

//Do some logic
taskResult = task.NewTaskResult(t)

output := map[string]interface{}{"task":"task_1", "key2":"value2", "key3":3, "key4":false}
taskResult.OutputData = output
taskResult.Status = "COMPLETED"
err = nil

return taskResult, err
}

// Implementation for "task_2"
func Task_2_Execution_Function(t *task.Task) (taskResult *task.TaskResult, err error) {
log.Println("Executing Task_2_Execution_Function for", t.TaskType)

//Do some logic
taskResult = task.NewTaskResult(t)

output := map[string]interface{}{"task":"task_2", "key2":"value2", "key3":3, "key4":false}
taskResult.OutputData = output
taskResult.Status = "COMPLETED"
err = nil

return taskResult, err
}

```


Then main application to utilize these workers

```go
package main

import (
"github.com/netflix/conductor/client/go"
"github.com/netflix/conductor/client/go/task/sample"
)

func main() {
c := conductor.NewConductorWorker("http://localhost:8080", 1, 10000)

c.Start("task_1", "", sample.Task_1_Execution_Function, false)
c.Start("task_2", "mydomain", sample.Task_2_Execution_Function, true)
}

```

Note: For the example listed above the example task implementations are in conductor/task/sample package. Real task implementations can be placed in conductor/task directory or new subdirectory.

Loading

0 comments on commit 961b97b

Please sign in to comment.