From eec85435234b80ac59fd927cac092423249c6d02 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Tue, 14 Feb 2023 16:37:23 -0800 Subject: [PATCH 1/4] Fix publish release bug in genie-build.yml To fix https://github.com/Netflix/genie/actions/runs/4177182205/jobs/7238063258 --- .github/workflows/genie-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/genie-build.yml b/.github/workflows/genie-build.yml index 2ae2c345c4..8075bbda64 100644 --- a/.github/workflows/genie-build.yml +++ b/.github/workflows/genie-build.yml @@ -119,6 +119,6 @@ jobs: if: | startsWith(github.ref, 'refs/tags/v') && (!contains(github.ref, '-rc.')) - run: --stacktrace -Prelease.useLastTag=true final codeCoverageReport coveralls gitPublishPush dockerPush + run: ./gradlew --stacktrace -Prelease.useLastTag=true final codeCoverageReport coveralls gitPublishPush dockerPush - name: Codecov uses: codecov/codecov-action@v1 From 4da7a9da0f8262385eac663b9dab0c7f9081cd2a Mon Sep 17 00:00:00 2001 From: Liang Tian Date: Wed, 15 Feb 2023 22:34:13 -0800 Subject: [PATCH 2/4] add network config to titus request (#1185) * add network config to titus request * address comments * remove unnecessary files * address param location * address nit comments --- .../launchers/dtos/TitusBatchJobRequest.java | 11 ++++++ .../impl/TitusAgentLauncherImpl.java | 39 +++++++++++++++++-- .../TitusAgentLauncherProperties.java | 5 +++ .../impl/TitusAgentLauncherImplSpec.groovy | 33 ++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/dtos/TitusBatchJobRequest.java b/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/dtos/TitusBatchJobRequest.java index f5aefb6eb2..6aff57fa49 100644 --- a/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/dtos/TitusBatchJobRequest.java +++ b/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/dtos/TitusBatchJobRequest.java @@ -71,6 +71,8 @@ 
public class TitusBatchJobRequest { @NotNull @NonNull private JobGroupInfo jobGroupInfo; + @Nullable + private NetworkConfiguration networkConfiguration; /** * Titus job owner POJO. @@ -190,6 +192,15 @@ public static class Batch { private long runtimeLimitSec; } + /** + * Titus job network configuration. + */ + @Data + @Builder + public static class NetworkConfiguration { + private String networkMode; + } + /** * Titus job disruption budget. */ diff --git a/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImpl.java b/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImpl.java index 697f1a6eee..7784e78dbd 100644 --- a/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImpl.java +++ b/genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImpl.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.base.Strings; import com.netflix.genie.common.internal.dtos.ComputeResources; import com.netflix.genie.common.internal.dtos.Image; import com.netflix.genie.common.internal.tracing.brave.BraveTracePropagator; @@ -102,6 +103,13 @@ public class TitusAgentLauncherImpl implements AgentLauncher { .map(s -> placeholders.getOrDefault(s, s)) .collect(Collectors.toList()); private static final Logger LOG = LoggerFactory.getLogger(TitusAgentLauncherImpl.class); + // For Titus network mode details, see + // https://github.com/Netflix/titus-api-definitions/blob/master/doc/titus-v3-spec.md#networkconfigurationnetworkmode + private static final String TITUS_NETWORK_MODE_IPV4 = "Ipv4Only"; + private static final String TITUS_NETWORK_MODE_DUAL_STACK = "Ipv6AndIpv4"; + private static final String TITUS_NETWORK_MODE_DUAL_STACK_FALLBACK = "Ipv6AndIpv4Fallback"; + private static final String 
TITUS_NETWORK_MODE_IPV6 = "Ipv6Only"; + private static final String TITUS_NETWORK_MODE_HIGH_SCALE = "HighScale"; private final RestTemplate restTemplate; private final RetryTemplate retryTemplate; @@ -261,7 +269,7 @@ private TitusBatchJobRequest createJobRequest(final ResolvedJob resolvedJob) thr final Map jobAttributes = this.createJobAttributes(jobId, resolvedJob); - final TitusBatchJobRequest request = TitusBatchJobRequest.builder() + final TitusBatchJobRequest.TitusBatchJobRequestBuilder requestBuilder = TitusBatchJobRequest.builder() .owner( TitusBatchJobRequest.Owner .builder() @@ -339,14 +347,39 @@ private TitusBatchJobRequest createJobRequest(final ResolvedJob resolvedJob) thr .detail(this.titusAgentLauncherProperties.getDetail()) .sequence(this.titusAgentLauncherProperties.getSequence()) .build() - ) - .build(); + ); + + final Optional networkConfiguration = validateNetworkConfiguration(this.environment.getProperty( + TitusAgentLauncherProperties.CONTAINER_NETWORK_MODE, + String.class)); + networkConfiguration.ifPresent(config -> requestBuilder.networkConfiguration( + TitusBatchJobRequest.NetworkConfiguration.builder().networkMode(config).build())); + + final TitusBatchJobRequest request = requestBuilder.build(); // Run the request through the security adapter to add any necessary context this.jobRequestAdapter.modifyJobRequest(request, resolvedJob); return request; } + private Optional validateNetworkConfiguration(@Nullable final String networkConfig) { + if (Strings.isNullOrEmpty(networkConfig)) { + return Optional.empty(); + } + + switch (networkConfig) { + case TITUS_NETWORK_MODE_IPV4: + case TITUS_NETWORK_MODE_DUAL_STACK: + case TITUS_NETWORK_MODE_DUAL_STACK_FALLBACK: + case TITUS_NETWORK_MODE_IPV6: + case TITUS_NETWORK_MODE_HIGH_SCALE: + return Optional.of(networkConfig); + default: + return Optional.empty(); + } + + } + /** * Helper method to avoid runtime errors if for some reason the DataSize converters aren't loaded. 
* diff --git a/genie-web/src/main/java/com/netflix/genie/web/properties/TitusAgentLauncherProperties.java b/genie-web/src/main/java/com/netflix/genie/web/properties/TitusAgentLauncherProperties.java index a39e485e9e..7a7974d695 100644 --- a/genie-web/src/main/java/com/netflix/genie/web/properties/TitusAgentLauncherProperties.java +++ b/genie-web/src/main/java/com/netflix/genie/web/properties/TitusAgentLauncherProperties.java @@ -169,6 +169,11 @@ public class TitusAgentLauncherProperties { */ public static final String AGENT_IMAGE_KEY_PROPERTY = PREFIX + ".agentImageKey"; + /** + * The property for titus container network mode. + */ + public static final String CONTAINER_NETWORK_MODE = PREFIX + ".networkMode"; + /** * Whether the Titus Agent Launcher is enabled. */ diff --git a/genie-web/src/test/groovy/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImplSpec.groovy b/genie-web/src/test/groovy/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImplSpec.groovy index 5041d33733..9edf89d5fc 100644 --- a/genie-web/src/test/groovy/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImplSpec.groovy +++ b/genie-web/src/test/groovy/com/netflix/genie/web/agent/launchers/impl/TitusAgentLauncherImplSpec.groovy @@ -605,6 +605,39 @@ class TitusAgentLauncherImplSpec extends Specification { requestCapture.getContainer().getAttributes().get(prop2Key) == prop2Value } + def "Check titus container network mode"() { + TitusBatchJobResponse response = toTitusResponse("{ \"id\" : \"" + TITUS_JOB_ID + "\" }") + TitusBatchJobRequest requestCapture + + when: + this.environment.withProperty(TitusAgentLauncherProperties.CONTAINER_NETWORK_MODE, inputNetworkMode) + this.launcher.launchAgent(this.resolvedJob, null) + + then: + 1 * this.restTemplate.postForObject(TITUS_ENDPOINT, _ as TitusBatchJobRequest, TitusBatchJobResponse.class) >> { + args -> + requestCapture = args[1] as TitusBatchJobRequest + return response + } + 1 * this.adapter.modifyJobRequest(_ as 
TitusBatchJobRequest, this.resolvedJob) + requestCapture != null + if (requestNetworkMode != null) { + requestCapture.getNetworkConfiguration() != null + requestCapture.getNetworkConfiguration().getNetworkMode() == requestNetworkMode + } else { + requestCapture.getNetworkConfiguration() == null + } + + where: + inputNetworkMode | requestNetworkMode + "Ipv4Only" | "Ipv4Only" + "Ipv6AndIpv4" | "Ipv6AndIpv4" + "Ipv6AndIpv4Fallback" | "Ipv6AndIpv4Fallback" + "Ipv6Only" | "Ipv6Only" + "HighScale" | "HighScale" + "NonSense" | null + } + def "Retry policy works as expected"() { def retryCodes = EnumSet.of(HttpStatus.REQUEST_TIMEOUT, HttpStatus.SERVICE_UNAVAILABLE) def maxRetries = 2 From fc1b55458655634a7fe98473db060b87adca6ce5 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Wed, 15 Feb 2023 11:04:36 -0800 Subject: [PATCH 3/4] Fix flaky integration tests when running with non-h2 databases --- .../RestControllerIntegrationTestBase.java | 70 +++++++++++++++++-- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/genie-web/src/integTest/java/com/netflix/genie/web/apis/rest/v3/controllers/RestControllerIntegrationTestBase.java b/genie-web/src/integTest/java/com/netflix/genie/web/apis/rest/v3/controllers/RestControllerIntegrationTestBase.java index 7415562256..9ac898a907 100644 --- a/genie-web/src/integTest/java/com/netflix/genie/web/apis/rest/v3/controllers/RestControllerIntegrationTestBase.java +++ b/genie-web/src/integTest/java/com/netflix/genie/web/apis/rest/v3/controllers/RestControllerIntegrationTestBase.java @@ -41,10 +41,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.contract.wiremock.restdocs.WireMockSnippet; import org.springframework.core.env.Environment; 
+import org.springframework.dao.DataAccessException; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -55,12 +58,17 @@ import org.springframework.restdocs.restassured3.RestDocumentationFilter; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -108,6 +116,8 @@ abstract class RestControllerIntegrationTestBase { static final String JOBS_LINK_KEY = "jobs"; static final Set CLUSTERS_OPTIONAL_HAL_LINK_PARAMETERS = Sets.newHashSet("status"); static final Set COMMANDS_OPTIONAL_HAL_LINK_PARAMETERS = Sets.newHashSet("status"); + + private static final Logger LOG = LoggerFactory.getLogger(RestControllerIntegrationTestBase.class); private static final String URI_HOST = "genie.example.com"; private static final String URI_SCHEME = "https"; private static final String LOCAL_TEST_SERVER_PORT_PROPERTY_NAME = "local.server.port"; @@ -138,6 +148,10 @@ abstract class RestControllerIntegrationTestBase { protected int port; + @Autowired + private PlatformTransactionManager transactionManager; + private TransactionTemplate transactionTemplate; + private RequestSpecification requestSpecification; private static String getLinkedResourceExpectedUri( @@ -163,15 +177,57 @@ RequestSpecification getRequestSpecification() { return this.requestSpecification; } + private void doInTransactionWithoutResult(final Supplier supplier) { + int 
retriesLeft = 3; + while (retriesLeft > 0) { + try { + this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(final TransactionStatus status) { + supplier.get(); + } + }); + return; + } catch (DataAccessException e) { + LOG.warn("Exception caught when running transactions.", e); + retriesLeft--; + if (retriesLeft <= 0) { + throw e; + } + } + } + } + @BeforeEach void beforeBase(final RestDocumentationContextProvider documentationContextProvider) { - this.jobRepository.deleteAll(); - this.clusterRepository.deleteAll(); - this.commandRepository.deleteAll(); - this.applicationRepository.deleteAll(); - this.criterionRepository.deleteAll(); - this.fileRepository.deleteAll(); - this.tagRepository.deleteAll(); + this.transactionTemplate = new TransactionTemplate(transactionManager); + doInTransactionWithoutResult(() -> { + this.jobRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.clusterRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.commandRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.applicationRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.criterionRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.fileRepository.deleteAll(); + return null; + }); + doInTransactionWithoutResult(() -> { + this.tagRepository.deleteAll(); + return null; + }); this.requestSpecification = new RequestSpecBuilder() .addFilter( From c6c81dc16911c58473f837ef51fed2dcffd873c6 Mon Sep 17 00:00:00 2001 From: Liang Tian Date: Fri, 22 Sep 2023 14:51:55 -0700 Subject: [PATCH 4/4] add upstream security information pass thru mechanism (#1191) --- .../com/netflix/genie/client/JobClient.java | 75 ++++++++++++++++--- .../netflix/genie/client/apis/JobService.java | 29 +++++++ .../netflix/genie/client/JobClientSpec.groovy | 72 
++++++++++++++++++ 3 files changed, 166 insertions(+), 10 deletions(-) diff --git a/genie-client/src/main/java/com/netflix/genie/client/JobClient.java b/genie-client/src/main/java/com/netflix/genie/client/JobClient.java index 64a0c6ecc6..11ee40de26 100644 --- a/genie-client/src/main/java/com/netflix/genie/client/JobClient.java +++ b/genie-client/src/main/java/com/netflix/genie/client/JobClient.java @@ -43,6 +43,7 @@ import okhttp3.ResponseBody; import okio.BufferedSink; import org.apache.commons.lang3.StringUtils; +import retrofit2.Call; import retrofit2.Retrofit; import javax.annotation.Nullable; @@ -52,9 +53,12 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Client library for the Job Service. @@ -119,16 +123,38 @@ public JobClient( */ public String submitJob( final JobRequest jobRequest - ) throws IOException, GenieClientException { + ) throws IOException { + return submitJob(jobRequest, jobService::submitJob); + } + + /** + * Submit a job to genie using the jobRequest and upstream security token information. + * + * @param jobRequest A job request containing all the details for running a job. + * @param upstreamSecurityTokenName the security token name provided by upstream. + * @param upstreamSecurityTokenValue the security token value provided by upstream. + * @return jobId The id of the job submitted. + * @throws GenieClientException If the response received is not 2xx. + * @throws IOException For Network and other IO issues. 
+ */ + public String submitJob( + final JobRequest jobRequest, + final String upstreamSecurityTokenName, + final String upstreamSecurityTokenValue + ) throws IOException { + final Map headers = + Collections.singletonMap(upstreamSecurityTokenName, upstreamSecurityTokenValue); + return submitJob(jobRequest, jr -> jobService.submitJob(headers, jr)); + } + + private String submitJob( + final JobRequest jobRequest, + final Function> submitFn) throws IOException { if (jobRequest == null) { throw new IllegalArgumentException("Job Request cannot be null."); } - final String locationHeader = this.jobService - .submitJob(jobRequest) - .execute() - .headers() - .get(GenieClientUtils.LOCATION_HEADER); - + final String locationHeader = + submitFn.apply(jobRequest).execute().headers().get(GenieClientUtils.LOCATION_HEADER); if (StringUtils.isBlank(locationHeader)) { throw new GenieClientException("No location header. Unable to get ID"); } @@ -147,7 +173,37 @@ public String submitJob( public String submitJobWithAttachments( final JobRequest jobRequest, final Map attachments - ) throws IOException, GenieClientException { + ) throws IOException { + return submitJobWithAttachments(jobRequest, attachments, jobService::submitJobWithAttachments); + } + + /** + * Submit a job to genie using the jobRequest and attachments provided with the upstream security information. + * + * @param jobRequest A job request containing all the details for running a job. + * @param attachments A map of filenames/input-streams needed to be sent to the server as attachments. + * @param upstreamSecurityTokenName the security token name provided by upstream. + * @param upstreamSecurityTokenValue the security token value provided by upstream. + * @return jobId The id of the job submitted. + * @throws GenieClientException If the response received is not 2xx. + * @throws IOException For Network and other IO issues. 
+ */ + public String submitJobWithAttachments( + final JobRequest jobRequest, + final Map attachments, + final String upstreamSecurityTokenName, + final String upstreamSecurityTokenValue + ) throws IOException { + final Map headers = + Collections.singletonMap(upstreamSecurityTokenName, upstreamSecurityTokenValue); + return submitJobWithAttachments( + jobRequest, attachments, (jr, at) -> jobService.submitJobWithAttachments(headers, jr, at)); + } + + private String submitJobWithAttachments( + final JobRequest jobRequest, + final Map attachments, + final BiFunction, Call> submitFn) throws IOException { if (jobRequest == null) { throw new IllegalArgumentException("Job Request cannot be null."); } @@ -177,8 +233,7 @@ public void writeTo(final BufferedSink sink) throws IOException { attachmentFiles.add(part); } - final String locationHeader = this.jobService - .submitJobWithAttachments(jobRequest, attachmentFiles) + final String locationHeader = submitFn.apply(jobRequest, attachmentFiles) .execute() .headers() .get(GenieClientUtils.LOCATION_HEADER); diff --git a/genie-client/src/main/java/com/netflix/genie/client/apis/JobService.java b/genie-client/src/main/java/com/netflix/genie/client/apis/JobService.java index 88fead00f0..88c5fe784b 100644 --- a/genie-client/src/main/java/com/netflix/genie/client/apis/JobService.java +++ b/genie-client/src/main/java/com/netflix/genie/client/apis/JobService.java @@ -32,6 +32,7 @@ import retrofit2.http.DELETE; import retrofit2.http.GET; import retrofit2.http.Header; +import retrofit2.http.HeaderMap; import retrofit2.http.Multipart; import retrofit2.http.POST; import retrofit2.http.Part; @@ -40,6 +41,7 @@ import retrofit2.http.Streaming; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -64,6 +66,17 @@ public interface JobService { @POST(JOBS_URL_SUFFIX) Call submitJob(@Body JobRequest request); + /** + * Method to submit a job to Genie with user defined custom headers. 
+ * + * @param customHeaders client defined custom headers. + * @param request The request object containing all the details needed to run the job. + * @return A callable object. + */ + @POST(JOBS_URL_SUFFIX) + Call submitJob(@HeaderMap Map customHeaders, + @Body JobRequest request); + /** * Submit a job with attachments. * @@ -78,6 +91,22 @@ Call submitJobWithAttachments( @Part List attachments ); + /** + * Submit a job with attachments and custom headers. + * + * @param customHeaders client defined custom headers. + * @param request A JobRequest object containing all the details needed to run the job. + * @param attachments A list of all the attachment files to be sent to the server. + * @return A callable object. + */ + @Multipart + @POST(JOBS_URL_SUFFIX) + Call submitJobWithAttachments( + @HeaderMap Map customHeaders, + @Part("request") JobRequest request, + @Part List attachments + ); + /** * Method to get all jobs from Genie. * diff --git a/genie-client/src/test/groovy/com/netflix/genie/client/JobClientSpec.groovy b/genie-client/src/test/groovy/com/netflix/genie/client/JobClientSpec.groovy index 15d61f3fe1..fffc58e1b7 100644 --- a/genie-client/src/test/groovy/com/netflix/genie/client/JobClientSpec.groovy +++ b/genie-client/src/test/groovy/com/netflix/genie/client/JobClientSpec.groovy @@ -17,6 +17,7 @@ */ package com.netflix.genie.client +import com.netflix.genie.common.dto.JobRequest import com.netflix.genie.common.external.util.GenieObjectMapper import okhttp3.OkHttpClient import okhttp3.mockwebserver.MockResponse @@ -67,4 +68,75 @@ class JobClientSpec extends Specification { "{\"_embedded\": {\"jobSearchResultList\": {}}}" | _ "{\"_embedded\": {\"jobSearchResultList\": []}}" | _ } + + @Unroll + def "submit job with upstream security information formed correct request"() { + setup: + def server = new MockWebServer() + server.enqueue(new MockResponse().setHeader("location", genieId)) + server.start() + def url = server.url("") + def okHttpClient = new OkHttpClient.Builder().build() + def 
retrofit = new Retrofit.Builder() + .baseUrl(url) + .client(okHttpClient) + .addConverterFactory(JacksonConverterFactory.create(GenieObjectMapper.getMapper())) + .build() + def jobClient = new JobClient(retrofit, 5) + + when: + def jobId = jobClient.submitJob(Mock(JobRequest), securityName, securityValue) + def request = server.takeRequest() + + then: "No exception is thrown and empty list is returned" + noExceptionThrown() + + expect: + request.headers.get(securityName) == securityValue + jobId == genieId + + cleanup: + server.shutdown() + + where: + genieId | securityName | securityValue + "a-genie-id" | "x-forwarded-authorization" | "some-security-value" + "b-genie-id" | "Authorization" | "some-auth-value" + } + + @Unroll + def "submit job with attachments and upstream security information formed correct request"() { + setup: + def server = new MockWebServer() + server.enqueue(new MockResponse().setHeader("location", genieId)) + server.start() + def url = server.url("") + def okHttpClient = new OkHttpClient.Builder().build() + def retrofit = new Retrofit.Builder() + .baseUrl(url) + .client(okHttpClient) + .addConverterFactory(JacksonConverterFactory.create(GenieObjectMapper.getMapper())) + .build() + def jobClient = new JobClient(retrofit, 5) + + when: + def jobId = jobClient.submitJobWithAttachments( + Mock(JobRequest), new HashMap(), securityName, securityValue) + def request = server.takeRequest() + + then: "No exception is thrown and empty list is returned" + noExceptionThrown() + + expect: + request.headers.get(securityName) == securityValue + jobId == genieId + + cleanup: + server.shutdown() + + where: + genieId | securityName | securityValue + "a-genie-id" | "x-forwarded-authorization" | "some-security-value" + "b-genie-id" | "Authorization" | "some-auth-value" + } }