Skip to content

Commit

Permalink
Sync with master
Browse files Browse the repository at this point in the history
  • Loading branch information
bhou committed Oct 20, 2023
2 parents 8b37c2e + c6c81dc commit db4262a
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/genie-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ jobs:
if: |
startsWith(github.ref, 'refs/tags/v') &&
(!contains(github.ref, '-rc.'))
run: --stacktrace -Prelease.useLastTag=true final codeCoverageReport coveralls gitPublishPush dockerPush
run: ./gradlew --stacktrace -Prelease.useLastTag=true final codeCoverageReport coveralls gitPublishPush dockerPush
- name: Codecov
uses: codecov/codecov-action@v1
75 changes: 65 additions & 10 deletions genie-client/src/main/java/com/netflix/genie/client/JobClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import okhttp3.ResponseBody;
import okio.BufferedSink;
import org.apache.commons.lang3.StringUtils;
import retrofit2.Call;
import retrofit2.Retrofit;

import javax.annotation.Nullable;
Expand All @@ -52,9 +53,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Client library for the Job Service.
Expand Down Expand Up @@ -119,16 +123,38 @@ public JobClient(
*/
public String submitJob(
final JobRequest jobRequest
) throws IOException, GenieClientException {
) throws IOException {
return submitJob(jobRequest, jobService::submitJob);
}

/**
* Submit a job to genie using the jobRequest and upstream security token information.
*
* @param jobRequest A job request containing all the details for running a job.
* @param upstreamSecurityTokenName the security token name provided by upstream.
* @param upstreamSecurityTokenValue the security token value provided by upstream.
* @return jobId The id of the job submitted.
* @throws GenieClientException If the response recieved is not 2xx.
* @throws IOException For Network and other IO issues.
*/
public String submitJob(
final JobRequest jobRequest,
final String upstreamSecurityTokenName,
final String upstreamSecurityTokenValue
) throws IOException {
final Map<String, String> headers =
Collections.singletonMap(upstreamSecurityTokenName, upstreamSecurityTokenValue);
return submitJob(jobRequest, jr -> jobService.submitJob(headers, jr));
}

private String submitJob(
final JobRequest jobRequest,
final Function<JobRequest, Call<Void>> submitFn) throws IOException {
if (jobRequest == null) {
throw new IllegalArgumentException("Job Request cannot be null.");
}
final String locationHeader = this.jobService
.submitJob(jobRequest)
.execute()
.headers()
.get(GenieClientUtils.LOCATION_HEADER);

final String locationHeader =
submitFn.apply(jobRequest).execute().headers().get(GenieClientUtils.LOCATION_HEADER);
if (StringUtils.isBlank(locationHeader)) {
throw new GenieClientException("No location header. Unable to get ID");
}
Expand All @@ -147,7 +173,37 @@ public String submitJob(
public String submitJobWithAttachments(
final JobRequest jobRequest,
final Map<String, InputStream> attachments
) throws IOException, GenieClientException {
) throws IOException {
return submitJobWithAttachments(jobRequest, attachments, jobService::submitJobWithAttachments);
}

/**
* Submit a job to genie using the jobRequest and attachments provided with the upstream security information.
*
* @param jobRequest A job request containing all the details for running a job.
* @param attachments A map of filenames/input-streams needed to be sent to the server as attachments.
* @param upstreamSecurityTokenName the security token name provided by upstream.
* @param upstreamSecurityTokenValue the security token value provided by upstream.
* @return jobId The id of the job submitted.
* @throws GenieClientException If the response recieved is not 2xx.
* @throws IOException For Network and other IO issues.
*/
public String submitJobWithAttachments(
final JobRequest jobRequest,
final Map<String, InputStream> attachments,
final String upstreamSecurityTokenName,
final String upstreamSecurityTokenValue
) throws IOException {
final Map<String, String> headers =
Collections.singletonMap(upstreamSecurityTokenName, upstreamSecurityTokenValue);
return submitJobWithAttachments(
jobRequest, attachments, (jr, at) -> jobService.submitJobWithAttachments(headers, jr, at));
}

private String submitJobWithAttachments(
final JobRequest jobRequest,
final Map<String, InputStream> attachments,
final BiFunction<JobRequest, List<MultipartBody.Part>, Call<Void>> submitFn) throws IOException {
if (jobRequest == null) {
throw new IllegalArgumentException("Job Request cannot be null.");
}
Expand Down Expand Up @@ -177,8 +233,7 @@ public void writeTo(final BufferedSink sink) throws IOException {

attachmentFiles.add(part);
}
final String locationHeader = this.jobService
.submitJobWithAttachments(jobRequest, attachmentFiles)
final String locationHeader = submitFn.apply(jobRequest, attachmentFiles)
.execute()
.headers()
.get(GenieClientUtils.LOCATION_HEADER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import retrofit2.http.DELETE;
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.HeaderMap;
import retrofit2.http.Multipart;
import retrofit2.http.POST;
import retrofit2.http.Part;
Expand All @@ -40,6 +41,7 @@
import retrofit2.http.Streaming;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -64,6 +66,17 @@ public interface JobService {
@POST(JOBS_URL_SUFFIX)
Call<Void> submitJob(@Body JobRequest request);

/**
* Method to submit a job to Genie with user defined custom headers.
*
* @param customHeaders client defined custom headers.
* @param request The request object containing all the
* @return A callable object.
*/
@POST(JOBS_URL_SUFFIX)
Call<Void> submitJob(@HeaderMap Map<String, String> customHeaders,
@Body JobRequest request);

/**
* Submit a job with attachments.
*
Expand All @@ -78,6 +91,22 @@ Call<Void> submitJobWithAttachments(
@Part List<MultipartBody.Part> attachments
);

/**
* Submit a job with attachments and custom headers.
*
* @param customHeaders client defined custom headers.
* @param request A JobRequest object containing all the details needed to run the job.
* @param attachments A list of all the attachment files to be sent to the server.
* @return A callable object.
*/
@Multipart
@POST(JOBS_URL_SUFFIX)
Call<Void> submitJobWithAttachments(
@HeaderMap Map<String, String> customHeaders,
@Part("request") JobRequest request,
@Part List<MultipartBody.Part> attachments
);

/**
* Method to get all jobs from Genie.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.netflix.genie.client

import com.netflix.genie.common.dto.JobRequest
import com.netflix.genie.common.external.util.GenieObjectMapper
import okhttp3.OkHttpClient
import okhttp3.mockwebserver.MockResponse
Expand Down Expand Up @@ -67,4 +68,75 @@ class JobClientSpec extends Specification {
"{\"_embedded\": {\"jobSearchResultList\": {}}}" | _
"{\"_embedded\": {\"jobSearchResultList\": []}}" | _
}

@Unroll
def "submit job with upstream security information formed correct request"() {
setup:
def server = new MockWebServer()
server.enqueue(new MockResponse().setHeader("location", genieId))
server.start()
def url = server.url("")
def okHttpClient = new OkHttpClient.Builder().build()
def retrofit = new Retrofit.Builder()
.baseUrl(url)
.client(okHttpClient)
.addConverterFactory(JacksonConverterFactory.create(GenieObjectMapper.getMapper()))
.build()
def jobClient = new JobClient(retrofit, 5)

when:
def jobId = jobClient.submitJob(Mock(JobRequest), securityName, securityValue)
def request = server.takeRequest()

then: "No exception is thrown and empty list is returned"
noExceptionThrown()

expect:
request.headers.get(securityName) == securityValue
jobId == genieId

cleanup:
server.shutdown()

where:
genieId | securityName | securityValue
"a-genie-id" | "x-forwarded-authorization" | "some-security-value"
"b-genie-id" | "Authorization" | "some-auth-value"
}

@Unroll
def "submit job with attachments and upstream security information formed correct request"() {
setup:
def server = new MockWebServer()
server.enqueue(new MockResponse().setHeader("location", genieId))
server.start()
def url = server.url("")
def okHttpClient = new OkHttpClient.Builder().build()
def retrofit = new Retrofit.Builder()
.baseUrl(url)
.client(okHttpClient)
.addConverterFactory(JacksonConverterFactory.create(GenieObjectMapper.getMapper()))
.build()
def jobClient = new JobClient(retrofit, 5)

when:
def jobId = jobClient.submitJobWithAttachments(
Mock(JobRequest), new HashMap<String, InputStream>(), securityName, securityValue)
def request = server.takeRequest()

then: "No exception is thrown and empty list is returned"
noExceptionThrown()

expect:
request.headers.get(securityName) == securityValue
jobId == genieId

cleanup:
server.shutdown()

where:
genieId | securityName | securityValue
"a-genie-id" | "x-forwarded-authorization" | "some-security-value"
"b-genie-id" | "Authorization" | "some-auth-value"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.wiremock.restdocs.WireMockSnippet;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand All @@ -55,12 +58,17 @@
import org.springframework.restdocs.restassured3.RestDocumentationFilter;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -108,6 +116,8 @@ abstract class RestControllerIntegrationTestBase {
static final String JOBS_LINK_KEY = "jobs";
static final Set<String> CLUSTERS_OPTIONAL_HAL_LINK_PARAMETERS = Sets.newHashSet("status");
static final Set<String> COMMANDS_OPTIONAL_HAL_LINK_PARAMETERS = Sets.newHashSet("status");

private static final Logger LOG = LoggerFactory.getLogger(RestControllerIntegrationTestBase.class);
private static final String URI_HOST = "genie.example.com";
private static final String URI_SCHEME = "https";
private static final String LOCAL_TEST_SERVER_PORT_PROPERTY_NAME = "local.server.port";
Expand Down Expand Up @@ -138,6 +148,10 @@ abstract class RestControllerIntegrationTestBase {

protected int port;

@Autowired
private PlatformTransactionManager transactionManager;
private TransactionTemplate transactionTemplate;

private RequestSpecification requestSpecification;

private static String getLinkedResourceExpectedUri(
Expand All @@ -163,15 +177,57 @@ RequestSpecification getRequestSpecification() {
return this.requestSpecification;
}

private void doInTransactionWithoutResult(final Supplier<Void> supplier) {
int retriesLeft = 3;
while (retriesLeft > 0) {
try {
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(final TransactionStatus status) {
supplier.get();
}
});
return;
} catch (DataAccessException e) {
LOG.warn("Exception caught when running transactions.", e);
retriesLeft--;
if (retriesLeft <= 0) {
throw e;
}
}
}
}

@BeforeEach
void beforeBase(final RestDocumentationContextProvider documentationContextProvider) {
this.jobRepository.deleteAll();
this.clusterRepository.deleteAll();
this.commandRepository.deleteAll();
this.applicationRepository.deleteAll();
this.criterionRepository.deleteAll();
this.fileRepository.deleteAll();
this.tagRepository.deleteAll();
this.transactionTemplate = new TransactionTemplate(transactionManager);
doInTransactionWithoutResult(() -> {
this.jobRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.clusterRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.commandRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.applicationRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.criterionRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.fileRepository.deleteAll();
return null;
});
doInTransactionWithoutResult(() -> {
this.tagRepository.deleteAll();
return null;
});

this.requestSpecification = new RequestSpecBuilder()
.addFilter(
Expand Down
Loading

0 comments on commit db4262a

Please sign in to comment.