Fix: Send reset signal to server on job upload error (#260)

* Fix: Send reset signal to server on job upload error
This commit is contained in:
Luis Uguina
2020-06-18 21:57:52 +10:00
committed by GitHub
parent 838cd7c0ec
commit 7685051662
3 changed files with 71 additions and 32 deletions

View File

@@ -48,6 +48,7 @@ import com.sheepit.client.exception.FermeServerDown;
import com.sheepit.client.hardware.cpu.CPU; import com.sheepit.client.hardware.cpu.CPU;
import com.sheepit.client.os.OS; import com.sheepit.client.os.OS;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@Data public class Client { @Data public class Client {
@@ -57,7 +58,7 @@ import lombok.Data;
private Log log; private Log log;
private Job renderingJob; private Job renderingJob;
private Job previousJob; private Job previousJob;
private BlockingQueue<Job> jobsToValidate; private BlockingQueue<QueuedJob> jobsToValidate;
private boolean isValidatingJob; private boolean isValidatingJob;
private long startTime; private long startTime;
@@ -78,7 +79,7 @@ import lombok.Data;
this.gui = gui_; this.gui = gui_;
this.renderingJob = null; this.renderingJob = null;
this.previousJob = null; this.previousJob = null;
this.jobsToValidate = new ArrayBlockingQueue<Job>(1024); this.jobsToValidate = new ArrayBlockingQueue<QueuedJob>(5);
this.isValidatingJob = false; this.isValidatingJob = false;
this.disableErrorSending = false; this.disableErrorSending = false;
@@ -354,7 +355,7 @@ import lombok.Data;
if (this.renderingJob.isSynchronousUpload() == true) { // power or compute_method job, need to upload right away if (this.renderingJob.isSynchronousUpload() == true) { // power or compute_method job, need to upload right away
this.gui.status(String.format("Uploading frame (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0))); this.gui.status(String.format("Uploading frame (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0)));
ret = confirmJob(this.renderingJob); ret = confirmJob(this.renderingJob, step);
if (ret != Error.Type.OK) { if (ret != Error.Type.OK) {
gui.error("Client::run problem with confirmJob (returned " + ret + ")"); gui.error("Client::run problem with confirmJob (returned " + ret + ")");
sendError(step, this.renderingJob, Error.Type.VALIDATION_FAILED); sendError(step, this.renderingJob, Error.Type.VALIDATION_FAILED);
@@ -363,7 +364,7 @@ import lombok.Data;
else { else {
this.gui.status(String.format("Queuing frame for upload (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0))); this.gui.status(String.format("Queuing frame for upload (%.2fMB)", (this.renderingJob.getOutputImageSize() / 1024.0 / 1024.0)));
this.jobsToValidate.add(this.renderingJob); this.jobsToValidate.add(new QueuedJob(step, this.renderingJob));
this.uploadQueueSize++; this.uploadQueueSize++;
this.uploadQueueVolume += this.renderingJob.getOutputImageSize(); this.uploadQueueVolume += this.renderingJob.getOutputImageSize();
@@ -478,27 +479,41 @@ import lombok.Data;
} }
public int senderLoop() { public int senderLoop() {
int step = log.newCheckPoint(); int step = -1;
Error.Type ret; Error.Type ret = null;
while (true) { while (true) {
Job job_to_send = null; QueuedJob queuedJob = null;
try { try {
job_to_send = jobsToValidate.take(); queuedJob = jobsToValidate.take();
this.log.debug("will validate " + job_to_send); step = queuedJob.checkpoint; // retrieve the checkpoint attached to the job
ret = confirmJob(job_to_send); this.log.debug(step, "will validate " + queuedJob.job);
ret = confirmJob(queuedJob.job, step);
if (ret != Error.Type.OK) { if (ret != Error.Type.OK) {
this.gui.error(Error.humanString(ret)); this.gui.error(Error.humanString(ret));
this.log.debug("Client::senderLoop confirm failed, ret: " + ret); this.log.debug(step, "Client::senderLoop confirm failed, ret: " + ret);
sendError(step);
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
this.log.error(step, "Client::senderLoop Exception " + e.getMessage());
} }
finally { finally {
if (ret != Error.Type.OK) {
if (queuedJob.job != null) {
sendError(step, queuedJob.job, ret);
}
else {
sendError(step);
}
}
// Remove the checkpoint information
log.removeCheckPoint(step);
this.uploadQueueSize--; this.uploadQueueSize--;
if (job_to_send != null) { if (queuedJob.job != null) {
this.uploadQueueVolume -= job_to_send.getOutputImageSize(); this.uploadQueueVolume -= queuedJob.job.getOutputImageSize();
} }
this.gui.displayUploadQueueStats(this.uploadQueueSize, this.uploadQueueVolume); this.gui.displayUploadQueueStats(this.uploadQueueSize, this.uploadQueueVolume);
@@ -566,7 +581,7 @@ import lombok.Data;
args += "&extras=" + job_to_reset_.getExtras(); args += "&extras=" + job_to_reset_.getExtras();
} }
} }
this.server.HTTPSendFile(this.server.getPage("error") + args, temp_file.getAbsolutePath()); this.server.HTTPSendFile(this.server.getPage("error") + args, temp_file.getAbsolutePath(), step_);
temp_file.delete(); temp_file.delete();
} }
catch (Exception e) { catch (Exception e) {
@@ -839,11 +854,11 @@ import lombok.Data;
return 0; return 0;
} }
protected Error.Type confirmJob(Job ajob) { protected Error.Type confirmJob(Job ajob, int checkpoint) {
String url_real = String.format("%s&rendertime=%d&memoryused=%s", ajob.getValidationUrl(), ajob.getProcessRender().getDuration(), String url_real = String.format("%s&rendertime=%d&memoryused=%s", ajob.getValidationUrl(), ajob.getProcessRender().getDuration(),
ajob.getProcessRender().getMemoryUsed()); ajob.getProcessRender().getMemoryUsed());
this.log.debug("Client::confirmeJob url " + url_real); this.log.debug(checkpoint, "Client::confirmeJob url " + url_real);
this.log.debug("path frame " + ajob.getOutputImagePath()); this.log.debug(checkpoint, "path frame " + ajob.getOutputImagePath());
this.isValidatingJob = true; this.isValidatingJob = true;
int nb_try = 1; int nb_try = 1;
@@ -852,7 +867,7 @@ import lombok.Data;
Type confirmJobReturnCode = Error.Type.OK; Type confirmJobReturnCode = Error.Type.OK;
retryLoop: retryLoop:
while (nb_try < max_try && ret != ServerCode.OK) { while (nb_try < max_try && ret != ServerCode.OK) {
ret = this.server.HTTPSendFile(url_real, ajob.getOutputImagePath()); ret = this.server.HTTPSendFile(url_real, ajob.getOutputImagePath(), checkpoint);
switch (ret) { switch (ret) {
case OK: case OK:
// no issue, exit the loop // no issue, exit the loop
@@ -876,7 +891,7 @@ import lombok.Data;
nb_try++; nb_try++;
if (ret != ServerCode.OK && nb_try < max_try) { if (ret != ServerCode.OK && nb_try < max_try) {
try { try {
this.log.debug("Sleep for 32s before trying to re-upload the frame"); this.log.debug(checkpoint, "Sleep for 32s before trying to re-upload the frame");
Thread.sleep(32000); Thread.sleep(32000);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
@@ -907,4 +922,14 @@ import lombok.Data;
} }
return (concurrent_job >= this.configuration.getMaxUploadingJob()); return (concurrent_job >= this.configuration.getMaxUploadingJob());
} }
/****************
* Inner class that will hold the queued jobs. The constructor accepts two parameters:
* @int checkpoint - the checkpoint associated with the job (to add any additional log to the render output)
* @Job job - the job to be validated
*/
@AllArgsConstructor class QueuedJob {
final private int checkpoint;
final private Job job;
}
} }

View File

@@ -43,25 +43,39 @@ public class Log {
} }
public void debug(String msg_) { public void debug(String msg_) {
this.append("debug", msg_); this.debug(-1, msg_);
}
public void debug(int point_, String msg_) {
this.append(point_, "debug", msg_);
} }
public void info(String msg_) { public void info(String msg_) {
this.append("info", msg_); this.info(-1, msg_);
}
public void info(int point_, String msg_) {
this.append(point_, "info", msg_);
} }
public void error(String msg_) { public void error(String msg_) {
this.append("error", msg_); this.error(-1, msg_);
} }
private void append(String level_, String msg_) { public void error(int point_, String msg_) {
this.append(point_, "error", msg_);
}
private synchronized void append(int point_, String level_, String msg_) {
String line = null; String line = null;
try { try {
int checkpointToWrite = (point_ > 0 ? point_ : this.lastCheckPoint);
if (msg_.equals("") == false) { if (msg_.equals("") == false) {
line = this.dateFormat.format(new java.util.Date()) + " (" + level_ + ") " + msg_; line = this.dateFormat.format(new java.util.Date()) + " (" + level_ + ") " + msg_;
if (this.checkpoints.containsKey(this.lastCheckPoint) && this.checkpoints.get(this.lastCheckPoint) != null) { if (this.checkpoints.containsKey(checkpointToWrite) && this.checkpoints.get(checkpointToWrite) != null) {
this.checkpoints.get(this.lastCheckPoint).add(line); this.checkpoints.get(checkpointToWrite).add(line);
} }
if (this.printStdOut == true) { if (this.printStdOut == true) {
System.out.println(line); System.out.println(line);

View File

@@ -469,8 +469,8 @@ public class Server extends Thread {
return -2; return -2;
} }
public ServerCode HTTPSendFile(String surl, String file1) { public ServerCode HTTPSendFile(String surl, String file1, int checkpoint) {
this.log.debug("Server::HTTPSendFile(" + surl + "," + file1 + ")"); this.log.debug(checkpoint, "Server::HTTPSendFile(" + surl + "," + file1 + ")");
try { try {
String fileMimeType = Files.probeContentType(Paths.get(file1)); String fileMimeType = Files.probeContentType(Paths.get(file1));
@@ -497,7 +497,7 @@ public class Server extends Thread {
ServerCode serverCode = ServerCode.fromInt(jobValidation.getStatus()); ServerCode serverCode = ServerCode.fromInt(jobValidation.getStatus());
if (serverCode != ServerCode.OK) { if (serverCode != ServerCode.OK) {
this.log.error("Server::HTTPSendFile wrong status (is " + serverCode + ")"); this.log.error(checkpoint, "Server::HTTPSendFile wrong status (is " + serverCode + ")");
return serverCode; return serverCode;
} }
} }
@@ -520,21 +520,21 @@ public class Server extends Thread {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
this.log.error(String.format("Server::HTTPSendFile Error in upload process. Exception %s stacktrace ", e.getMessage(), sw.toString())); this.log.error(checkpoint, String.format("Server::HTTPSendFile Error in upload process. Exception %s stacktrace ", e.getMessage(), sw.toString()));
return ServerCode.UNKNOWN; return ServerCode.UNKNOWN;
} }
catch (OutOfMemoryError e) { catch (OutOfMemoryError e) {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
this.log.error("Server::HTTPSendFile, OutOfMemoryError " + e + " stacktrace " + sw.toString()); this.log.error(checkpoint, "Server::HTTPSendFile, OutOfMemoryError " + e + " stacktrace " + sw.toString());
return ServerCode.JOB_VALIDATION_ERROR_UPLOAD_FAILED; return ServerCode.JOB_VALIDATION_ERROR_UPLOAD_FAILED;
} }
catch (Exception e) { catch (Exception e) {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
this.log.error("Server::HTTPSendFile, Exception " + e + " stacktrace " + sw.toString()); this.log.error(checkpoint, "Server::HTTPSendFile, Exception " + e + " stacktrace " + sw.toString());
return ServerCode.UNKNOWN; return ServerCode.UNKNOWN;
} }
} }