forked from bartvdbraak/blender
netrender: draft code for cluster usage per job calculations. Eventually, this will be used for load balancing
This commit is contained in:
parent
2a63c4ab7b
commit
ddb46e12f9
@ -91,4 +91,4 @@ class ExcludeSlavesLimit(ExclusionRule):
|
|||||||
self.limit = limit
|
self.limit = limit
|
||||||
|
|
||||||
def test(self, job):
|
def test(self, job):
|
||||||
return not ( self.count_jobs() == 1 or self.count_slaves() == 1 or float(job.countSlaves() + 1) / self.count_slaves() <= self.limit )
|
return not ( self.count_jobs() == 1 or self.count_slaves() <= 1 or float(job.countSlaves() + 1) / self.count_slaves() <= self.limit )
|
||||||
|
@ -29,7 +29,7 @@ class MRenderSlave(netrender.model.RenderSlave):
|
|||||||
self.last_seen = time.time()
|
self.last_seen = time.time()
|
||||||
|
|
||||||
self.job = None
|
self.job = None
|
||||||
self.frame = None
|
self.job_frames = []
|
||||||
|
|
||||||
netrender.model.RenderSlave._slave_map[self.id] = self
|
netrender.model.RenderSlave._slave_map[self.id] = self
|
||||||
|
|
||||||
@ -50,6 +50,7 @@ class MRenderJob(netrender.model.RenderJob):
|
|||||||
self.last_dispatched = time.time()
|
self.last_dispatched = time.time()
|
||||||
|
|
||||||
# special server properties
|
# special server properties
|
||||||
|
self.usage = 0.0
|
||||||
self.last_update = 0
|
self.last_update = 0
|
||||||
self.save_path = ""
|
self.save_path = ""
|
||||||
self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
|
self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
|
||||||
@ -300,6 +301,9 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
|||||||
f.status = DISPATCHED
|
f.status = DISPATCHED
|
||||||
f.slave = slave
|
f.slave = slave
|
||||||
|
|
||||||
|
slave.job = job
|
||||||
|
slave.job_frames = [f.number for f in frames]
|
||||||
|
|
||||||
self.send_head(headers={"job-id": job.id})
|
self.send_head(headers={"job-id": job.id})
|
||||||
|
|
||||||
message = job.serialize(frames)
|
message = job.serialize(frames)
|
||||||
@ -536,7 +540,11 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
|||||||
elif job_result == ERROR:
|
elif job_result == ERROR:
|
||||||
# blacklist slave on this job on error
|
# blacklist slave on this job on error
|
||||||
job.blacklist.append(slave.id)
|
job.blacklist.append(slave.id)
|
||||||
|
|
||||||
|
slave.job_frames.remove(job_frame)
|
||||||
|
if not slave.job_frames:
|
||||||
|
slave.job = None
|
||||||
|
|
||||||
frame.status = job_result
|
frame.status = job_result
|
||||||
frame.time = job_time
|
frame.time = job_time
|
||||||
|
|
||||||
@ -590,6 +598,10 @@ class RenderMasterServer(http.server.HTTPServer):
|
|||||||
self.job_id = 0
|
self.job_id = 0
|
||||||
self.path = path + "master_" + str(os.getpid()) + os.sep
|
self.path = path + "master_" + str(os.getpid()) + os.sep
|
||||||
|
|
||||||
|
self.slave_timeout = 2
|
||||||
|
|
||||||
|
self.first_usage = True
|
||||||
|
|
||||||
self.balancer = netrender.balancing.Balancer()
|
self.balancer = netrender.balancing.Balancer()
|
||||||
self.balancer.addRule(netrender.balancing.RatingCredit())
|
self.balancer.addRule(netrender.balancing.RatingCredit())
|
||||||
self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
|
self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
|
||||||
@ -611,6 +623,10 @@ class RenderMasterServer(http.server.HTTPServer):
|
|||||||
|
|
||||||
return slave.id
|
return slave.id
|
||||||
|
|
||||||
|
def removeSlave(self, slave):
|
||||||
|
self.slaves.remove(slave)
|
||||||
|
self.slaves_map.pop(slave.id)
|
||||||
|
|
||||||
def getSlave(self, slave_id):
|
def getSlave(self, slave_id):
|
||||||
return self.slaves_map.get(slave_id, None)
|
return self.slaves_map.get(slave_id, None)
|
||||||
|
|
||||||
@ -621,9 +637,46 @@ class RenderMasterServer(http.server.HTTPServer):
|
|||||||
|
|
||||||
return slave
|
return slave
|
||||||
|
|
||||||
|
def timeoutSlaves(self):
|
||||||
|
removed = []
|
||||||
|
|
||||||
|
t = time.time()
|
||||||
|
|
||||||
|
for slave in self.slaves:
|
||||||
|
if (t - slave.last_seen) / 60 > self.slave_timeout:
|
||||||
|
removed.append(slave)
|
||||||
|
|
||||||
|
if slave.job:
|
||||||
|
for f in slave.job_frames:
|
||||||
|
slave.job[f].status = ERROR
|
||||||
|
|
||||||
|
for slave in removed:
|
||||||
|
self.removeSlave(slave)
|
||||||
|
|
||||||
|
def updateUsage(self):
|
||||||
|
m = 1.0
|
||||||
|
|
||||||
|
if not self.first_usage:
|
||||||
|
for job in self.jobs:
|
||||||
|
job.usage *= 0.5
|
||||||
|
|
||||||
|
m = 0.5
|
||||||
|
else:
|
||||||
|
self.first_usage = False
|
||||||
|
|
||||||
|
if self.slaves:
|
||||||
|
slave_usage = m / self.countSlaves()
|
||||||
|
|
||||||
|
for slave in self.slaves:
|
||||||
|
if slave.job:
|
||||||
|
slave.job.usage += slave_usage
|
||||||
|
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
self.jobs_map = {}
|
removed = self.jobs[:]
|
||||||
self.jobs = []
|
|
||||||
|
for job in removed:
|
||||||
|
self.removeJob(job)
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
for job in self.jobs:
|
for job in self.jobs:
|
||||||
@ -646,6 +699,11 @@ class RenderMasterServer(http.server.HTTPServer):
|
|||||||
|
|
||||||
if job:
|
if job:
|
||||||
self.jobs.remove(job)
|
self.jobs.remove(job)
|
||||||
|
|
||||||
|
for slave in self.slaves:
|
||||||
|
if slave.job == job:
|
||||||
|
slave.job = None
|
||||||
|
slave.job_frames = []
|
||||||
|
|
||||||
def addJob(self, job):
|
def addJob(self, job):
|
||||||
self.jobs.append(job)
|
self.jobs.append(job)
|
||||||
@ -687,8 +745,12 @@ def runMaster(address, broadcast, path, update_stats, test_break):
|
|||||||
while not test_break():
|
while not test_break():
|
||||||
httpd.handle_request()
|
httpd.handle_request()
|
||||||
|
|
||||||
if broadcast:
|
if time.time() - start_time >= 10: # need constant here
|
||||||
if time.time() - start_time >= 10: # need constant here
|
httpd.timeoutSlaves()
|
||||||
print("broadcasting address")
|
|
||||||
s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
|
httpd.updateUsage()
|
||||||
start_time = time.time()
|
|
||||||
|
if broadcast:
|
||||||
|
print("broadcasting address")
|
||||||
|
s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
|
||||||
|
start_time = time.time()
|
||||||
|
@ -42,23 +42,32 @@ def get(handler):
|
|||||||
output("<h2>Slaves</h2>")
|
output("<h2>Slaves</h2>")
|
||||||
|
|
||||||
startTable()
|
startTable()
|
||||||
headerTable("id", "name", "address", "stats")
|
headerTable("name", "address", "last seen", "stats", "job")
|
||||||
|
|
||||||
for slave in handler.server.slaves:
|
for slave in handler.server.slaves:
|
||||||
rowTable(slave.id, slave.name, slave.address[0], slave.stats)
|
rowTable(slave.name, slave.address[0], time.ctime(slave.last_seen), slave.stats, link(slave.job.name, "/html/job" + slave.job.id) if slave.job else "None")
|
||||||
|
|
||||||
endTable()
|
endTable()
|
||||||
|
|
||||||
output("<h2>Jobs</h2>")
|
output("<h2>Jobs</h2>")
|
||||||
|
|
||||||
startTable()
|
startTable()
|
||||||
headerTable("id", "name", "credits", "time since last", "length", "done", "dispatched", "error", "priority", "exception")
|
headerTable("name", "credits", "usage", "time since last", "length", "done", "dispatched", "error", "priority", "exception")
|
||||||
|
|
||||||
handler.server.update()
|
handler.server.update()
|
||||||
|
|
||||||
for job in handler.server.jobs:
|
for job in handler.server.jobs:
|
||||||
results = job.framesStatus()
|
results = job.framesStatus()
|
||||||
rowTable(link(job.id, "/html/job" + job.id), job.name, round(job.credits, 1), int(time.time() - job.last_dispatched), len(job), results[DONE], results[DISPATCHED], results[ERROR], handler.server.balancer.applyPriorities(job), handler.server.balancer.applyExceptions(job))
|
rowTable( link(job.name, "/html/job" + job.id),
|
||||||
|
round(job.credits, 1),
|
||||||
|
"%0.1f%%" % (job.usage * 100),
|
||||||
|
int(time.time() - job.last_dispatched),
|
||||||
|
len(job),
|
||||||
|
results[DONE],
|
||||||
|
results[DISPATCHED],
|
||||||
|
results[ERROR],
|
||||||
|
handler.server.balancer.applyPriorities(job), handler.server.balancer.applyExceptions(job)
|
||||||
|
)
|
||||||
|
|
||||||
endTable()
|
endTable()
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ class SCENE_PT_network_jobs(RenderButtonsPanel):
|
|||||||
subcol = col.column(align=True)
|
subcol = col.column(align=True)
|
||||||
subcol.itemO("render.netclientstatus", icon="ICON_FILE_REFRESH", text="")
|
subcol.itemO("render.netclientstatus", icon="ICON_FILE_REFRESH", text="")
|
||||||
subcol.itemO("render.netclientcancel", icon="ICON_ZOOMOUT", text="")
|
subcol.itemO("render.netclientcancel", icon="ICON_ZOOMOUT", text="")
|
||||||
subcol.itemO("render.netclientcancelall", icon="ICON_ZOOMOUT", text="")
|
subcol.itemO("render.netclientcancelall", icon="ICON_PANEL_CLOSE", text="")
|
||||||
subcol.itemO("render.netclientdownload", icon='ICON_RENDER_ANIMATION', text="")
|
subcol.itemO("render.netclientdownload", icon='ICON_RENDER_ANIMATION', text="")
|
||||||
|
|
||||||
if len(bpy.data.netrender_jobs) == 0 and len(netsettings.jobs) > 0:
|
if len(bpy.data.netrender_jobs) == 0 and len(netsettings.jobs) > 0:
|
||||||
|
Loading…
Reference in New Issue
Block a user