forked from bartvdbraak/blender
netrender: first draft for job balancer + some minor fixes
This commit is contained in:
parent
8b4ad3584c
commit
b28109b442
@ -6,6 +6,7 @@ import client
|
||||
import slave
|
||||
import master
|
||||
import utils
|
||||
import balancing
|
||||
import ui
|
||||
|
||||
# store temp data in bpy module
|
||||
|
75
release/io/netrender/balancing.py
Normal file
75
release/io/netrender/balancing.py
Normal file
@ -0,0 +1,75 @@
|
||||
import time
|
||||
|
||||
from netrender.utils import *
|
||||
import netrender.model
|
||||
|
||||
class RatingRule:
    """Base class for rules that contribute a numeric rating to a job.

    Balancer.applyRules sums rate(job) over every registered rule and the
    result is the last component of the ascending sort key, so a lower
    rating places a job earlier in the list.
    """

    def rate(self, job):
        """Return the rating of *job*; the base rule is neutral and returns 0."""
        return 0
|
||||
|
||||
class ExclusionRule:
    """Base class for rules that flag a job as an exception.

    Balancer.sortKey places jobs matched by any exception rule after all
    other jobs.
    """

    def test(self, job):
        """Return True when *job* matches this rule; the base rule never matches."""
        return False
|
||||
|
||||
class PriorityRule:
    """Base class for rules that flag a job as a priority.

    Balancer.sortKey places jobs matched by any priority rule before
    non-priority jobs (within the same exception group).
    """

    def test(self, job):
        """Return True when *job* matches this rule; the base rule never matches."""
        return False
|
||||
|
||||
class Balancer:
    """Order render jobs using pluggable rating, priority and exception rules.

    Jobs are sorted ascending by the tuple built in sortKey: jobs matched
    by an exception rule go last, jobs matched by a priority rule go first,
    and ties are broken by the sum of all rating rules (lower is earlier).
    """

    def __init__(self):
        self.rules = []       # rating rules: objects exposing rate(job)
        self.priorities = []  # priority rules: objects exposing test(job)
        self.exceptions = []  # exception rules: objects exposing test(job)

    def addRule(self, rule):
        """Register a rating rule (an object exposing rate(job))."""
        self.rules.append(rule)

    def addPriority(self, priority):
        """Register a priority rule (an object exposing test(job))."""
        self.priorities.append(priority)

    def addException(self, exception):
        """Register an exception rule (an object exposing test(job))."""
        self.exceptions.append(exception)

    def applyRules(self, job):
        """Return the sum of every rating rule's rate(job); 0 with no rules."""
        return sum(rule.rate(job) for rule in self.rules)

    def applyPriorities(self, job):
        """Return True if at least one priority rule matches *job*."""
        # any() stops at the first matching rule, like an explicit loop would.
        return any(priority.test(job) for priority in self.priorities)

    def applyExceptions(self, job):
        """Return True if at least one exception rule matches *job*."""
        return any(exception.test(job) for exception in self.exceptions)

    def sortKey(self, job):
        """Return the ascending sort key of *job* as a 3-tuple.

        The components are compared left to right:
        1. 1 for jobs matched by an exception rule, else 0 (exceptions last);
        2. 0 for jobs matched by a priority rule, else 1 (priorities first);
        3. the summed rating of the job.
        """
        return (
            1 if self.applyExceptions(job) else 0,
            0 if self.applyPriorities(job) else 1,
            self.applyRules(job),
        )

    def balance(self, jobs):
        """Sort *jobs* in place by sortKey and return the first job.

        Returns None when *jobs* is empty.
        """
        if not jobs:
            return None

        jobs.sort(key=self.sortKey)
        return jobs[0]
|
||||
|
||||
# ==========================
|
||||
|
||||
|
||||
class RatingCredit(RatingRule):
    """Rating rule that favours jobs holding more credits."""

    def rate(self, job):
        """Return the negated credits of *job*.

        Ratings are sorted ascending, so negating the value makes a job
        with more credits sort earlier in the list.
        """
        return -job.credits
|
||||
|
||||
class NewJobPriority(PriorityRule):
    """Priority rule matching jobs that currently have no dispatched frame."""

    def test(self, job):
        """Return True when *job* has zero frames in DISPATCHED status."""
        return job.countFrames(status=DISPATCHED) == 0
|
||||
|
||||
class ExcludeQueuedEmptyJob(ExclusionRule):
    """Exclusion rule matching jobs that have nothing left to dispatch."""

    def test(self, job):
        """Return True when *job* should be treated as an exception.

        A job matches when its status is not JOB_QUEUED, or when it has no
        frame left in QUEUED status.
        """
        return job.status != JOB_QUEUED or job.countFrames(status=QUEUED) == 0
|
@ -103,7 +103,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5):
|
||||
job.priority = netsettings.priority
|
||||
|
||||
# try to send path first
|
||||
conn.request("POST", "job", repr(job.serialize()))
|
||||
conn.request("POST", "/job", repr(job.serialize()))
|
||||
response = conn.getresponse()
|
||||
|
||||
job_id = response.getheader("job-id")
|
||||
@ -112,7 +112,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5):
|
||||
if response.status == http.client.ACCEPTED:
|
||||
for filepath, start, end in job.files:
|
||||
f = open(filepath, "rb")
|
||||
conn.request("PUT", "file", f, headers={"job-id": job_id, "job-file": filepath})
|
||||
conn.request("PUT", "/file", f, headers={"job-id": job_id, "job-file": filepath})
|
||||
f.close()
|
||||
response = conn.getresponse()
|
||||
|
||||
@ -121,7 +121,7 @@ def clientSendJob(conn, scene, anim = False, chunks = 5):
|
||||
return job_id
|
||||
|
||||
def requestResult(conn, job_id, frame):
|
||||
conn.request("GET", "render", headers={"job-id": job_id, "job-frame":str(frame)})
|
||||
conn.request("GET", "/render", headers={"job-id": job_id, "job-frame":str(frame)})
|
||||
|
||||
@rnaType
|
||||
class NetworkRenderEngine(bpy.types.RenderEngine):
|
||||
@ -174,7 +174,6 @@ class NetworkRenderEngine(bpy.types.RenderEngine):
|
||||
requestResult(conn, job_id, scene.current_frame)
|
||||
|
||||
while response.status == http.client.ACCEPTED and not self.test_break():
|
||||
print("waiting")
|
||||
time.sleep(1)
|
||||
requestResult(conn, job_id, scene.current_frame)
|
||||
response = conn.getresponse()
|
||||
|
@ -4,10 +4,7 @@ import subprocess, shutil, time, hashlib
|
||||
|
||||
from netrender.utils import *
|
||||
import netrender.model
|
||||
|
||||
JOB_WAITING = 0 # before all data has been entered
|
||||
JOB_PAUSED = 1 # paused by user
|
||||
JOB_QUEUED = 2 # ready to be dispatched
|
||||
import netrender.balancing
|
||||
|
||||
class MRenderFile:
|
||||
def __init__(self, filepath, start, end):
|
||||
@ -38,10 +35,6 @@ class MRenderSlave(netrender.model.RenderSlave):
|
||||
def seen(self):
|
||||
self.last_seen = time.time()
|
||||
|
||||
# sorting key for jobs
|
||||
def groupKey(job):
|
||||
return (job.status, job.framesLeft() > 0, job.priority, job.credits)
|
||||
|
||||
class MRenderJob(netrender.model.RenderJob):
|
||||
def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = []):
|
||||
super().__init__()
|
||||
@ -95,14 +88,6 @@ class MRenderJob(netrender.model.RenderJob):
|
||||
frame = MRenderFrame(frame_number)
|
||||
self.frames.append(frame)
|
||||
return frame
|
||||
|
||||
def framesLeft(self):
|
||||
total = 0
|
||||
for j in self.frames:
|
||||
if j.status == QUEUED:
|
||||
total += 1
|
||||
|
||||
return total
|
||||
|
||||
def reset(self, all):
|
||||
for f in self.frames:
|
||||
@ -153,7 +138,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_HEAD(self):
|
||||
print(self.path)
|
||||
|
||||
if self.path == "status":
|
||||
if self.path == "/status":
|
||||
job_id = self.headers.get('job-id', "")
|
||||
job_frame = int(self.headers.get('job-frame', -1))
|
||||
|
||||
@ -185,12 +170,12 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
print(self.path)
|
||||
|
||||
if self.path == "version":
|
||||
if self.path == "/version":
|
||||
self.send_head()
|
||||
self.server.stats("", "New client connection")
|
||||
self.wfile.write(VERSION)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "render":
|
||||
elif self.path == "/render":
|
||||
job_id = self.headers['job-id']
|
||||
job_frame = int(self.headers['job-frame'])
|
||||
print("render:", job_id, job_frame)
|
||||
@ -221,7 +206,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
# no such job id
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "log":
|
||||
elif self.path == "/log":
|
||||
job_id = self.headers['job-id']
|
||||
job_frame = int(self.headers['job-frame'])
|
||||
print("log:", job_id, job_frame)
|
||||
@ -250,7 +235,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
# no such job id
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "status":
|
||||
elif self.path == "/status":
|
||||
job_id = self.headers.get('job-id', "")
|
||||
job_frame = int(self.headers.get('job-frame', -1))
|
||||
|
||||
@ -284,7 +269,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
self.wfile.write(bytes(repr(message), encoding='utf8'))
|
||||
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "job":
|
||||
elif self.path == "/job":
|
||||
self.server.update()
|
||||
|
||||
slave_id = self.headers['slave-id']
|
||||
@ -315,7 +300,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else: # invalid slave id
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "file":
|
||||
elif self.path == "/file":
|
||||
slave_id = self.headers['slave-id']
|
||||
|
||||
slave = self.server.updateSlave(slave_id)
|
||||
@ -348,7 +333,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else: # invalid slave id
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "slave":
|
||||
elif self.path == "/slave":
|
||||
message = []
|
||||
|
||||
for slave in self.server.slaves:
|
||||
@ -368,7 +353,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
print(self.path)
|
||||
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
if self.path == "job":
|
||||
if self.path == "/job":
|
||||
print("posting job info")
|
||||
self.server.stats("", "Receiving job")
|
||||
|
||||
@ -394,7 +379,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else:
|
||||
self.send_head(http.client.ACCEPTED, headers=headers)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "cancel":
|
||||
elif self.path == "/cancel":
|
||||
job_id = self.headers.get('job-id', "")
|
||||
if job_id:
|
||||
print("cancel:", job_id, "\n")
|
||||
@ -404,7 +389,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
self.send_head()
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "reset":
|
||||
elif self.path == "/reset":
|
||||
job_id = self.headers.get('job-id', "")
|
||||
job_frame = int(self.headers.get('job-frame', "-1"))
|
||||
all = bool(self.headers.get('reset-all', "False"))
|
||||
@ -421,7 +406,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else: # job not found
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "slave":
|
||||
elif self.path == "/slave":
|
||||
length = int(self.headers['content-length'])
|
||||
job_frame_string = self.headers['job-frame']
|
||||
|
||||
@ -431,7 +416,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
self.send_head(headers = {"slave-id": slave_id})
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "log":
|
||||
elif self.path == "/log":
|
||||
slave_id = self.headers['slave-id']
|
||||
|
||||
slave = self.server.updateSlave(slave_id)
|
||||
@ -460,7 +445,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
print(self.path)
|
||||
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
if self.path == "file":
|
||||
if self.path == "/file":
|
||||
print("writing blend file")
|
||||
self.server.stats("", "Receiving job")
|
||||
|
||||
@ -504,7 +489,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else: # job not found
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "render":
|
||||
elif self.path == "/render":
|
||||
print("writing result file")
|
||||
self.server.stats("", "Receiving render result")
|
||||
|
||||
@ -547,7 +532,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
else: # invalid slave id
|
||||
self.send_head(http.client.NO_CONTENT)
|
||||
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
||||
elif self.path == "log":
|
||||
elif self.path == "/log":
|
||||
print("writing log file")
|
||||
self.server.stats("", "Receiving log file")
|
||||
|
||||
@ -587,6 +572,11 @@ class RenderMasterServer(http.server.HTTPServer):
|
||||
self.job_id = 0
|
||||
self.path = path + "master_" + str(os.getpid()) + os.sep
|
||||
|
||||
self.balancer = netrender.balancing.Balancer()
|
||||
self.balancer.addRule(netrender.balancing.RatingCredit())
|
||||
self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
|
||||
self.balancer.addPriority(netrender.balancing.NewJobPriority())
|
||||
|
||||
if not os.path.exists(self.path):
|
||||
os.mkdir(self.path)
|
||||
|
||||
@ -616,7 +606,7 @@ class RenderMasterServer(http.server.HTTPServer):
|
||||
self.jobs = []
|
||||
|
||||
def update(self):
|
||||
self.jobs.sort(key = groupKey)
|
||||
self.balancer.balance(self.jobs)
|
||||
|
||||
def removeJob(self, id):
|
||||
job = self.jobs_map.pop(id)
|
||||
@ -644,8 +634,8 @@ class RenderMasterServer(http.server.HTTPServer):
|
||||
|
||||
def getNewJob(self, slave_id):
|
||||
if self.jobs:
|
||||
for job in reversed(self.jobs):
|
||||
if job.status == JOB_QUEUED and job.framesLeft() > 0 and slave_id not in job.blacklist:
|
||||
for job in self.jobs:
|
||||
if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
|
||||
return job, job.getFrames()
|
||||
|
||||
return None, None
|
||||
|
@ -95,6 +95,18 @@ class RenderJob:
|
||||
def __len__(self):
|
||||
return len(self.frames)
|
||||
|
||||
def countFrames(self, status=QUEUED):
    """Return how many frames of this job have the given status.

    status -- frame status to count; defaults to QUEUED.
    """
    return sum(1 for frame in self.frames if frame.status == status)
|
||||
|
||||
def countSlaves(self):
    """Return the number of distinct slaves among this job's DISPATCHED frames."""
    # A set comprehension de-duplicates slaves that hold several frames.
    return len({frame.slave for frame in self.frames if frame.status == DISPATCHED})
|
||||
|
||||
|
||||
def framesStatus(self):
|
||||
results = {
|
||||
QUEUED: 0,
|
||||
|
@ -89,7 +89,7 @@ class RENDER_OT_netclientstatus(bpy.types.Operator):
|
||||
conn = clientConnection(context.scene)
|
||||
|
||||
if conn:
|
||||
conn.request("GET", "status")
|
||||
conn.request("GET", "/status")
|
||||
|
||||
response = conn.getresponse()
|
||||
print( response.status, response.reason )
|
||||
@ -205,7 +205,7 @@ class RENDER_OT_netclientslaves(bpy.types.Operator):
|
||||
conn = clientConnection(context.scene)
|
||||
|
||||
if conn:
|
||||
conn.request("GET", "slave")
|
||||
conn.request("GET", "/slave")
|
||||
|
||||
response = conn.getresponse()
|
||||
print( response.status, response.reason )
|
||||
@ -258,7 +258,7 @@ class RENDER_OT_netclientcancel(bpy.types.Operator):
|
||||
if conn:
|
||||
job = bpy.data.netrender_jobs[netsettings.active_job_index]
|
||||
|
||||
conn.request("POST", "cancel", headers={"job-id":job.id})
|
||||
conn.request("POST", "/cancel", headers={"job-id":job.id})
|
||||
|
||||
response = conn.getresponse()
|
||||
print( response.status, response.reason )
|
||||
|
@ -33,7 +33,7 @@ def slave_Info():
|
||||
return slave
|
||||
|
||||
def testCancel(conn, job_id):
|
||||
conn.request("HEAD", "status", headers={"job-id":job_id})
|
||||
conn.request("HEAD", "/status", headers={"job-id":job_id})
|
||||
response = conn.getresponse()
|
||||
|
||||
# cancelled if job isn't found anymore
|
||||
@ -47,7 +47,7 @@ def testFile(conn, job_id, slave_id, JOB_PREFIX, file_path, main_path = None):
|
||||
|
||||
if not os.path.exists(job_full_path):
|
||||
temp_path = JOB_PREFIX + "slave.temp.blend"
|
||||
conn.request("GET", "file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path})
|
||||
conn.request("GET", "/file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path})
|
||||
response = conn.getresponse()
|
||||
|
||||
if response.status != http.client.OK:
|
||||
@ -76,7 +76,7 @@ def render_slave(engine, scene):
|
||||
conn = clientConnection(scene)
|
||||
|
||||
if conn:
|
||||
conn.request("POST", "slave", repr(slave_Info().serialize()))
|
||||
conn.request("POST", "/slave", repr(slave_Info().serialize()))
|
||||
response = conn.getresponse()
|
||||
|
||||
slave_id = response.getheader("slave-id")
|
||||
@ -87,7 +87,7 @@ def render_slave(engine, scene):
|
||||
|
||||
while not engine.test_break():
|
||||
|
||||
conn.request("GET", "job", headers={"slave-id":slave_id})
|
||||
conn.request("GET", "/job", headers={"slave-id":slave_id})
|
||||
response = conn.getresponse()
|
||||
|
||||
if response.status == http.client.OK:
|
||||
@ -119,7 +119,7 @@ def render_slave(engine, scene):
|
||||
|
||||
# announce log to master
|
||||
logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames])
|
||||
conn.request("POST", "log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id})
|
||||
conn.request("POST", "/log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id})
|
||||
response = conn.getresponse()
|
||||
|
||||
first_frame = job.frames[0].number
|
||||
@ -146,7 +146,7 @@ def render_slave(engine, scene):
|
||||
if stdout:
|
||||
# (only need to update on one frame, they are linked)
|
||||
headers["job-frame"] = str(first_frame)
|
||||
conn.request("PUT", "log", stdout, headers=headers)
|
||||
conn.request("PUT", "/log", stdout, headers=headers)
|
||||
response = conn.getresponse()
|
||||
|
||||
stdout = bytes()
|
||||
@ -173,7 +173,7 @@ def render_slave(engine, scene):
|
||||
if stdout:
|
||||
# (only need to update on one frame, they are linked)
|
||||
headers["job-frame"] = str(first_frame)
|
||||
conn.request("PUT", "log", stdout, headers=headers)
|
||||
conn.request("PUT", "/log", stdout, headers=headers)
|
||||
response = conn.getresponse()
|
||||
|
||||
headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}
|
||||
@ -184,7 +184,7 @@ def render_slave(engine, scene):
|
||||
headers["job-frame"] = str(frame.number)
|
||||
# send result back to server
|
||||
f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb')
|
||||
conn.request("PUT", "render", f, headers=headers)
|
||||
conn.request("PUT", "/render", f, headers=headers)
|
||||
f.close()
|
||||
response = conn.getresponse()
|
||||
else:
|
||||
@ -192,7 +192,7 @@ def render_slave(engine, scene):
|
||||
for frame in job.frames:
|
||||
headers["job-frame"] = str(frame.number)
|
||||
# send error result back to server
|
||||
conn.request("PUT", "render", headers=headers)
|
||||
conn.request("PUT", "/render", headers=headers)
|
||||
response = conn.getresponse()
|
||||
else:
|
||||
if timeout < MAX_TIMEOUT:
|
||||
|
@ -8,6 +8,12 @@ import netrender.model
|
||||
|
||||
VERSION = b"0.5"
|
||||
|
||||
# Jobs status
|
||||
JOB_WAITING = 0 # before all data has been entered
|
||||
JOB_PAUSED = 1 # paused by user
|
||||
JOB_QUEUED = 2 # ready to be dispatched
|
||||
|
||||
# Frames status
|
||||
QUEUED = 0
|
||||
DISPATCHED = 1
|
||||
DONE = 2
|
||||
@ -36,7 +42,7 @@ def clientConnection(scene):
|
||||
return None
|
||||
|
||||
def clientVerifyVersion(conn):
|
||||
conn.request("GET", "version")
|
||||
conn.request("GET", "/version")
|
||||
response = conn.getresponse()
|
||||
|
||||
if response.status != http.client.OK:
|
||||
|
Loading…
Reference in New Issue
Block a user