From 768505166280293fe1abea64a4ca4241b00d27ad Mon Sep 17 00:00:00 2001 From: Luis Uguina Date: Thu, 18 Jun 2020 21:57:52 +1000 Subject: [PATCH] Fix: Send reset signal to server on job upload error (#260) * Fix: Send reset signal to server on job upload error --- src/com/sheepit/client/Client.java | 65 +++++++++++++++++++++--------- src/com/sheepit/client/Log.java | 26 +++++++++--- src/com/sheepit/client/Server.java | 12 +++--- 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/src/com/sheepit/client/Client.java b/src/com/sheepit/client/Client.java index 74b29b4..00b18be 100644 --- a/src/com/sheepit/client/Client.java +++ b/src/com/sheepit/client/Client.java @@ -48,6 +48,7 @@ import com.sheepit.client.exception.FermeServerDown; import com.sheepit.client.hardware.cpu.CPU; import com.sheepit.client.os.OS; +import lombok.AllArgsConstructor; import lombok.Data; @Data public class Client { @@ -57,7 +58,7 @@ import lombok.Data; private Log log; private Job renderingJob; private Job previousJob; - private BlockingQueue jobsToValidate; + private BlockingQueue jobsToValidate; private boolean isValidatingJob; private long startTime; @@ -78,7 +79,7 @@ import lombok.Data; this.gui = gui_; this.renderingJob = null; this.previousJob = null; - this.jobsToValidate = new ArrayBlockingQueue(1024); + this.jobsToValidate = new ArrayBlockingQueue(5); this.isValidatingJob = false; this.disableErrorSending = false; @@ -354,7 +355,7 @@ import lombok.Data; if (this.renderingJob.isSynchronousUpload() == true) { // power or compute_method job, need to upload right away this.gui.status(String.format("Uploading frame (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0))); - ret = confirmJob(this.renderingJob); + ret = confirmJob(this.renderingJob, step); if (ret != Error.Type.OK) { gui.error("Client::run problem with confirmJob (returned " + ret + ")"); sendError(step, this.renderingJob, Error.Type.VALIDATION_FAILED); @@ -363,7 +364,7 @@ import lombok.Data; else { 
this.gui.status(String.format("Queuing frame for upload (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0))); - this.jobsToValidate.add(this.renderingJob); + this.jobsToValidate.add(new QueuedJob(step, this.renderingJob)); this.uploadQueueSize++; this.uploadQueueVolume += this.renderingJob.getOutputImageSize(); @@ -478,27 +479,41 @@ import lombok.Data; } public int senderLoop() { - int step = log.newCheckPoint(); - Error.Type ret; + int step = -1; + Error.Type ret = null; while (true) { - Job job_to_send = null; + QueuedJob queuedJob = null; try { - job_to_send = jobsToValidate.take(); - this.log.debug("will validate " + job_to_send); + queuedJob = jobsToValidate.take(); + step = queuedJob.checkpoint; // retrieve the checkpoint attached to the job - ret = confirmJob(job_to_send); + this.log.debug(step, "will validate " + queuedJob.job); + + ret = confirmJob(queuedJob.job, step); if (ret != Error.Type.OK) { this.gui.error(Error.humanString(ret)); - this.log.debug("Client::senderLoop confirm failed, ret: " + ret); - sendError(step); + this.log.debug(step, "Client::senderLoop confirm failed, ret: " + ret); } } catch (InterruptedException e) { + this.log.error(step, "Client::senderLoop Exception " + e.getMessage()); } finally { + if (ret != Error.Type.OK) { + if (queuedJob != null && queuedJob.job != null) { // queuedJob is still null when take() was interrupted + sendError(step, queuedJob.job, ret); + } + else { + sendError(step); + } + } + + // Remove the checkpoint information + log.removeCheckPoint(step); + this.uploadQueueSize--; - if (job_to_send != null) { - this.uploadQueueVolume -= job_to_send.getOutputImageSize(); + if (queuedJob != null && queuedJob.job != null) { // same guard: no job was dequeued on interruption + this.uploadQueueVolume -= queuedJob.job.getOutputImageSize(); } this.gui.displayUploadQueueStats(this.uploadQueueSize, this.uploadQueueVolume); @@ -566,7 +581,7 @@ import lombok.Data; args += "&extras=" + job_to_reset_.getExtras(); } } - this.server.HTTPSendFile(this.server.getPage("error") + args, temp_file.getAbsolutePath()); + 
this.server.HTTPSendFile(this.server.getPage("error") + args, temp_file.getAbsolutePath(), step_); temp_file.delete(); } catch (Exception e) { @@ -839,11 +854,11 @@ import lombok.Data; return 0; } - protected Error.Type confirmJob(Job ajob) { + protected Error.Type confirmJob(Job ajob, int checkpoint) { String url_real = String.format("%s&rendertime=%d&memoryused=%s", ajob.getValidationUrl(), ajob.getProcessRender().getDuration(), ajob.getProcessRender().getMemoryUsed()); - this.log.debug("Client::confirmeJob url " + url_real); - this.log.debug("path frame " + ajob.getOutputImagePath()); + this.log.debug(checkpoint, "Client::confirmeJob url " + url_real); + this.log.debug(checkpoint, "path frame " + ajob.getOutputImagePath()); this.isValidatingJob = true; int nb_try = 1; @@ -852,7 +867,7 @@ import lombok.Data; Type confirmJobReturnCode = Error.Type.OK; retryLoop: while (nb_try < max_try && ret != ServerCode.OK) { - ret = this.server.HTTPSendFile(url_real, ajob.getOutputImagePath()); + ret = this.server.HTTPSendFile(url_real, ajob.getOutputImagePath(), checkpoint); switch (ret) { case OK: // no issue, exit the loop @@ -876,7 +891,7 @@ import lombok.Data; nb_try++; if (ret != ServerCode.OK && nb_try < max_try) { try { - this.log.debug("Sleep for 32s before trying to re-upload the frame"); + this.log.debug(checkpoint, "Sleep for 32s before trying to re-upload the frame"); Thread.sleep(32000); } catch (InterruptedException e) { @@ -907,4 +922,14 @@ import lombok.Data; } return (concurrent_job >= this.configuration.getMaxUploadingJob()); } + + /**************** + * Inner class that will hold the queued jobs. 
The constructor accepts two parameters: + * @int checkpoint - the checkpoint associated with the job (to add any additional log to the render output) + * @Job job - the job to be validated + */ + @AllArgsConstructor class QueuedJob { + final private int checkpoint; + final private Job job; + } } diff --git a/src/com/sheepit/client/Log.java b/src/com/sheepit/client/Log.java index 50f2a9a..6ef8d58 100644 --- a/src/com/sheepit/client/Log.java +++ b/src/com/sheepit/client/Log.java @@ -43,25 +43,39 @@ public class Log { } public void debug(String msg_) { - this.append("debug", msg_); + this.debug(-1, msg_); + } + + public void debug(int point_, String msg_) { + this.append(point_, "debug", msg_); } public void info(String msg_) { - this.append("info", msg_); + this.info(-1, msg_); + } + + public void info(int point_, String msg_) { + this.append(point_, "info", msg_); } public void error(String msg_) { - this.append("error", msg_); + this.error(-1, msg_); + } + + public void error(int point_, String msg_) { + this.append(point_, "error", msg_); } - private void append(String level_, String msg_) { + private synchronized void append(int point_, String level_, String msg_) { String line = null; try { + int checkpointToWrite = (point_ > 0 ? 
point_ : this.lastCheckPoint); + if (msg_.equals("") == false) { line = this.dateFormat.format(new java.util.Date()) + " (" + level_ + ") " + msg_; - if (this.checkpoints.containsKey(this.lastCheckPoint) && this.checkpoints.get(this.lastCheckPoint) != null) { - this.checkpoints.get(this.lastCheckPoint).add(line); + if (this.checkpoints.containsKey(checkpointToWrite) && this.checkpoints.get(checkpointToWrite) != null) { + this.checkpoints.get(checkpointToWrite).add(line); } if (this.printStdOut == true) { System.out.println(line); diff --git a/src/com/sheepit/client/Server.java b/src/com/sheepit/client/Server.java index bffc810..b5bbd1f 100644 --- a/src/com/sheepit/client/Server.java +++ b/src/com/sheepit/client/Server.java @@ -469,8 +469,8 @@ public class Server extends Thread { return -2; } - public ServerCode HTTPSendFile(String surl, String file1) { - this.log.debug("Server::HTTPSendFile(" + surl + "," + file1 + ")"); + public ServerCode HTTPSendFile(String surl, String file1, int checkpoint) { + this.log.debug(checkpoint, "Server::HTTPSendFile(" + surl + "," + file1 + ")"); try { String fileMimeType = Files.probeContentType(Paths.get(file1)); @@ -497,7 +497,7 @@ public class Server extends Thread { ServerCode serverCode = ServerCode.fromInt(jobValidation.getStatus()); if (serverCode != ServerCode.OK) { - this.log.error("Server::HTTPSendFile wrong status (is " + serverCode + ")"); + this.log.error(checkpoint, "Server::HTTPSendFile wrong status (is " + serverCode + ")"); return serverCode; } } @@ -520,21 +520,21 @@ public class Server extends Thread { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); - this.log.error(String.format("Server::HTTPSendFile Error in upload process. Exception %s stacktrace ", e.getMessage(), sw.toString())); + this.log.error(checkpoint, String.format("Server::HTTPSendFile Error in upload process. 
Exception %s stacktrace %s", e.getMessage(), sw.toString())); // second %s added: without it the stack trace argument was silently dropped
 return ServerCode.UNKNOWN; } catch (OutOfMemoryError e) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); - this.log.error("Server::HTTPSendFile, OutOfMemoryError " + e + " stacktrace " + sw.toString()); + this.log.error(checkpoint, "Server::HTTPSendFile, OutOfMemoryError " + e + " stacktrace " + sw.toString()); return ServerCode.JOB_VALIDATION_ERROR_UPLOAD_FAILED; } catch (Exception e) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); - this.log.error("Server::HTTPSendFile, Exception " + e + " stacktrace " + sw.toString()); + this.log.error(checkpoint, "Server::HTTPSendFile, Exception " + e + " stacktrace " + sw.toString()); return ServerCode.UNKNOWN; } }