From b28109b442f5e87edf865c6bcbdb8f53665cdef5 Mon Sep 17 00:00:00 2001 From: Martin Poirier Date: Sat, 19 Sep 2009 22:11:26 +0000 Subject: [PATCH] netrender: first draft for job balancer + some minor fixes --- release/io/netrender/__init__.py | 1 + release/io/netrender/balancing.py | 75 +++++++++++++++++++++++++++++++ release/io/netrender/client.py | 7 ++- release/io/netrender/master.py | 60 +++++++++++-------------- release/io/netrender/model.py | 12 +++++ release/io/netrender/operators.py | 6 +-- release/io/netrender/slave.py | 18 ++++---- release/io/netrender/utils.py | 8 +++- 8 files changed, 135 insertions(+), 52 deletions(-) create mode 100644 release/io/netrender/balancing.py diff --git a/release/io/netrender/__init__.py b/release/io/netrender/__init__.py index 1eb91abb938..b313d64ccbb 100644 --- a/release/io/netrender/__init__.py +++ b/release/io/netrender/__init__.py @@ -6,6 +6,7 @@ import client import slave import master import utils +import balancing import ui # store temp data in bpy module diff --git a/release/io/netrender/balancing.py b/release/io/netrender/balancing.py new file mode 100644 index 00000000000..89e1e3f7b06 --- /dev/null +++ b/release/io/netrender/balancing.py @@ -0,0 +1,75 @@ +import time + +from netrender.utils import * +import netrender.model + +class RatingRule: + def rate(self, job): + return 0 + +class ExclusionRule: + def test(self, job): + return False + +class PriorityRule: + def test(self, job): + return False + +class Balancer: + def __init__(self): + self.rules = [] + self.priorities = [] + self.exceptions = [] + + def addRule(self, rule): + self.rules.append(rule) + + def addPriority(self, priority): + self.priorities.append(priority) + + def addException(self, exception): + self.exceptions.append(exception) + + def applyRules(self, job): + return sum((rule.rate(job) for rule in self.rules)) + + def applyPriorities(self, job): + for priority in self.priorities: + if priority.test(job): + return True # priorities are first + + return False + + def applyExceptions(self, job): + for exception in self.exceptions: + if exception.test(job): + return True # exceptions are last + + return False + + def sortKey(self, job): + return (1 if self.applyExceptions(job) else 0, # exceptions after + 0 if self.applyPriorities(job) else 1, # priorities first + self.applyRules(job)) + + def balance(self, jobs): + if jobs: + jobs.sort(key=self.sortKey) + return jobs[0] + else: + return None + +# ========================== + + +class RatingCredit(RatingRule): + def rate(self, job): + return -job.credits # more credit is better (sort at first in list) + +class NewJobPriority(PriorityRule): + def test(self, job): + return job.countFrames(status = DISPATCHED) == 0 + +class ExcludeQueuedEmptyJob(ExclusionRule): + def test(self, job): + return job.status != JOB_QUEUED or job.countFrames(status = QUEUED) == 0 diff --git a/release/io/netrender/client.py b/release/io/netrender/client.py index a6cfb4e020d..f445fe2f608 100644 --- a/release/io/netrender/client.py +++ b/release/io/netrender/client.py @@ -103,7 +103,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5): job.priority = netsettings.priority # try to send path first - conn.request("POST", "job", repr(job.serialize())) + conn.request("POST", "/job", repr(job.serialize())) response = conn.getresponse() job_id = response.getheader("job-id") @@ -112,7 +112,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5): if response.status == http.client.ACCEPTED: for filepath, start, end in job.files: f = open(filepath, "rb") - conn.request("PUT", "file", f, headers={"job-id": job_id, "job-file": filepath}) + conn.request("PUT", "/file", f, headers={"job-id": job_id, "job-file": filepath}) f.close() response = conn.getresponse() @@ -121,7 +121,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5): return job_id def requestResult(conn, job_id, frame): - conn.request("GET", "render", headers={"job-id": job_id, "job-frame":str(frame)}) + conn.request("GET", "/render", headers={"job-id": job_id, "job-frame":str(frame)}) @rnaType class NetworkRenderEngine(bpy.types.RenderEngine): @@ -174,7 +174,6 @@ class NetworkRenderEngine(bpy.types.RenderEngine): requestResult(conn, job_id, scene.current_frame) while response.status == http.client.ACCEPTED and not self.test_break(): - print("waiting") time.sleep(1) requestResult(conn, job_id, scene.current_frame) response = conn.getresponse() diff --git a/release/io/netrender/master.py b/release/io/netrender/master.py index 58af47d6240..58c6c1b2d00 100644 --- a/release/io/netrender/master.py +++ b/release/io/netrender/master.py @@ -4,10 +4,7 @@ import subprocess, shutil, time, hashlib from netrender.utils import * import netrender.model - -JOB_WAITING = 0 # before all data has been entered -JOB_PAUSED = 1 # paused by user -JOB_QUEUED = 2 # ready to be dispatched +import netrender.balancing class MRenderFile: def __init__(self, filepath, start, end): @@ -38,10 +35,6 @@ class MRenderSlave(netrender.model.RenderSlave): def seen(self): self.last_seen = time.time() -# sorting key for jobs -def groupKey(job): - return (job.status, job.framesLeft() > 0, job.priority, job.credits) - class MRenderJob(netrender.model.RenderJob): def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = []): super().__init__() @@ -95,14 +88,6 @@ class MRenderJob(netrender.model.RenderJob): frame = MRenderFrame(frame_number) self.frames.append(frame) return frame - - def framesLeft(self): - total = 0 - for j in self.frames: - if j.status == QUEUED: - total += 1 - - return total def reset(self, all): for f in self.frames: @@ -153,7 +138,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): def do_HEAD(self): print(self.path) - if self.path == "status": + if self.path == "/status": job_id = self.headers.get('job-id', "") job_frame = int(self.headers.get('job-frame', -1)) @@ -185,12 +170,12 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): def do_GET(self): print(self.path) - if self.path == "version": + if self.path == "/version": self.send_head() self.server.stats("", "New client connection") self.wfile.write(VERSION) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "render": + elif self.path == "/render": job_id = self.headers['job-id'] job_frame = int(self.headers['job-frame']) print("render:", job_id, job_frame) @@ -221,7 +206,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): # no such job id self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "log": + elif self.path == "/log": job_id = self.headers['job-id'] job_frame = int(self.headers['job-frame']) print("log:", job_id, job_frame) @@ -250,7 +235,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): # no such job id self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "status": + elif self.path == "/status": job_id = self.headers.get('job-id', "") job_frame = int(self.headers.get('job-frame', -1)) @@ -284,7 +269,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): self.wfile.write(bytes(repr(message), encoding='utf8')) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "job": + elif self.path == "/job": self.server.update() slave_id = self.headers['slave-id'] @@ -315,7 +300,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: # invalid slave id self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "file": + elif self.path == "/file": slave_id = self.headers['slave-id'] slave = self.server.updateSlave(slave_id) @@ -348,7 +333,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: # invalid slave id self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "slave": + elif self.path == "/slave": message = [] for slave in self.server.slaves: @@ -368,7 +353,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): print(self.path) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - if self.path == "job": + if self.path == "/job": print("posting job info") self.server.stats("", "Receiving job") @@ -394,7 +379,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: self.send_head(http.client.ACCEPTED, headers=headers) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "cancel": + elif self.path == "/cancel": job_id = self.headers.get('job-id', "") if job_id: print("cancel:", job_id, "\n") @@ -404,7 +389,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): self.send_head() # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "reset": + elif self.path == "/reset": job_id = self.headers.get('job-id', "") job_frame = int(self.headers.get('job-frame', "-1")) all = bool(self.headers.get('reset-all', "False")) @@ -421,7 +406,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: # job not found self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "slave": + elif self.path == "/slave": length = int(self.headers['content-length']) job_frame_string = self.headers['job-frame'] @@ -431,7 +416,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): self.send_head(headers = {"slave-id": slave_id}) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "log": + elif self.path == "/log": slave_id = self.headers['slave-id'] slave = self.server.updateSlave(slave_id) @@ -460,7 +445,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): print(self.path) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - if self.path == "file": + if self.path == "/file": print("writing blend file") self.server.stats("", "Receiving job") @@ -504,7 +489,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: # job not found self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "render": + elif self.path == "/render": print("writing result file") self.server.stats("", "Receiving render result") @@ -547,7 +532,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: # invalid slave id self.send_head(http.client.NO_CONTENT) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - elif self.path == "log": + elif self.path == "/log": print("writing log file") self.server.stats("", "Receiving log file") @@ -587,6 +572,11 @@ class RenderMasterServer(http.server.HTTPServer): self.job_id = 0 self.path = path + "master_" + str(os.getpid()) + os.sep + self.balancer = netrender.balancing.Balancer() + self.balancer.addRule(netrender.balancing.RatingCredit()) + self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob()) + self.balancer.addPriority(netrender.balancing.NewJobPriority()) + if not os.path.exists(self.path): os.mkdir(self.path) @@ -616,7 +606,7 @@ class RenderMasterServer(http.server.HTTPServer): self.jobs = [] def update(self): - self.jobs.sort(key = groupKey) + self.balancer.balance(self.jobs) def removeJob(self, id): job = self.jobs_map.pop(id) @@ -644,8 +634,8 @@ class RenderMasterServer(http.server.HTTPServer): def getNewJob(self, slave_id): if self.jobs: - for job in reversed(self.jobs): - if job.status == JOB_QUEUED and job.framesLeft() > 0 and slave_id not in job.blacklist: + for job in self.jobs: + if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist: return job, job.getFrames() return None, None diff --git a/release/io/netrender/model.py b/release/io/netrender/model.py index 924493fd34a..9a6645e79cf 100644 --- a/release/io/netrender/model.py +++ b/release/io/netrender/model.py @@ -95,6 +95,18 @@ class RenderJob: def __len__(self): return len(self.frames) + def countFrames(self, status=QUEUED): + total = 0 + for j in self.frames: + if j.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(set((frame.slave for frame in self.frames if frame.status == DISPATCHED))) + + def framesStatus(self): results = { QUEUED: 0, diff --git a/release/io/netrender/operators.py b/release/io/netrender/operators.py index e6888731437..bfc67c25285 100644 --- a/release/io/netrender/operators.py +++ b/release/io/netrender/operators.py @@ -89,7 +89,7 @@ class RENDER_OT_netclientstatus(bpy.types.Operator): conn = clientConnection(context.scene) if conn: - conn.request("GET", "status") + conn.request("GET", "/status") response = conn.getresponse() print( response.status, response.reason ) @@ -205,7 +205,7 @@ class RENDER_OT_netclientslaves(bpy.types.Operator): conn = clientConnection(context.scene) if conn: - conn.request("GET", "slave") + conn.request("GET", "/slave") response = conn.getresponse() print( response.status, response.reason ) @@ -258,7 +258,7 @@ class RENDER_OT_netclientcancel(bpy.types.Operator): if conn: job = bpy.data.netrender_jobs[netsettings.active_job_index] - conn.request("POST", "cancel", headers={"job-id":job.id}) + conn.request("POST", "/cancel", headers={"job-id":job.id}) response = conn.getresponse() print( response.status, response.reason ) diff --git a/release/io/netrender/slave.py b/release/io/netrender/slave.py index ecdbf69591a..406b987c990 100644 --- a/release/io/netrender/slave.py +++ b/release/io/netrender/slave.py @@ -33,7 +33,7 @@ def slave_Info(): return slave def testCancel(conn, job_id): - conn.request("HEAD", "status", headers={"job-id":job_id}) + conn.request("HEAD", "/status", headers={"job-id":job_id}) response = conn.getresponse() # cancelled if job isn't found anymore @@ -47,7 +47,7 @@ def testFile(conn, job_id, slave_id, JOB_PREFIX, file_path, main_path = None): if not os.path.exists(job_full_path): temp_path = JOB_PREFIX + "slave.temp.blend" - conn.request("GET", "file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path}) + conn.request("GET", "/file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path}) response = conn.getresponse() if response.status != http.client.OK: @@ -76,7 +76,7 @@ def render_slave(engine, scene): conn = clientConnection(scene) if conn: - conn.request("POST", "slave", repr(slave_Info().serialize())) + conn.request("POST", "/slave", repr(slave_Info().serialize())) response = conn.getresponse() slave_id = response.getheader("slave-id") @@ -87,7 +87,7 @@ def render_slave(engine, scene): while not engine.test_break(): - conn.request("GET", "job", headers={"slave-id":slave_id}) + conn.request("GET", "/job", headers={"slave-id":slave_id}) response = conn.getresponse() if response.status == http.client.OK: @@ -119,7 +119,7 @@ def render_slave(engine, scene): # announce log to master logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames]) - conn.request("POST", "log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id}) + conn.request("POST", "/log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id}) response = conn.getresponse() first_frame = job.frames[0].number @@ -146,7 +146,7 @@ def render_slave(engine, scene): if stdout: # (only need to update on one frame, they are linked headers["job-frame"] = str(first_frame) - conn.request("PUT", "log", stdout, headers=headers) + conn.request("PUT", "/log", stdout, headers=headers) response = conn.getresponse() stdout = bytes() @@ -173,7 +173,7 @@ def render_slave(engine, scene): if stdout: # (only need to update on one frame, they are linked headers["job-frame"] = str(first_frame) - conn.request("PUT", "log", stdout, headers=headers) + conn.request("PUT", "/log", stdout, headers=headers) response = conn.getresponse() headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)} @@ -184,7 +184,7 @@ def render_slave(engine, scene): headers["job-frame"] = str(frame.number) # send result back to server f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb') - conn.request("PUT", "render", f, headers=headers) + conn.request("PUT", "/render", f, headers=headers) f.close() response = conn.getresponse() else: @@ -192,7 +192,7 @@ def render_slave(engine, scene): for frame in job.frames: headers["job-frame"] = str(frame.number) # send error result back to server - conn.request("PUT", "render", headers=headers) + conn.request("PUT", "/render", headers=headers) response = conn.getresponse() else: if timeout < MAX_TIMEOUT: diff --git a/release/io/netrender/utils.py b/release/io/netrender/utils.py index 46c2011b188..50ca08d1723 100644 --- a/release/io/netrender/utils.py +++ b/release/io/netrender/utils.py @@ -8,6 +8,12 @@ import netrender.model VERSION = b"0.5" +# Jobs status +JOB_WAITING = 0 # before all data has been entered +JOB_PAUSED = 1 # paused by user +JOB_QUEUED = 2 # ready to be dispatched + +# Frames status QUEUED = 0 DISPATCHED = 1 DONE = 2 @@ -36,7 +42,7 @@ def clientConnection(scene): return None def clientVerifyVersion(conn): - conn.request("GET", "version") + conn.request("GET", "/version") response = conn.getresponse() if response.status != http.client.OK: