forked from bartvdbraak/blender
c0bae16dad
Downloading results for jobs from blender now uses the current output settings, it doesn't just download the multilayer exr as it used to. Render output panel now visible under the jobs panel in client mode.
1080 lines
39 KiB
Python
1080 lines
39 KiB
Python
# ##### BEGIN GPL LICENSE BLOCK #####
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
#
|
|
# ##### END GPL LICENSE BLOCK #####
|
|
|
|
import sys, os
|
|
import http, http.client, http.server, urllib, socket, socketserver, threading
|
|
import subprocess, shutil, time, hashlib
|
|
import pickle
|
|
import select # for select.error
|
|
import json
|
|
|
|
from netrender.utils import *
|
|
import netrender.model
|
|
import netrender.balancing
|
|
import netrender.master_html
|
|
import netrender.thumbnail as thumbnail
|
|
|
|
class MRenderFile(netrender.model.RenderFile):
    """Master-side record of a job file that tracks whether it is on disk."""

    def __init__(self, filepath, index, start, end, signature):
        super().__init__(filepath, index, start, end, signature)
        # Result of the last call to test(); False until the file is verified.
        self.found = False

    def test(self):
        """Check that the file exists and, if a signature is set, matches it.

        Updates and returns ``self.found``.  The signature is compared with
        the value returned by ``hashFile`` (defined outside this block).
        """
        self.found = os.path.exists(self.filepath)

        # Only hash the file when there is something to compare against.
        if self.found and self.signature is not None:
            found_signature = hashFile(self.filepath)
            self.found = self.signature == found_signature

        return self.found
|
|
|
|
|
|
class MRenderSlave(netrender.model.RenderSlave):
    """Master-side record of a connected render slave."""

    def __init__(self, name, address, stats):
        super().__init__()

        # The id is the md5 hex digest of the repr of name and address.
        identity = repr(name) + repr(address)
        self.id = hashlib.md5(bytes(identity, encoding='utf8')).hexdigest()

        self.name = name
        self.address = address
        self.stats = stats
        self.last_seen = time.time()

        # Currently assigned job and the frame numbers dispatched to this slave.
        self.job = None
        self.job_frames = []

        # Register in the class-level slave map kept by the model class.
        netrender.model.RenderSlave._slave_map[self.id] = self

    def seen(self):
        """Record the current time as the last contact with this slave."""
        self.last_seen = time.time()

    def finishedFrame(self, frame_number):
        """Drop ``frame_number`` from the pending frames.

        When no pending frame remains, the slave's job is cleared.
        ``list.remove`` raises ValueError if the frame number is not pending.
        """
        self.job_frames.remove(frame_number)
        if len(self.job_frames) == 0:
            self.job = None
|
|
|
|
class MRenderJob(netrender.model.RenderJob):
    """Master-side render job: frames, files, status and on-disk save path."""

    def __init__(self, job_id, job_info):
        super().__init__(job_info)
        self.id = job_id
        self.last_dispatched = time.time()

        # force one chunk for process jobs
        if self.type == netrender.model.JOB_PROCESS:
            self.chunks = 1

        # Force WAITING status on creation
        self.status = JOB_WAITING

        # special server properties
        self.last_update = 0
        self.save_path = ""
        self.files = [
            MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature)
            for rfile in job_info.files
        ]

    def initInfo(self):
        """Fill in ``self.resolution`` from the first job file if it is unset."""
        if not self.resolution:
            self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"]))

    def save(self):
        """Write the serialized job as JSON to ``job.txt`` in the save path.

        Does nothing when no save path has been assigned yet.
        """
        if self.save_path:
            # The context manager closes the file even if serialization fails.
            with open(os.path.join(self.save_path, "job.txt"), "w") as f:
                f.write(json.dumps(self.serialize()))

    def edit(self, info_map):
        """Apply the "status", "priority" and "chunks" entries of ``info_map``.

        Keys that are absent leave the corresponding attribute untouched.
        """
        for key in ("status", "priority", "chunks"):
            if key in info_map:
                setattr(self, key, info_map[key])

    def testStart(self):
        """Start the job if all of its files pass their test.

        Returns False (without starting) as soon as one file fails, True
        once the job has been started and its info initialized.
        """
        # Don't test files for versionned jobs
        if not self.version_info:
            for f in self.files:
                if not f.test():
                    return False

        self.start()
        self.initInfo()
        return True

    def testFinished(self):
        """Mark the job finished when no frame is queued or dispatched."""
        for f in self.frames:
            if f.status == QUEUED or f.status == DISPATCHED:
                break
        else:
            # Loop ended without a break: nothing left to render.
            self.status = JOB_FINISHED

    def pause(self, status = None):
        """Switch the job between paused and queued.

        Only acts on jobs that are currently paused or queued.  With
        ``status`` None the state is toggled; a truthy ``status`` sets
        JOB_QUEUED and a falsy one sets JOB_PAUSED.
        """
        if self.status not in {JOB_PAUSED, JOB_QUEUED}:
            return

        if status is None:
            self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED
        elif status:
            self.status = JOB_QUEUED
        else:
            self.status = JOB_PAUSED

    def start(self):
        """Put the job in the queued state."""
        self.status = JOB_QUEUED

    def addLog(self, frames):
        """Point every existing frame in ``frames`` at one shared log file.

        The log name is the zero-padded frame numbers joined by underscores.
        """
        log_name = "_".join(("%06d" % f for f in frames)) + ".log"
        log_path = os.path.join(self.save_path, log_name)

        for number in frames:
            frame = self[number]
            if frame:
                frame.log_path = log_path

    def addFrame(self, frame_number, command):
        """Create a frame, append it to the job and return it."""
        frame = MRenderFrame(frame_number, command)
        self.frames.append(frame)
        return frame

    def reset(self, all):
        """Reset every frame; ``all`` is forwarded to each frame's reset."""
        for f in self.frames:
            f.reset(all)

    def getFrames(self):
        """Return up to ``self.chunks`` queued frames, in job order.

        ``last_dispatched`` is refreshed each time a frame is picked.
        """
        frames = []
        for f in self.frames:
            if f.status == QUEUED:
                self.last_dispatched = time.time()
                frames.append(f)
                if len(frames) >= self.chunks:
                    break

        return frames
|
|
|
|
class MRenderFrame(netrender.model.RenderFrame):
    """Master-side state of a single frame of a job."""

    def __init__(self, frame, command):
        super().__init__()
        self.number = frame
        self.command = command

        # Dispatch state: a new frame is queued and not assigned to a slave.
        self.status = QUEUED
        self.slave = None
        self.time = 0

        # Set once a slave announces a log for this frame.
        self.log_path = None

    def reset(self, all):
        """Re-queue the frame.

        Frames in the ERROR state are always reset; other frames are only
        reset when ``all`` is true.
        """
        if not (all or self.status == ERROR):
            return

        self.log_path = None
        self.slave = None
        self.time = 0
        self.status = QUEUED
|
|
|
|
|
|
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
|
|
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
|
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
|
|
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
|
# URL patterns used by the request handler.  They are applied with
# ``match``, so they anchor at the start of the path only.  Literal dots in
# the file extensions are escaped so that they do not match any character.
file_pattern = re.compile(r"/file_([a-zA-Z0-9]+)_([0-9]+)")
render_pattern = re.compile(r"/render_([a-zA-Z0-9]+)_([0-9]+)\.exr")
thumb_pattern = re.compile(r"/thumb_([a-zA-Z0-9]+)_([0-9]+)\.jpg")
log_pattern = re.compile(r"/log_([a-zA-Z0-9]+)_([0-9]+)\.log")
reset_pattern = re.compile(r"/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
cancel_pattern = re.compile(r"/cancel_([a-zA-Z0-9]+)")
pause_pattern = re.compile(r"/pause_([a-zA-Z0-9]+)")
edit_pattern = re.compile(r"/edit_([a-zA-Z0-9]+)")
|
|
|
|
class RenderHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler of the render master.

    Requests are routed on ``self.path``.  Job and slave state lives on
    ``self.server``; messages are exchanged as UTF-8 encoded JSON.  Most
    "not found" conditions are answered with NO_CONTENT.
    """

    def log_message(self, format, *args):
        # override because the original calls self.address_string(), which
        # is extremely slow due to some timeout..
        sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args))

    def getInfoMap(self):
        """Return the JSON request body as a dict, or {} when there is none.

        A missing or empty content-length header is treated as an empty body.
        """
        length = int(self.headers.get('content-length') or 0)

        if length > 0:
            msg = str(self.rfile.read(length), encoding='utf8')
            return json.loads(msg)
        else:
            return {}

    def send_head(self, code = http.client.OK, headers = None, content = "application/octet-stream"):
        """Send the status line, the content type and any extra headers.

        ``headers`` is an optional mapping of additional header names to
        values; None means no extra headers.
        """
        self.send_response(code)
        self.send_header("Content-type", content)

        for key, value in (headers or {}).items():
            self.send_header(key, value)

        self.end_headers()

    def do_HEAD(self):
        """Answer OK when the job and frame named in the headers both exist."""

        if self.path == "/status":
            job_id = self.headers.get('job-id', "")
            job_frame = int(self.headers.get('job-frame', -1))

            job = self.server.getJobID(job_id)
            if job:
                frame = job[job_frame]

                if frame:
                    self.send_head(http.client.OK)
                else:
                    # no such frame
                    self.send_head(http.client.NO_CONTENT)
            else:
                # no such job id
                self.send_head(http.client.NO_CONTENT)

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    def do_GET(self):
        """Serve version, render results, thumbnails, logs, status, jobs,
        job files and the slave list; anything else goes to the html module.
        """

        if self.path == "/version":
            self.send_head()
            self.server.stats("", "Version check")
            self.wfile.write(VERSION)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/render"):
            match = render_pattern.match(self.path)

            if match:
                job_id = match.groups()[0]
                frame_number = int(match.groups()[1])

                job = self.server.getJobID(job_id)

                if job:
                    frame = job[frame_number]

                    if frame:
                        if frame.status in (QUEUED, DISPATCHED):
                            self.send_head(http.client.ACCEPTED)
                        elif frame.status == DONE:
                            self.server.stats("", "Sending result to client")

                            filename = os.path.join(job.save_path, "%06d.exr" % frame_number)

                            # Open before sending the head so that a missing
                            # file fails before any response is written.
                            with open(filename, 'rb') as f:
                                self.send_head(content = "image/x-exr")
                                shutil.copyfileobj(f, self.wfile)
                        elif frame.status == ERROR:
                            self.send_head(http.client.PARTIAL_CONTENT)
                    else:
                        # no such frame
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/thumb"):
            match = thumb_pattern.match(self.path)

            if match:
                job_id = match.groups()[0]
                frame_number = int(match.groups()[1])

                job = self.server.getJobID(job_id)

                if job:
                    frame = job[frame_number]

                    if frame:
                        if frame.status in (QUEUED, DISPATCHED):
                            self.send_head(http.client.ACCEPTED)
                        elif frame.status == DONE:
                            filename = os.path.join(job.save_path, "%06d.exr" % frame_number)

                            thumbname = thumbnail.generate(filename)

                            if thumbname:
                                with open(thumbname, 'rb') as f:
                                    self.send_head(content = "image/jpeg")
                                    shutil.copyfileobj(f, self.wfile)
                            else: # thumbnail couldn't be generated
                                self.send_head(http.client.PARTIAL_CONTENT)
                                return
                        elif frame.status == ERROR:
                            self.send_head(http.client.PARTIAL_CONTENT)
                    else:
                        # no such frame
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/log"):
            match = log_pattern.match(self.path)

            if match:
                job_id = match.groups()[0]
                frame_number = int(match.groups()[1])

                job = self.server.getJobID(job_id)

                if job:
                    frame = job[frame_number]

                    if frame:
                        if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
                            self.send_head(http.client.PROCESSING)
                        else:
                            self.server.stats("", "Sending log to client")
                            with open(frame.log_path, 'rb') as f:
                                self.send_head(content = "text/plain")
                                shutil.copyfileobj(f, self.wfile)
                    else:
                        # no such frame
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid URL
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/status":
            job_id = self.headers.get('job-id', "")
            job_frame = int(self.headers.get('job-frame', -1))

            if job_id:

                job = self.server.getJobID(job_id)
                if job:
                    if job_frame != -1:
                        # Look the frame up by the number sent in the header.
                        frame = job[job_frame]

                        if frame:
                            message = frame.serialize()
                        else:
                            # no such frame
                            self.send_head(http.client.NO_CONTENT)
                            return
                    else:
                        message = job.serialize()
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
                    return
            else: # status of all jobs
                message = []

                for job in self.server:
                    message.append(job.serialize())

            self.server.stats("", "Sending status")
            self.send_head()
            self.wfile.write(bytes(json.dumps(message), encoding='utf8'))

        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/job":
            self.server.balance()

            slave_id = self.headers['slave-id']

            slave = self.server.getSeenSlave(slave_id)

            if slave: # only if slave id is valid
                job, frames = self.server.newDispatch(slave_id)

                if job and frames:
                    for f in frames:
                        print("dispatch", f.number)
                        f.status = DISPATCHED
                        f.slave = slave

                    slave.job = job
                    slave.job_frames = [f.number for f in frames]

                    self.send_head(headers={"job-id": job.id})

                    message = job.serialize(frames)

                    self.wfile.write(bytes(json.dumps(message), encoding='utf8'))

                    self.server.stats("", "Sending job to slave")
                else:
                    # no job available, return error code
                    slave.job = None
                    slave.job_frames = []

                    self.send_head(http.client.ACCEPTED)
            else: # invalid slave id
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/file"):
            match = file_pattern.match(self.path)

            if match:
                slave_id = self.headers['slave-id']
                slave = self.server.getSeenSlave(slave_id)

                if not slave:
                    # invalid slave id
                    # NOTE(review): only logged; the file is still served.
                    print("invalid slave id")

                job_id = match.groups()[0]
                file_index = int(match.groups()[1])

                job = self.server.getJobID(job_id)

                if job:
                    render_file = job.files[file_index]

                    if render_file:
                        self.server.stats("", "Sending file to slave")
                        with open(render_file.filepath, 'rb') as f:
                            self.send_head()
                            shutil.copyfileobj(f, self.wfile)
                    else:
                        # no such file
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/slaves":
            message = []

            self.server.stats("", "Sending slaves status")

            for slave in self.server.slaves:
                message.append(slave.serialize())

            self.send_head()

            self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        else:
            # hand over the rest to the html section
            netrender.master_html.get(self)

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    def do_POST(self):
        """Handle job submission and editing, balancer settings, cancel,
        pause, clear and reset commands, slave registration and log
        announcements.
        """

        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        if self.path == "/job":

            length = int(self.headers['content-length'])

            job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))

            job_id = self.server.nextJobID()

            job = MRenderJob(job_id, job_info)

            for frame in job_info.frames:
                job.addFrame(frame.number, frame.command)

            self.server.addJob(job)

            headers={"job-id": job_id}

            if job.testStart():
                self.server.stats("", "New job, started")
                self.send_head(headers=headers)
            else:
                self.server.stats("", "New job, missing files (%i total)" % len(job.files))
                self.send_head(http.client.ACCEPTED, headers=headers)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/edit"):
            match = edit_pattern.match(self.path)

            if match:
                job_id = match.groups()[0]

                job = self.server.getJobID(job_id)

                if job:
                    info_map = self.getInfoMap()

                    job.edit(info_map)
                    self.send_head()
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/balance_limit":
            info_map = self.getInfoMap()
            for rule_id, limit in info_map.items():
                try:
                    rule = self.server.balancer.ruleByID(rule_id)
                    if rule:
                        rule.setLimit(limit)
                except Exception:
                    # Deliberately best-effort: skip entries of invalid type
                    # without aborting the remaining ones.
                    pass # invalid type

            self.send_head()
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/balance_enable":
            info_map = self.getInfoMap()
            for rule_id, enabled in info_map.items():
                rule = self.server.balancer.ruleByID(rule_id)
                if rule:
                    rule.enabled = enabled

            self.send_head()
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/cancel"):
            match = cancel_pattern.match(self.path)

            if match:
                info_map = self.getInfoMap()
                clear = info_map.get("clear", False)

                job_id = match.groups()[0]

                job = self.server.getJobID(job_id)

                if job:
                    self.server.stats("", "Cancelling job")
                    self.server.removeJob(job, clear)
                    self.send_head()
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/pause"):
            match = pause_pattern.match(self.path)

            if match:
                info_map = self.getInfoMap()
                status = info_map.get("status", None)

                job_id = match.groups()[0]

                job = self.server.getJobID(job_id)

                if job:
                    self.server.stats("", "Pausing job")
                    job.pause(status)
                    self.send_head()
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else:
                # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/clear":
            # cancel all jobs
            info_map = self.getInfoMap()
            clear = info_map.get("clear", False)

            self.server.stats("", "Clearing jobs")
            self.server.clear(clear)

            self.send_head()
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/reset"):
            match = reset_pattern.match(self.path)

            if match:
                all = match.groups()[0] == 'all'
                job_id = match.groups()[1]
                job_frame = int(match.groups()[2])

                job = self.server.getJobID(job_id)

                if job:
                    # Frame number 0 means "reset the whole job".
                    if job_frame != 0:

                        frame = job[job_frame]
                        if frame:
                            self.server.stats("", "Reset job frame")
                            frame.reset(all)
                            self.send_head()
                        else:
                            # no such frame
                            self.send_head(http.client.NO_CONTENT)

                    else:
                        self.server.stats("", "Reset job")
                        job.reset(all)
                        self.send_head()

                else: # job not found
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid url
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/slave":
            length = int(self.headers['content-length'])

            self.server.stats("", "New slave connected")

            slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False)

            slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)

            self.send_head(headers = {"slave-id": slave_id})
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/log":
            length = int(self.headers['content-length'])

            log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))

            slave_id = log_info.slave_id

            slave = self.server.getSeenSlave(slave_id)

            if slave: # only if slave id is valid
                job = self.server.getJobID(log_info.job_id)

                if job:
                    self.server.stats("", "Log announcement")
                    job.addLog(log_info.frames)
                    self.send_head(http.client.OK)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid slave id
                self.send_head(http.client.NO_CONTENT)

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    def do_PUT(self):
        """Receive job files, render results, thumbnails and log data."""

        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        if self.path.startswith("/file"):
            match = file_pattern.match(self.path)

            if match:
                self.server.stats("", "Receiving job")

                length = int(self.headers['content-length'])
                job_id = match.groups()[0]
                file_index = int(match.groups()[1])

                job = self.server.getJobID(job_id)

                if job:

                    render_file = job.files[file_index]

                    if render_file:
                        main_file = job.files[0].filepath # filename of the first file

                        main_path, main_name = os.path.split(main_file)

                        if file_index > 0:
                            file_path = prefixPath(job.save_path, render_file.filepath, main_path)
                        else:
                            file_path = os.path.join(job.save_path, main_name)

                        buf = self.rfile.read(length)

                        # add same temp file + renames as slave

                        with open(file_path, "wb") as f:
                            f.write(buf)
                        del buf

                        render_file.filepath = file_path # set the new path

                        if job.testStart():
                            self.server.stats("", "File upload, starting job")
                            self.send_head(http.client.OK)
                        else:
                            self.server.stats("", "File upload, file missings")
                            self.send_head(http.client.ACCEPTED)
                    else: # invalid file
                        print("file not found", job_id, file_index)
                        self.send_head(http.client.NO_CONTENT)
                else: # job not found
                    print("job not found", job_id, file_index)
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid url
                print("no match")
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/render":
            self.server.stats("", "Receiving render result")

            # need some message content here or the slave doesn't like it
            self.wfile.write(bytes("foo", encoding='utf8'))

            slave_id = self.headers['slave-id']

            slave = self.server.getSeenSlave(slave_id)

            if slave: # only if slave id is valid
                job_id = self.headers['job-id']

                job = self.server.getJobID(job_id)

                if job:
                    job_frame = int(self.headers['job-frame'])
                    job_result = int(self.headers['job-result'])
                    job_time = float(self.headers['job-time'])

                    frame = job[job_frame]

                    if frame:
                        # NOTE(review): the nesting of the ERROR branch under
                        # hasRenderResult() is reconstructed from a source
                        # that lost its indentation - confirm.
                        if job.hasRenderResult():
                            if job_result == DONE:
                                length = int(self.headers['content-length'])
                                buf = self.rfile.read(length)
                                with open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb') as f:
                                    f.write(buf)

                                del buf
                            elif job_result == ERROR:
                                # blacklist slave on this job on error
                                # slaves might already be in blacklist if errors on the whole chunk
                                if not slave.id in job.blacklist:
                                    job.blacklist.append(slave.id)

                        slave.finishedFrame(job_frame)

                        frame.status = job_result
                        frame.time = job_time

                        job.testFinished()

                        self.send_head()
                    else: # frame not found
                        self.send_head(http.client.NO_CONTENT)
                else: # job not found
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid slave id
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path == "/thumb":
            self.server.stats("", "Receiving thumbnail result")

            # need some message content here or the slave doesn't like it
            self.wfile.write(bytes("foo", encoding='utf8'))

            slave_id = self.headers['slave-id']

            slave = self.server.getSeenSlave(slave_id)

            if slave: # only if slave id is valid
                job_id = self.headers['job-id']

                job = self.server.getJobID(job_id)

                if job:
                    job_frame = int(self.headers['job-frame'])

                    frame = job[job_frame]

                    if frame:
                        # NOTE(review): no response head is sent on this
                        # success path, unlike "/render" - confirm intent.
                        if job.hasRenderResult():
                            length = int(self.headers['content-length'])
                            buf = self.rfile.read(length)
                            with open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb') as f:
                                f.write(buf)

                            del buf

                    else: # frame not found
                        self.send_head(http.client.NO_CONTENT)
                else: # job not found
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid slave id
                self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        elif self.path.startswith("/log"):
            self.server.stats("", "Receiving log file")

            match = log_pattern.match(self.path)

            if match:
                job_id = match.groups()[0]

                job = self.server.getJobID(job_id)

                if job:
                    job_frame = int(match.groups()[1])

                    frame = job[job_frame]

                    if frame and frame.log_path:
                        length = int(self.headers['content-length'])
                        buf = self.rfile.read(length)
                        # Logs arrive in pieces, so append to the file.
                        with open(frame.log_path, 'ab') as f:
                            f.write(buf)

                        del buf

                        self.server.getSeenSlave(self.headers['slave-id'])

                        self.send_head()
                    else: # frame not found
                        self.send_head(http.client.NO_CONTENT)
                else: # job not found
                    self.send_head(http.client.NO_CONTENT)
            else: # invalid url
                self.send_head(http.client.NO_CONTENT)
|
|
|
|
class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Threaded HTTP server holding the master's jobs, slaves and balancer."""

    def __init__(self, address, handler_class, path, subdir=True):
        """Create the server and its working directory.

        With ``subdir`` true the working directory is a per-process
        ``master_<pid>`` folder under ``path``; otherwise ``path`` itself.
        """
        super().__init__(address, handler_class)
        self.jobs = []
        self.jobs_map = {}
        self.slaves = []
        self.slaves_map = {}
        self.job_id = 0

        if subdir:
            self.path = os.path.join(path, "master_" + str(os.getpid()))
        else:
            self.path = path

        self.slave_timeout = 5 # 5 mins: need a parameter for that

        self.balancer = netrender.balancing.Balancer()
        self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
        self.balancer.addRule(netrender.balancing.RatingUsage())
        self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
        self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
        self.balancer.addPriority(netrender.balancing.NewJobPriority())
        self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))

        if not os.path.exists(self.path):
            os.mkdir(self.path)

    def restore(self, jobs, slaves, balancer = None):
        """Replace the job and slave lists and rebuild both lookup maps.

        ``job_id`` is raised to the highest restored job id so that new jobs
        get fresh ids.  ``balancer`` replaces the current one when given.
        """
        self.jobs = jobs
        self.jobs_map = {}

        for job in self.jobs:
            self.jobs_map[job.id] = job
            self.job_id = max(self.job_id, int(job.id))

        self.slaves = slaves
        # Rebuilt from scratch, like jobs_map, so no stale entries survive.
        self.slaves_map = {}
        for slave in self.slaves:
            self.slaves_map[slave.id] = slave

        if balancer:
            self.balancer = balancer

    def nextJobID(self):
        """Increment the job counter and return it as a string."""
        self.job_id += 1
        return str(self.job_id)

    def addSlave(self, name, address, stats):
        """Register a new slave and return its id."""
        slave = MRenderSlave(name, address, stats)
        self.slaves.append(slave)
        self.slaves_map[slave.id] = slave

        return slave.id

    def removeSlave(self, slave):
        """Remove ``slave`` from the list and the lookup map."""
        self.slaves.remove(slave)
        self.slaves_map.pop(slave.id)

    def getSlave(self, slave_id):
        """Return the slave with ``slave_id``, or None if unknown."""
        return self.slaves_map.get(slave_id)

    def getSeenSlave(self, slave_id):
        """Like getSlave, but also refreshes the slave's last-seen time."""
        slave = self.getSlave(slave_id)
        if slave:
            slave.seen()

        return slave

    def timeoutSlaves(self):
        """Remove slaves not seen for more than ``slave_timeout`` minutes.

        Frames still assigned to a removed slave are marked ERROR.
        """
        removed = []

        t = time.time()

        for slave in self.slaves:
            if (t - slave.last_seen) / 60 > self.slave_timeout:
                removed.append(slave)

                if slave.job:
                    for f in slave.job_frames:
                        # The job lookup may yield nothing for a frame
                        # number; the request handlers guard the same way.
                        frame = slave.job[f]
                        if frame:
                            frame.status = ERROR

        # Removal is deferred so the list is not mutated while iterating.
        for slave in removed:
            self.removeSlave(slave)

    def updateUsage(self):
        """Decay every job's usage and credit jobs that have a slave working.

        Each job's usage is halved; then each slave that has a job adds an
        equal share of the blend factor to that job.
        """
        blend = 0.5
        for job in self.jobs:
            job.usage *= (1 - blend)

        if self.slaves:
            slave_usage = blend / self.countSlaves()

            for slave in self.slaves:
                if slave.job:
                    slave.job.usage += slave_usage

    def clear(self, clear_files = False):
        """Remove all jobs; ``clear_files`` is forwarded to removeJob."""
        # Iterate over a copy because removeJob mutates self.jobs.
        removed = self.jobs[:]

        for job in removed:
            self.removeJob(job, clear_files)

    def balance(self):
        """Let the balancer process the job list."""
        self.balancer.balance(self.jobs)

    def getJobs(self):
        """Return the list of jobs."""
        return self.jobs

    def countJobs(self, status = JOB_QUEUED):
        """Return the number of jobs whose status equals ``status``."""
        return sum(1 for j in self.jobs if j.status == status)

    def countSlaves(self):
        """Return the number of registered slaves."""
        return len(self.slaves)

    def removeJob(self, job, clear_files = False):
        """Forget ``job`` and detach every slave working on it.

        With ``clear_files`` true the job's save directory is deleted too.
        """
        self.jobs.remove(job)
        self.jobs_map.pop(job.id)

        if clear_files:
            shutil.rmtree(job.save_path)

        for slave in self.slaves:
            if slave.job == job:
                slave.job = None
                slave.job_frames = []

    def addJob(self, job):
        """Register ``job``, create its directory and save it."""
        self.jobs.append(job)
        self.jobs_map[job.id] = job

        # create job directory
        job.save_path = os.path.join(self.path, "job_" + job.id)
        if not os.path.exists(job.save_path):
            os.mkdir(job.save_path)

        job.save()

    def getJobID(self, id):
        """Return the job with the given id, or None if unknown."""
        return self.jobs_map.get(id)

    def __iter__(self):
        """Iterate over the jobs."""
        for job in self.jobs:
            yield job

    def newDispatch(self, slave_id):
        """Pick the first eligible job for ``slave_id``.

        A job is eligible when no balancer exception applies to it and the
        slave is not on its blacklist.  Returns ``(job, frames)``, or
        ``(None, None)`` when no job qualifies.
        """
        if self.jobs:
            for job in self.jobs:
                if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
                    return job, job.getFrames()

        return None, None
|
|
|
|
def clearMaster(path):
    """Recursively delete the master's working directory at ``path``."""
    shutil.rmtree(path)
|
|
|
|
def createMaster(address, clear, path):
    """Build the master server, restoring saved state when allowed.

    A fresh server is created when ``clear`` is true or when no
    ``blender_master.data`` file exists in ``path``.  Otherwise the saved
    path, jobs and slaves are loaded and restored into a server that uses
    the saved path directly.
    """
    filepath = os.path.join(path, "blender_master.data")

    if clear or not os.path.exists(filepath):
        return RenderMasterServer(address, RenderHandler, path)

    print("loading saved master:", filepath)
    # NOTE(review): unpickling runs arbitrary code if the data file has been
    # tampered with; only load state files from a trusted location.
    with open(filepath, 'rb') as f:
        path, jobs, slaves = pickle.load(f)

    httpd = RenderMasterServer(address, RenderHandler, path, subdir=False)
    httpd.restore(jobs, slaves)

    return httpd
|
|
|
|
def saveMaster(path, httpd):
    """Pickle the server's path, jobs and slaves into ``path``.

    The state is written as one tuple to ``blender_master.data`` using the
    highest pickle protocol.
    """
    state = (httpd.path, httpd.jobs, httpd.slaves)
    filepath = os.path.join(path, "blender_master.data")

    with open(filepath, 'wb') as f:
        pickle.dump(state, f, pickle.HIGHEST_PROTOCOL)
|
|
|
|
def runMaster(address, broadcast, clear, path, update_stats, test_break):
    """Run the master server until ``test_break()`` returns true.

    Requests are handled one at a time with a one second timeout.  About
    every two seconds timed-out slaves are dropped, job usage is updated
    and, when ``broadcast`` is true, the server port is broadcast over UDP
    to port 8000.  On exit the working directory is removed when ``clear``
    is true; otherwise the server state is saved to ``path``.

    ``update_stats`` is installed as ``httpd.stats``; the request handler
    calls it with two string arguments.
    """
    httpd = createMaster(address, clear, path)
    httpd.timeout = 1
    httpd.stats = update_stats

    s = None
    if broadcast:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    try:
        # Backdated so the periodic work runs on the first iteration.
        start_time = time.time() - 2

        while not test_break():
            try:
                httpd.handle_request()
            except select.error:
                pass

            if time.time() - start_time >= 2: # need constant here
                httpd.timeoutSlaves()

                httpd.updateUsage()

                if broadcast:
                    print("broadcasting address")
                    s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
                start_time = time.time()
    finally:
        # Release the broadcast socket even if the loop raises.
        if s is not None:
            s.close()

    httpd.server_close()
    if clear:
        clearMaster(httpd.path)
    else:
        saveMaster(path, httpd)
|
|
|