netrender: fix some bugs with job cancellation, remove the credits system, add more status reporting on the server, clean up server error management

This commit is contained in:
Martin Poirier 2009-09-26 16:22:52 +00:00
parent da5ff2ca98
commit 903d8231d9
6 changed files with 131 additions and 147 deletions

@ -61,12 +61,6 @@ class Balancer:
# ==========================
class RatingCredit(RatingRule):
    """Rating rule that ranks jobs by their credits scaled by priority."""

    def rate(self, job):
        """Return the negated credits of *job* multiplied by its priority.

        The value is negated so that a job with more credit gets a smaller
        rating and therefore sorts earlier in the list.
        """
        negated_credits = -job.credits
        return negated_credits * job.priority
class RatingUsage(RatingRule):
def rate(self, job):
# less usage is better

@ -35,9 +35,14 @@ class MRenderSlave(netrender.model.RenderSlave):
def seen(self):
    """Stamp this object with the current time as its ``last_seen`` value."""
    now = time.time()
    self.last_seen = now
def finishedFrame(self, frame_number):
    """Drop *frame_number* from ``job_frames``; clear ``job`` once none remain.

    ``list.remove`` raises ``ValueError`` when *frame_number* is not present
    in ``job_frames``; that exception is not handled here.
    """
    pending = self.job_frames
    pending.remove(frame_number)
    if pending:
        # other frames are still outstanding, keep the job assignment
        return
    self.job = None
class MRenderJob(netrender.model.RenderJob):
def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = []):
def __init__(self, job_id, name, files, chunks = 1, priority = 1, blacklist = []):
super().__init__()
self.id = job_id
self.name = name
@ -46,7 +51,6 @@ class MRenderJob(netrender.model.RenderJob):
self.chunks = chunks
self.priority = priority
self.usage = 0.0
self.credits = credits
self.blacklist = blacklist
self.last_dispatched = time.time()
@ -79,14 +83,6 @@ class MRenderJob(netrender.model.RenderJob):
def start(self):
    """Set this job's ``status`` to ``JOB_QUEUED``."""
    queued_state = JOB_QUEUED
    self.status = queued_state
def update(self):
    """Accrue credits for the time elapsed since the previous update.

    Elapsed seconds are divided by 60 before being added to ``credits``.
    When ``last_update`` is still 0, the elapsed time is measured from
    ``last_dispatched`` instead. ``last_update`` is then set to the
    current time.
    """
    reference = self.last_dispatched if self.last_update == 0 else self.last_update
    self.credits += (time.time() - reference) / 60
    self.last_update = time.time()
def addLog(self, frames):
log_name = "_".join(("%04d" % f for f in frames)) + ".log"
@ -110,7 +106,6 @@ class MRenderJob(netrender.model.RenderJob):
frames = []
for f in self.frames:
if f.status == QUEUED:
self.credits -= 1 # cost of one frame
self.last_dispatched = time.time()
frames.append(f)
if len(frames) >= self.chunks:
@ -150,30 +145,24 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
def do_HEAD(self):
print(self.path)
if self.path == "/status":
job_id = self.headers.get('job-id', "")
job_frame = int(self.headers.get('job-frame', -1))
if job_id:
print("status:", job_id, "\n")
job = self.server.getJobID(job_id)
if job:
frame = job[job_frame]
job = self.server.getJobByID(job_id)
if job:
if job_frame != -1:
frame = job[frame]
if not frame:
# no such frame
self.send_heat(http.client.NO_CONTENT)
return
if frame:
self.send_head(http.client.OK)
else:
# no such job id
# no such frame
self.send_head(http.client.NO_CONTENT)
return
self.send_head()
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
@ -182,19 +171,17 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_GET(self):
print(self.path)
if self.path == "/version":
self.send_head()
self.server.stats("", "New client connection")
self.server.stats("", "Version check")
self.wfile.write(VERSION)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/render":
job_id = self.headers['job-id']
job_frame = int(self.headers['job-frame'])
print("render:", job_id, job_frame)
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
frame = job[job_frame]
@ -203,7 +190,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if frame.status in (QUEUED, DISPATCHED):
self.send_head(http.client.ACCEPTED)
elif frame.status == DONE:
self.server.stats("", "Sending result back to client")
self.server.stats("", "Sending result to client")
f = open(job.save_path + "%04d" % job_frame + ".exr", 'rb')
self.send_head()
@ -223,9 +210,8 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
elif self.path == "/log":
job_id = self.headers['job-id']
job_frame = int(self.headers['job-frame'])
print("log:", job_id, job_frame)
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
frame = job[job_frame]
@ -234,7 +220,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
self.send_head(http.client.PROCESSING)
else:
self.server.stats("", "Sending log back to client")
self.server.stats("", "Sending log to client")
f = open(frame.log_path, 'rb')
self.send_head()
@ -254,9 +240,8 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
job_frame = int(self.headers.get('job-frame', -1))
if job_id:
print("status:", job_id, "\n")
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
if job_frame != -1:
frame = job[frame]
@ -279,21 +264,21 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
for job in self.server:
message.append(job.serialize())
self.server.stats("", "Sending status")
self.send_head()
self.wfile.write(bytes(repr(message), encoding='utf8'))
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/job":
self.server.update()
self.server.balance()
slave_id = self.headers['slave-id']
print("slave-id", slave_id)
slave = self.server.updateSlave(slave_id)
slave = self.server.getSeenSlave(slave_id)
if slave: # only if slave id is valid
job, frames = self.server.getNewJob(slave_id)
job, frames = self.server.newDispatch(slave_id)
if job and frames:
for f in frames:
@ -310,9 +295,12 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(bytes(repr(message), encoding='utf8'))
self.server.stats("", "Sending job frame to render node")
self.server.stats("", "Sending job to slave")
else:
# no job available, return error code
slave.job = None
slave.job_frames = []
self.send_head(http.client.ACCEPTED)
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
@ -320,21 +308,19 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
elif self.path == "/file":
slave_id = self.headers['slave-id']
slave = self.server.updateSlave(slave_id)
slave = self.server.getSeenSlave(slave_id)
if slave: # only if slave id is valid
job_id = self.headers['job-id']
job_file = self.headers['job-file']
print("job:", job_id, "\n")
print("file:", job_file, "\n")
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
render_file = job.files_map.get(job_file, None)
if render_file:
self.server.stats("", "Sending file to render node")
self.server.stats("", "Sending file to slave")
f = open(render_file.filepath, 'rb')
self.send_head()
@ -350,9 +336,11 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/slave":
elif self.path == "/slaves":
message = []
self.server.stats("", "Sending slaves status")
for slave in self.server.slaves:
message.append(slave.serialize())
@ -370,12 +358,9 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_POST(self):
print(self.path)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
if self.path == "/job":
print("posting job info")
self.server.stats("", "Receiving job")
length = int(self.headers['content-length'])
@ -383,8 +368,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
job_id = self.server.nextJobID()
print(job_info.files)
job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
for frame in job_info.frames:
@ -395,17 +378,30 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
headers={"job-id": job_id}
if job.testStart():
self.server.stats("", "New job, missing files")
self.send_head(headers=headers)
else:
self.server.stats("", "New job, started")
self.send_head(http.client.ACCEPTED, headers=headers)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/cancel":
job_id = self.headers.get('job-id', "")
if job_id:
print("cancel:", job_id, "\n")
self.server.removeJob(job_id)
else: # cancel all jobs
self.server.clear()
job = self.server.getJobID(job_id)
if job:
self.server.stats("", "Cancelling job")
self.server.removeJob(job)
self.send_head()
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/clear":
# cancel all jobs
self.server.stats("", "Clearing jobs")
self.server.clear()
self.send_head()
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
@ -414,15 +410,25 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
job_frame = int(self.headers.get('job-frame', "-1"))
all = bool(self.headers.get('reset-all', "False"))
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
if job_frame != -1:
job[job_frame].reset(all)
else:
job.reset(all)
self.send_head()
frame = job[job_frame]
if frame:
self.server.stats("", "Reset job frame")
frame.reset(all)
self.send_head()
else:
# no such frame
self.send_head(http.client.NO_CONTENT)
else:
self.server.stats("", "Reset job")
job.reset(all)
self.send_head()
else: # job not found
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
@ -430,6 +436,8 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
length = int(self.headers['content-length'])
job_frame_string = self.headers['job-frame']
self.server.stats("", "New slave connected")
slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
@ -439,16 +447,17 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
elif self.path == "/log":
slave_id = self.headers['slave-id']
slave = self.server.updateSlave(slave_id)
slave = self.server.getSeenSlave(slave_id)
if slave: # only if slave id is valid
length = int(self.headers['content-length'])
log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
job = self.server.getJobByID(log_info.job_id)
job = self.server.getJobID(log_info.job_id)
if job:
self.server.stats("", "Log announcement")
job.addLog(log_info.frames)
self.send_head(http.client.OK)
else:
@ -462,18 +471,16 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_PUT(self):
print(self.path)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
if self.path == "/file":
print("writing blend file")
self.server.stats("", "Receiving job")
length = int(self.headers['content-length'])
job_id = self.headers['job-id']
job_file = self.headers['job-file']
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
@ -501,8 +508,10 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
render_file.filepath = file_path # set the new path
if job.testStart():
self.server.stats("", "File upload, starting job")
self.send_head(http.client.OK)
else:
self.server.stats("", "File upload, file missings")
self.send_head(http.client.ACCEPTED)
else: # invalid file
self.send_head(http.client.NO_CONTENT)
@ -510,17 +519,16 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/render":
print("writing result file")
self.server.stats("", "Receiving render result")
slave_id = self.headers['slave-id']
slave = self.server.updateSlave(slave_id)
slave = self.server.getSeenSlave(slave_id)
if slave: # only if slave id is valid
job_id = self.headers['job-id']
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
job_frame = int(self.headers['job-frame'])
@ -528,43 +536,43 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
job_time = float(self.headers['job-time'])
frame = job[job_frame]
if job_result == DONE:
length = int(self.headers['content-length'])
buf = self.rfile.read(length)
f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
f.write(buf)
f.close()
if frame:
if job_result == DONE:
length = int(self.headers['content-length'])
buf = self.rfile.read(length)
f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
f.write(buf)
f.close()
del buf
elif job_result == ERROR:
# blacklist slave on this job on error
job.blacklist.append(slave.id)
del buf
elif job_result == ERROR:
# blacklist slave on this job on error
job.blacklist.append(slave.id)
slave.job_frames.remove(job_frame)
if not slave.job_frames:
slave.job = None
frame.status = job_result
frame.time = job_time
job.testFinished()
self.server.updateSlave(self.headers['slave-id'])
self.send_head()
self.server.stats("", "Receiving result")
slave.finishedFrame(job_frame)
frame.status = job_result
frame.time = job_time
job.testFinished()
self.send_head()
else: # frame not found
self.send_head(http.client.NO_CONTENT)
else: # job not found
self.send_head(http.client.NO_CONTENT)
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/log":
print("writing log file")
self.server.stats("", "Receiving log file")
job_id = self.headers['job-id']
job = self.server.getJobByID(job_id)
job = self.server.getJobID(job_id)
if job:
job_frame = int(self.headers['job-frame'])
@ -580,7 +588,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
del buf
self.server.updateSlave(self.headers['slave-id'])
self.server.getSeenSlave(self.headers['slave-id'])
self.send_head()
else: # frame not found
@ -600,10 +608,7 @@ class RenderMasterServer(http.server.HTTPServer):
self.slave_timeout = 2
self.first_usage = True
self.balancer = netrender.balancing.Balancer()
#self.balancer.addRule(netrender.balancing.RatingCredit())
self.balancer.addRule(netrender.balancing.RatingUsage())
self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
@ -631,7 +636,7 @@ class RenderMasterServer(http.server.HTTPServer):
def getSlave(self, slave_id):
    """Look up *slave_id* in ``slaves_map``; return ``None`` when it is absent."""
    registry = self.slaves_map
    return registry.get(slave_id, None)
def updateSlave(self, slave_id):
def getSeenSlave(self, slave_id):
slave = self.getSlave(slave_id)
if slave:
slave.seen()
@ -655,18 +660,12 @@ class RenderMasterServer(http.server.HTTPServer):
self.removeSlave(slave)
def updateUsage(self):
m = 1.0
if not self.first_usage:
for job in self.jobs:
job.usage *= 0.5
m = 0.5
else:
self.first_usage = False
blend = 0.5
for job in self.jobs:
job.usage *= (1 - blend)
if self.slaves:
slave_usage = m / self.countSlaves()
slave_usage = blend / self.countSlaves()
for slave in self.slaves:
if slave.job:
@ -679,9 +678,7 @@ class RenderMasterServer(http.server.HTTPServer):
for job in removed:
self.removeJob(job)
def update(self):
    """Call ``update()`` on every job in ``self.jobs``, in list order."""
    for queued_job in self.jobs:
        queued_job.update()
def balance(self):
    """Pass the current job list to ``self.balancer.balance``."""
    scheduler = self.balancer
    scheduler.balance(self.jobs)
def countJobs(self, status = JOB_QUEUED):
@ -695,16 +692,14 @@ class RenderMasterServer(http.server.HTTPServer):
def countSlaves(self):
    """Return the number of entries in ``self.slaves``."""
    registered = self.slaves
    return len(registered)
def removeJob(self, id):
    """Remove the job registered under *id* and release slaves working on it.

    The job is taken out of ``jobs_map`` and ``jobs``; every slave whose
    ``job`` equals it has ``job`` cleared and ``job_frames`` emptied.
    An unknown *id* is ignored instead of raising ``KeyError``.
    """
    # pop() needs an explicit default: without one an unknown id raises
    # KeyError before the guard below is ever reached.
    job = self.jobs_map.pop(id, None)
    # Compare against None rather than relying on truthiness: jobs support
    # len(), so a job without frames would otherwise be skipped here and
    # left behind in self.jobs.
    if job is None:
        return
    self.jobs.remove(job)
    for slave in self.slaves:
        if slave.job == job:
            slave.job = None
            slave.job_frames = []
def removeJob(self, job):
    """Unregister *job* and release every slave currently assigned to it.

    *job* is removed from ``jobs`` and its ``id`` from ``jobs_map``. Each
    slave whose ``job`` equals it gets ``job`` set to ``None`` and
    ``job_frames`` reset to an empty list. A job missing from either
    container makes the underlying ``remove``/``pop`` call raise.
    """
    self.jobs.remove(job)
    self.jobs_map.pop(job.id)
    for worker in self.slaves:
        if not (worker.job == job):
            continue
        worker.job = None
        worker.job_frames = []
def addJob(self, job):
self.jobs.append(job)
@ -717,14 +712,14 @@ class RenderMasterServer(http.server.HTTPServer):
job.save()
def getJobByID(self, id):
def getJobID(self, id):
return self.jobs_map.get(id, None)
def __iter__(self):
    """Yield each job in ``self.jobs``, in list order."""
    for queued_job in self.jobs:
        yield queued_job
def getNewJob(self, slave_id):
def newDispatch(self, slave_id):
if self.jobs:
for job in self.jobs:
if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:

@ -55,7 +55,6 @@ def get(handler):
headerTable(
"name",
"priority",
"credits",
"usage",
"wait",
"length",
@ -66,14 +65,13 @@ def get(handler):
"exception"
)
handler.server.update()
handler.server.balance()
for job in handler.server.jobs:
results = job.framesStatus()
rowTable(
link(job.name, "/html/job" + job.id),
job.priority,
round(job.credits, 1),
"%0.1f%%" % (job.usage * 100),
"%is" % int(time.time() - job.last_dispatched),
len(job),
@ -92,7 +90,7 @@ def get(handler):
output("<html><head><title>NetRender</title></head><body>")
job = handler.server.getJobByID(job_id)
job = handler.server.getJobID(job_id)
if job:
output("<h2>Frames</h2>")
@ -119,7 +117,7 @@ def get(handler):
job_id = match.groups()[0]
frame_number = int(match.groups()[1])
job = handler.server.getJobByID(job_id)
job = handler.server.getJobID(job_id)
if job:
frame = job[frame_number]

@ -80,7 +80,6 @@ class RenderJob:
self.frames = []
self.chunks = 0
self.priority = 0
self.credits = 0
self.usage = 0.0
self.blacklist = []
self.last_dispatched = 0.0
@ -145,7 +144,6 @@ class RenderJob:
"chunks": self.chunks,
"priority": self.priority,
"usage": self.usage,
"credits": self.credits,
"blacklist": self.blacklist,
"last_dispatched": self.last_dispatched
}
@ -163,7 +161,6 @@ class RenderJob:
job.chunks = data["chunks"]
job.priority = data["priority"]
job.usage = data["usage"]
job.credits = data["credits"]
job.blacklist = data["blacklist"]
job.last_dispatched = data["last_dispatched"]

@ -205,7 +205,7 @@ class RENDER_OT_netclientslaves(bpy.types.Operator):
conn = clientConnection(context.scene)
if conn:
conn.request("GET", "/slave")
conn.request("GET", "/slaves")
response = conn.getresponse()
print( response.status, response.reason )
@ -289,7 +289,7 @@ class RENDER_OT_netclientcancelall(bpy.types.Operator):
conn = clientConnection(context.scene)
if conn:
conn.request("POST", "/cancel")
conn.request("POST", "/clear")
response = conn.getresponse()
print( response.status, response.reason )

@ -32,8 +32,8 @@ def slave_Info():
slave.stats = sysname + " " + release + " " + machine + " " + processor
return slave
def testCancel(conn, job_id):
conn.request("HEAD", "/status", headers={"job-id":job_id})
def testCancel(conn, job_id, frame_number):
conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)})
response = conn.getresponse()
# cancelled if job isn't found anymore
@ -152,7 +152,7 @@ def render_slave(engine, scene):
stdout = bytes()
run_t = current_t
if testCancel(conn, job.id):
if testCancel(conn, job.id, first_frame):
cancelled = True
if cancelled: