Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compile warnings in mantis server worker #423

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ protected void setPayload(final long intervalSecs) {
if (dropped != null)
totalDropped += dropped.value();
else
logger.warn("Unexpected to get null dropped counter for metric " + m.getMetricGroupId().id());
logger.warn("Unexpected to get null dropped counter for metric {}", m.getMetricGroupId().id());
if (onNext != null)
totalOnNext += onNext.value();
else
logger.warn("Unexpected to get null onNext counter for metric " + m.getMetricGroupId().id());
logger.warn("Unexpected to get null onNext counter for metric {}", m.getMetricGroupId().id());
}
final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped);
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for dataDrop payload: " + e.getMessage());
logger.warn("Error writing json for dataDrop payload: {}", e.getMessage());
}
dropCountGauge.set(dataDrop.getDroppedCount());
onNextCountGauge.set(dataDrop.getOnNextCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
public class DownloadJob {

private static final Logger logger = LoggerFactory.getLogger(DownloadJob.class);
private URL jobArtifactUrl;
private String jobName;
private String locationToStore;
private final URL jobArtifactUrl;
private final String jobName;
private final String locationToStore;

public DownloadJob(
URL jobArtifactUrl, String jobName,
Expand All @@ -49,9 +49,9 @@ public static void main(String[] args) throws MalformedURLException {
System.exit(1);
}

logger.info("parameters, jobArtifactUrl: " + args[0]);
logger.info("parameters, jobName: " + args[1]);
logger.info("parameters, locationToStore: " + args[2]);
logger.info("parameters, jobArtifactUrl: {}", args[0]);
logger.info("parameters, jobName: {}", args[1]);
logger.info("parameters, locationToStore: {}", args[2]);

new DownloadJob(new URL(args[0]), args[1], args[2]).execute();
}
Expand All @@ -63,21 +63,21 @@ public void execute() {
Path path = Paths.get(locationToStore, jobName,
"lib");

logger.info("Started writing job to tmp directory: " + path);
logger.info("Started writing job to tmp directory: {}", path);
// download file to /tmp, then add file location
try (InputStream is = jobArtifactUrl.openStream()) {
Files.createDirectories(path);
try (OutputStream os = Files.newOutputStream(Paths.get(path.toString(), jarName))) {
byte[] bytes = new byte[2048];
int read = 0;
int read;
while ((read = is.read(bytes)) >= 0) {
os.write(bytes, 0, read);
}
}
} catch (IOException e1) {
logger.error("Failed to write job to local store at path: " + path, e1);
logger.error("Failed to write job to local store at path: {}", path, e1);
throw new RuntimeException(e1);
}
logger.info("Finished writing job to tmp directory: " + path);
logger.info("Finished writing job to tmp directory: {}", path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,10 @@ public ExecuteStageRequestService(
public void start() {
subscription = executeStageRequestObservable
// map to request with status observer
.map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() {
@Override
public TrackedExecuteStageRequest call(
WrappedExecuteStageRequest executeRequest) {
PublishSubject<Status> statusSubject = PublishSubject.create();
tasksStatusObserver.onNext(statusSubject);
return new TrackedExecuteStageRequest(executeRequest, statusSubject);
}
.map(executeRequest -> {
PublishSubject<Status> statusSubject = PublishSubject.create();
tasksStatusObserver.onNext(statusSubject);
return new TrackedExecuteStageRequest(executeRequest, statusSubject);
})
// get provider from jar, return tracked MantisJob
.flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() {
Expand All @@ -98,7 +94,7 @@ public Observable<ExecutionDetails> call(TrackedExecuteStageRequest executeReque

cl = userCodeClassLoader.asClassLoader();
if (jobProviderClass.isPresent()) {
logger.info("loading job main class " + jobProviderClass.get());
logger.info("loading job main class {}", jobProviderClass.get());
final MantisJobProvider jobProvider = InstantiationUtil.instantiate(
jobProviderClass.get(), MantisJobProvider.class, cl);
mantisJob = jobProvider.getJobInstance();
Expand Down Expand Up @@ -143,7 +139,7 @@ public void onError(Throwable e) {

@Override
public void onNext(final ExecutionDetails executionDetails) {
logger.info("Executing stage for job ID: " + executionDetails.getExecuteStageRequest().getRequest().getJobId());
logger.info("Executing stage for job ID: {}", executionDetails.getExecuteStageRequest().getRequest().getJobId());
Thread t = new Thread("mantis-worker-thread-" + executionDetails.getExecuteStageRequest().getRequest().getJobId()) {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
@SuppressWarnings("rawtypes") // suppressed due to unknown mantis job typ
public class ExecutionDetails {

private ClassLoader classLoader;
private WrappedExecuteStageRequest executeStageRequest;
private Observer<Status> status;
private Job mantisJob;
private List<Parameter> parameters;
private final ClassLoader classLoader;
private final WrappedExecuteStageRequest executeStageRequest;
private final Observer<Status> status;
private final Job mantisJob;
private final List<Parameter> parameters;

public ExecutionDetails(WrappedExecuteStageRequest executeStageRequest, Observer<Status> status,
Job mantisJob, ClassLoader classLoader, List<Parameter> parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Heartbeat {
}

void setPayload(String name, String value) {
logger.info("Setting payload " + name);
logger.info("Setting payload {}", name);
if (name != null && !name.isEmpty() && value != null)
payloads.put(name, value);
}
Expand All @@ -73,9 +73,9 @@ void addSingleUsePayload(String name, String value) {

Status getCurrentHeartbeatStatus() {
List<Status.Payload> payloadList = new ArrayList<>();
logger.debug("#Payloads = " + payloads.size());
logger.debug("#Payloads = {}", payloads.size());
for (Map.Entry<String, String> entry : payloads.entrySet()) {
logger.debug("Adding payload " + entry.getKey() + " with value " + entry.getValue());
logger.debug("Adding payload {} with value {}", entry.getKey(), entry.getValue());
payloadList.add(new Status.Payload(entry.getKey(), entry.getValue()));
}
List<PayloadPair> singleUsePlds = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public class MantisWorker extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class);
@Argument(alias = "p", description = "Specify a configuration file", required = false)
private static String propFile = "worker.properties";
private CountDownLatch blockUntilShutdown = new CountDownLatch(1);
private final CountDownLatch blockUntilShutdown = new CountDownLatch(1);
private final List<Service> mantisServices = new LinkedList<>();

// static {
// RxNetty.useNativeTransportIfApplicable();
// }
private List<Service> mantisServices = new LinkedList<Service>();

public MantisWorker(ConfigurationFactory configFactory, io.mantisrx.server.master.client.config.ConfigurationFactory coreConfigFactory) {
this(configFactory, Optional.empty());
Expand Down Expand Up @@ -102,12 +102,7 @@ public String toString() {
final MantisMasterGateway gateway =
highAvailabilityServices.getMasterClientApi();
// shutdown hook
Thread t = new Thread() {
@Override
public void run() {
shutdown();
}
};
Thread t = new Thread(this::shutdown);
t.setDaemon(true);
Runtime.getRuntime().addShutdownHook(t);

Expand Down Expand Up @@ -209,7 +204,7 @@ private static Properties loadProperties(String propFile) {
*
* @param resourceName the name of the resource. It can either be a file name, or a path.
* @return An {@link java.io.InputStream} instance that represents the found resource. Null otherwise.
* @throws java.io.FileNotFoundException
* @throws java.io.FileNotFoundException if the resource is not found.
*/
private static InputStream findResourceAsStream(String resourceName) throws FileNotFoundException {
File resource = new File(resourceName);
Expand Down Expand Up @@ -242,7 +237,7 @@ public static void main(String[] args) {
worker.start();
} catch (Exception e) {
// unexpected to get runtime exception, will exit
logger.error("Unexpected error: " + e.getMessage(), e);
logger.error("Unexpected error: {}", e.getMessage(), e);
System.exit(2);
}
}
Expand All @@ -264,14 +259,14 @@ public void startUp() {
logger.info("Starting Mantis Worker");
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
for (Service service : mantisServices) {
logger.info("Starting service: " + service);
logger.info("Starting service: {}", service);
try {
service.start();
} catch (Throwable e) {
logger.error(String.format("Failed to start service %s: %s", service, e.getMessage()), e);
logger.error("Failed to start service {}: {}", service, e.getMessage(), e);
throw e;
}
logger.info("Started service: " + service);
logger.info("Started service: {}", service);
}

logger.info("Started Mantis Worker successfully");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void setPayloadAndMetrics() {
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.ResourceUsage, objectMapper.writeValueAsString(usage));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for resourceUsage payload: " + e.getMessage());
logger.warn("Error writing json for resourceUsage payload: {}", e.getMessage());
}
cpuLimitGauge.set(Math.round(usage.getCpuLimit() * 100.0));
cpuUsageCurrGauge.set(Math.round(usage.getCpuUsageCurrent() * 100.0));
Expand All @@ -168,7 +168,7 @@ private void setPayloadAndMetrics() {
} catch (Exception e) {
logger.error("Failed to compute resource usage", e);
} finally {
logger.debug("scheduling next metrics report with delay=" + delay);
logger.debug("scheduling next metrics report with delay = {}", delay);
executor.schedule(this::setPayloadAndMetrics, delay, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public class RunningWorker {

private static final Logger logger = LoggerFactory.getLogger(RunningWorker.class);
private final int totalStagesNet;
private Action0 onTerminateCallback;
private Action0 onCompleteCallback;
private Action1<Throwable> onErrorCallback;
private CountDownLatch blockUntilTerminate = new CountDownLatch(1);
private Job job;
private SchedulingInfo schedulingInfo;
private StageConfig stage;
private Observer<Status> jobStatus;
private String jobId;
private final Action0 onTerminateCallback;
private final Action0 onCompleteCallback;
private final Action1<Throwable> onErrorCallback;
private final CountDownLatch blockUntilTerminate = new CountDownLatch(1);
private final Job job;
private final SchedulingInfo schedulingInfo;
private final StageConfig stage;
private final Observer<Status> jobStatus;
private final String jobId;
private final int stageNum;
private final int workerNum;
private final int workerIndex;
Expand Down Expand Up @@ -88,46 +88,29 @@ public RunningWorker(Builder builder) {
this.stageTotalWorkersObservable = builder.stageTotalWorkersObservable;
this.jobSchedulingInfoObservable = builder.jobSchedulingInfoObservable;

this.onTerminateCallback = new Action0() {
@Override
public void call() {
blockUntilTerminate.countDown();
}
};
this.onCompleteCallback = new Action0() {
@Override
public void call() {
logger.info("JobId: " + jobId + " stage: " + stageNum + ", completed");
// setup a timeout to call forced exit as sure way to exit
new Thread() {
@Override
public void run() {
try {
sleep(3000);
System.exit(1);
} catch (Exception e) {
logger.error("Ignoring exception during exit: " + e.getMessage(), e);
}
}
}.start();
signalCompleted();
}
};
this.onErrorCallback = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
signalFailed(t);
}
this.onTerminateCallback = blockUntilTerminate::countDown;
this.onCompleteCallback = () -> {
logger.info("JobId: {} stage: {}, completed", jobId, stageNum);
// setup a timeout to call forced exit as sure way to exit
new Thread(() -> {
try {
Thread.sleep(3000);
System.exit(1);
} catch (Exception e) {
logger.error("Ignoring exception during exit: {}", e.getMessage(), e);
}
}).start();
signalCompleted();
};
this.onErrorCallback = this::signalFailed;
}

private String getWorkerStringPrefix(int stageNum, int index, int number) {
return "stage " + stageNum + " worker index=" + index + " number=" + number;
}

public void signalStartedInitiated() {
logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + ","
+ " signaling started initiated");
logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling started initiated", jobId, stageNum, workerIndex, workerNum);
vmTaskStatusObserver.onNext(new VirtualMachineTaskStatus(
new WorkerId(jobId, workerIndex, workerNum).getId(),
VirtualMachineTaskStatus.TYPE.STARTED, jobName + ", " +
Expand All @@ -141,16 +124,14 @@ public void signalStartedInitiated() {
}

public void signalStarted() {
logger.info("JobId: " + jobId + ", " + getWorkerStringPrefix(stageNum, workerIndex, workerNum)
+ " signaling started");
logger.info("JobId: {}, {} signaling started", jobId, getWorkerStringPrefix(stageNum, workerIndex, workerNum));
jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum,
TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " running",
MantisJobState.Started));
}

public void signalCompleted() {
logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + ","
+ " signaling completed");
logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling completed", jobId, stageNum, workerIndex, workerNum);
jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum,
TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " completed",
MantisJobState.Completed));
Expand All @@ -164,8 +145,7 @@ TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " completed
}

public void signalFailed(Throwable t) {
logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + ","
+ " signaling failed");
logger.info("JobId: {}, stage: {} workerIndex: {} workerNumber: {}, signaling failed", jobId, stageNum, workerIndex, workerNum);
logger.error("Worker failure detected, shutting down job", t);
jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum,
TYPE.INFO, getWorkerStringPrefix(stageNum, workerIndex, workerNum) + " failed. error: " + t.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

public class TrackedExecuteStageRequest {

private WrappedExecuteStageRequest executeRequest;
private Observer<Status> status;
private final WrappedExecuteStageRequest executeRequest;
private final Observer<Status> status;

public TrackedExecuteStageRequest(WrappedExecuteStageRequest executeRequest,
Observer<Status> status) {
Expand Down
Loading