Cycles: merge some changes from a local branch to bring network rendering a bit

more up to date, still nowhere near working though, but might as well commit this
in case someone else is interested in working on it.
This commit is contained in:
Brecht Van Lommel 2012-12-21 11:13:46 +00:00
parent 5f4c7e5da4
commit e5b457dbc9
5 changed files with 503 additions and 223 deletions

@ -68,7 +68,7 @@ if(WITH_CYCLES_BLENDER)
add_subdirectory(blender)
endif()
if(WITH_CYCLES_TEST)
if(WITH_CYCLES_TEST OR WITH_CYCLES_NETWORK)
add_subdirectory(app)
endif()

@ -23,7 +23,9 @@
#include "util_args.h"
#include "util_foreach.h"
#include "util_path.h"
#include "util_stats.h"
#include "util_string.h"
#include "util_task.h"
using namespace ccl;
@ -32,29 +34,29 @@ int main(int argc, const char **argv)
path_init();
/* device types */
string devices = "";
string devicelist = "";
string devicename = "cpu";
bool list = false;
vector<DeviceType>& types = Device::available_types();
foreach(DeviceType type, types) {
if(devices != "")
devices += ", ";
if(devicelist != "")
devicelist += ", ";
devices += Device::string_from_type(type);
devicelist += Device::string_from_type(type);
}
/* parse options */
ArgParse ap;
ap.options ("Usage: cycles_server [options]",
"--device %s", &devicename, ("Devices to use: " + devices).c_str(),
"--device %s", &devicename, ("Devices to use: " + devicelist).c_str(),
"--list-devices", &list, "List information about all available devices",
NULL);
if(ap.parse(argc, argv) < 0) {
fprintf(stderr, "%s\n", ap.error_message().c_str());
fprintf(stderr, "%s\n", ap.geterror().c_str());
ap.usage();
exit(EXIT_FAILURE);
}
@ -84,13 +86,18 @@ int main(int argc, const char **argv)
}
}
TaskScheduler::init();
while(1) {
Device *device = Device::create(device_info);
printf("Cycles Server with device: %s\n", device->description().c_str());
Stats stats;
Device *device = Device::create(device_info, stats);
printf("Cycles Server with device: %s\n", device->info.description.c_str());
device->server_run();
delete device;
}
TaskScheduler::exit();
return 0;
}

@ -331,7 +331,13 @@ SessionParams BlenderSync::get_session_params(BL::RenderEngine b_engine, BL::Use
/* device default CPU */
params.device = devices[0];
if(RNA_enum_get(&cscene, "device") != 0) {
if(RNA_enum_get(&cscene, "device") == 2) {
/* find network device */
foreach(DeviceInfo& info, devices)
if(info.type == DEVICE_NETWORK)
params.device = info;
}
else if(RNA_enum_get(&cscene, "device") == 1) {
/* find GPU device with given id */
PointerRNA systemptr = b_userpref.system().ptr;
PropertyRNA *deviceprop = RNA_struct_find_property(&systemptr, "compute_device");

@ -31,6 +31,8 @@ class NetworkDevice : public Device
public:
boost::asio::io_service io_service;
tcp::socket socket;
device_ptr mem_counter;
DeviceTask the_task; /* todo: handle multiple tasks */
NetworkDevice(Stats &stats, const char *address)
: Device(stats), socket(io_service)
@ -49,75 +51,72 @@ public:
socket.close();
socket.connect(*endpoint_iterator++, error);
}
if(error)
throw boost::system::system_error(error);
mem_counter = 0;
}
~NetworkDevice()
{
RPCSend snd(socket, "stop");
snd.write();
}
void mem_alloc(device_memory& mem, MemoryType type)
{
#if 0
mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "mem_alloc");
snd.archive & size & type;
snd.add(mem);
snd.add(type);
snd.write();
RPCReceive rcv(socket);
device_ptr mem;
*rcv.archive & mem;
return mem;
#endif
}
void mem_copy_to(device_memory& mem)
{
#if 0
RPCSend snd(socket, "mem_copy_to");
snd.archive & mem & size;
snd.add(mem);
snd.write();
snd.write_buffer(host, size);
#endif
snd.write_buffer((void*)mem.data_pointer, mem.memory_size());
}
void mem_copy_from(device_memory& mem, int y, int w, int h, int elem)
{
#if 0
RPCSend snd(socket, "mem_copy_from");
snd.archive & mem & offset & size;
snd.add(mem);
snd.add(y);
snd.add(w);
snd.add(h);
snd.add(elem);
snd.write();
RPCReceive rcv(socket);
rcv.read_buffer(host, size);
#endif
rcv.read_buffer((void*)mem.data_pointer, mem.memory_size());
}
void mem_zero(device_memory& mem)
{
#if 0
RPCSend snd(socket, "mem_zero");
snd.archive & mem & size;
snd.add(mem);
snd.write();
#endif
}
void mem_free(device_memory& mem)
{
#if 0
if(mem) {
if(mem.device_pointer) {
RPCSend snd(socket, "mem_free");
snd.archive & mem;
snd.add(mem);
snd.write();
mem.device_pointer = 0;
}
#endif
}
void const_copy_to(const char *name, void *host, size_t size)
@ -126,79 +125,107 @@ public:
string name_string(name);
snd.archive & name_string & size;
snd.add(name_string);
snd.add(size);
snd.write();
snd.write_buffer(host, size);
}
void tex_alloc(const char *name, device_memory& mem, bool interpolation, bool periodic)
{
#if 0
mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "tex_alloc");
string name_string(name);
snd.archive & name_string & width & height & datatype & components & interpolation;
snd.add(name_string);
snd.add(mem);
snd.add(interpolation);
snd.add(periodic);
snd.write();
size_t size = width*height*components*datatype_size(datatype);
snd.write_buffer(host, size);
RPCReceive rcv(socket);
device_ptr mem;
*rcv.archive & mem;
return mem;
#endif
snd.write_buffer((void*)mem.data_pointer, mem.memory_size());
}
void tex_free(device_memory& mem)
{
#if 0
if(mem) {
if(mem.device_pointer) {
RPCSend snd(socket, "tex_free");
snd.archive & mem;
snd.add(mem);
snd.write();
mem.device_pointer = 0;
}
#endif
}
void path_trace(int x, int y, int w, int h, device_ptr buffer, device_ptr rng_state, int sample)
{
#if 0
RPCSend snd(socket, "path_trace");
snd.archive & x & y & w & h & buffer & rng_state & sample;
snd.write();
#endif
}
void tonemap(int x, int y, int w, int h, device_ptr rgba, device_ptr buffer, int sample, int resolution)
{
#if 0
RPCSend snd(socket, "tonemap");
snd.archive & x & y & w & h & rgba & buffer & sample & resolution;
snd.write();
#endif
}
void task_add(DeviceTask& task)
{
if(task.type == DeviceTask::TONEMAP)
tonemap(task.x, task.y, task.w, task.h, task.rgba, task.buffer, task.sample, task.resolution);
else if(task.type == DeviceTask::PATH_TRACE)
path_trace(task.x, task.y, task.w, task.h, task.buffer, task.rng_state, task.sample);
the_task = task;
RPCSend snd(socket, "task_add");
snd.add(task);
snd.write();
}
void task_wait()
{
RPCSend snd(socket, "task_wait");
snd.write();
list<RenderTile> the_tiles;
/* todo: run this threaded for connecting to multiple clients */
for(;;) {
RPCReceive rcv(socket);
RenderTile tile;
if(rcv.name == "acquire_tile") {
/* todo: watch out for recursive calls! */
if(the_task.acquire_tile(this, tile)) { /* write return as bool */
the_tiles.push_back(tile);
RPCSend snd(socket, "acquire_tile");
snd.add(tile);
snd.write();
}
else {
RPCSend snd(socket, "acquire_tile_none");
snd.write();
}
}
else if(rcv.name == "release_tile") {
rcv.read(tile);
for(list<RenderTile>::iterator it = the_tiles.begin(); it != the_tiles.end(); it++) {
if(tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample) {
tile.buffers = it->buffers;
the_tiles.erase(it);
break;
}
}
assert(tile.buffers != NULL);
the_task.release_tile(tile);
RPCSend snd(socket, "release_tile");
snd.write();
}
else if(rcv.name == "task_wait_done")
break;
}
}
void task_cancel()
{
RPCSend snd(socket, "task_cancel");
snd.write();
}
bool support_advanced_shading()
{
return true; /* todo: get this info from device */
}
};
@ -219,16 +246,285 @@ void device_network_info(vector<DeviceInfo>& devices)
devices.push_back(info);
}
class DeviceServer {
public:
DeviceServer(Device *device_, tcp::socket& socket_)
: device(device_), socket(socket_)
{
}
void listen()
{
/* receive remote function calls */
for(;;) {
RPCReceive rcv(socket);
if(rcv.name == "stop")
break;
process(rcv);
}
}
protected:
void process(RPCReceive& rcv)
{
// fprintf(stderr, "receive process %s\n", rcv.name.c_str());
if(rcv.name == "mem_alloc") {
MemoryType type;
network_device_memory mem;
device_ptr remote_pointer;
rcv.read(mem);
rcv.read(type);
/* todo: CPU needs mem.data_pointer */
remote_pointer = mem.device_pointer;
mem_data[remote_pointer] = vector<uint8_t>();
mem_data[remote_pointer].resize(mem.memory_size());
if(mem.memory_size())
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
else
mem.data_pointer = 0;
device->mem_alloc(mem, type);
ptr_map[remote_pointer] = mem.device_pointer;
ptr_imap[mem.device_pointer] = remote_pointer;
}
else if(rcv.name == "mem_copy_to") {
network_device_memory mem;
rcv.read(mem);
device_ptr remote_pointer = mem.device_pointer;
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
rcv.read_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
mem.device_pointer = ptr_map[remote_pointer];
device->mem_copy_to(mem);
}
else if(rcv.name == "mem_copy_from") {
network_device_memory mem;
int y, w, h, elem;
rcv.read(mem);
rcv.read(y);
rcv.read(w);
rcv.read(h);
rcv.read(elem);
device_ptr remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[remote_pointer];
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
device->mem_copy_from(mem, y, w, h, elem);
RPCSend snd(socket);
snd.write();
snd.write_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
}
else if(rcv.name == "mem_zero") {
network_device_memory mem;
rcv.read(mem);
device_ptr remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
device->mem_zero(mem);
}
else if(rcv.name == "mem_free") {
network_device_memory mem;
device_ptr remote_pointer;
rcv.read(mem);
remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
ptr_map.erase(remote_pointer);
ptr_imap.erase(mem.device_pointer);
mem_data.erase(remote_pointer);
device->mem_free(mem);
}
else if(rcv.name == "const_copy_to") {
string name_string;
size_t size;
rcv.read(name_string);
rcv.read(size);
vector<char> host_vector(size);
rcv.read_buffer(&host_vector[0], size);
device->const_copy_to(name_string.c_str(), &host_vector[0], size);
}
else if(rcv.name == "tex_alloc") {
network_device_memory mem;
string name;
bool interpolation;
bool periodic;
device_ptr remote_pointer;
rcv.read(name);
rcv.read(mem);
rcv.read(interpolation);
rcv.read(periodic);
remote_pointer = mem.device_pointer;
mem_data[remote_pointer] = vector<uint8_t>();
mem_data[remote_pointer].resize(mem.memory_size());
if(mem.memory_size())
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
else
mem.data_pointer = 0;
rcv.read_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
device->tex_alloc(name.c_str(), mem, interpolation, periodic);
ptr_map[remote_pointer] = mem.device_pointer;
ptr_imap[mem.device_pointer] = remote_pointer;
}
else if(rcv.name == "tex_free") {
network_device_memory mem;
device_ptr remote_pointer;
rcv.read(mem);
remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
ptr_map.erase(remote_pointer);
ptr_map.erase(mem.device_pointer);
mem_data.erase(remote_pointer);
device->tex_free(mem);
}
else if(rcv.name == "task_add") {
DeviceTask task;
rcv.read(task);
if(task.buffer) task.buffer = ptr_map[task.buffer];
if(task.rgba) task.rgba = ptr_map[task.rgba];
if(task.shader_input) task.shader_input = ptr_map[task.shader_input];
if(task.shader_output) task.shader_output = ptr_map[task.shader_output];
task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2);
task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1);
task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample, this);
task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1);
task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this);
device->task_add(task);
}
else if(rcv.name == "task_wait") {
device->task_wait();
RPCSend snd(socket, "task_wait_done");
snd.write();
}
else if(rcv.name == "task_cancel") {
device->task_cancel();
}
}
bool task_acquire_tile(Device *device, RenderTile& tile)
{
thread_scoped_lock acquire_lock(acquire_mutex);
bool result = false;
RPCSend snd(socket, "acquire_tile");
snd.write();
while(1) {
RPCReceive rcv(socket);
if(rcv.name == "acquire_tile") {
rcv.read(tile);
if(tile.buffer) tile.buffer = ptr_map[tile.buffer];
if(tile.rng_state) tile.rng_state = ptr_map[tile.rng_state];
if(tile.rgba) tile.rgba = ptr_map[tile.rgba];
result = true;
break;
}
else if(rcv.name == "acquire_tile_none")
break;
else
process(rcv);
}
return result;
}
void task_update_progress_sample()
{
; /* skip */
}
void task_update_tile_sample(RenderTile&)
{
; /* skip */
}
void task_release_tile(RenderTile& tile)
{
thread_scoped_lock acquire_lock(acquire_mutex);
if(tile.buffer) tile.buffer = ptr_imap[tile.buffer];
if(tile.rng_state) tile.rng_state = ptr_imap[tile.rng_state];
if(tile.rgba) tile.rgba = ptr_imap[tile.rgba];
RPCSend snd(socket, "release_tile");
snd.add(tile);
snd.write();
while(1) {
RPCReceive rcv(socket);
if(rcv.name == "release_tile")
break;
else
process(rcv);
}
}
bool task_get_cancel()
{
return false;
}
/* properties */
Device *device;
tcp::socket& socket;
/* mapping of remote to local pointer */
map<device_ptr, device_ptr> ptr_map;
map<device_ptr, device_ptr> ptr_imap;
map<device_ptr, vector<uint8_t> > mem_data;
thread_mutex acquire_mutex;
/* todo: free memory and device (osl) on network error */
};
void Device::server_run()
{
try
{
try {
/* starts thread that responds to discovery requests */
ServerDiscovery discovery;
for(;;)
{
for(;;) {
/* accept connection */
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT));
@ -236,145 +532,17 @@ void Device::server_run()
tcp::socket socket(io_service);
acceptor.accept(socket);
/* receive remote function calls */
for(;;) {
RPCReceive rcv(socket);
string remote_address = socket.remote_endpoint().address().to_string();
printf("Connected to remote client at: %s\n", remote_address.c_str());
if(rcv.name == "description") {
string desc = description();
DeviceServer server(this, socket);
server.listen();
RPCSend snd(socket);
snd.archive & desc;
snd.write();
}
else if(rcv.name == "mem_alloc") {
#if 0
MemoryType type;
size_t size;
device_ptr mem;
*rcv.archive & size & type;
mem = mem_alloc(size, type);
RPCSend snd(socket);
snd.archive & mem;
snd.write();
#endif
}
else if(rcv.name == "mem_copy_to") {
#if 0
device_ptr mem;
size_t size;
*rcv.archive & mem & size;
vector<char> host_vector(size);
rcv.read_buffer(&host_vector[0], size);
mem_copy_to(mem, &host_vector[0], size);
#endif
}
else if(rcv.name == "mem_copy_from") {
#if 0
device_ptr mem;
size_t offset, size;
*rcv.archive & mem & offset & size;
vector<char> host_vector(size);
mem_copy_from(&host_vector[0], mem, offset, size);
RPCSend snd(socket);
snd.write();
snd.write_buffer(&host_vector[0], size);
#endif
}
else if(rcv.name == "mem_zero") {
#if 0
device_ptr mem;
size_t size;
*rcv.archive & mem & size;
mem_zero(mem, size);
#endif
}
else if(rcv.name == "mem_free") {
#if 0
device_ptr mem;
*rcv.archive & mem;
mem_free(mem);
#endif
}
else if(rcv.name == "const_copy_to") {
string name_string;
size_t size;
*rcv.archive & name_string & size;
vector<char> host_vector(size);
rcv.read_buffer(&host_vector[0], size);
const_copy_to(name_string.c_str(), &host_vector[0], size);
}
else if(rcv.name == "tex_alloc") {
#if 0
string name_string;
DataType datatype;
device_ptr mem;
size_t width, height;
int components;
bool interpolation;
*rcv.archive & name_string & width & height & datatype & components & interpolation;
size_t size = width*height*components*datatype_size(datatype);
vector<char> host_vector(size);
rcv.read_buffer(&host_vector[0], size);
mem = tex_alloc(name_string.c_str(), &host_vector[0], width, height, datatype, components, interpolation);
RPCSend snd(socket);
snd.archive & mem;
snd.write();
#endif
}
else if(rcv.name == "tex_free") {
#if 0
device_ptr mem;
*rcv.archive & mem;
tex_free(mem);
#endif
}
else if(rcv.name == "path_trace") {
#if 0
device_ptr buffer, rng_state;
int x, y, w, h;
int sample;
*rcv.archive & x & y & w & h & buffer & rng_state & sample;
path_trace(x, y, w, h, buffer, rng_state, sample);
#endif
}
else if(rcv.name == "tonemap") {
#if 0
device_ptr rgba, buffer;
int x, y, w, h;
int sample, resolution;
*rcv.archive & x & y & w & h & rgba & buffer & sample & resolution;
tonemap(x, y, w, h, rgba, buffer, sample, resolution);
#endif
}
}
printf("Disconnected.\n");
}
}
catch(exception& e)
{
cerr << "Network server exception: " << e.what() << endl;
catch(exception& e) {
fprintf(stderr, "Network server exception: %s\n", e.what());
}
}

@ -31,15 +31,17 @@
#include <iostream>
#include "buffers.h"
#include "util_foreach.h"
#include "util_list.h"
#include "util_map.h"
#include "util_string.h"
CCL_NAMESPACE_BEGIN
using std::cout;
using std::cerr;
using std::endl;
using std::hex;
using std::setw;
using std::exception;
@ -51,13 +53,63 @@ static const int DISCOVER_PORT = 5121;
static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
typedef struct RPCSend {
/* Serialization of device memory */
class network_device_memory : public device_memory
{
public:
network_device_memory() {}
~network_device_memory() { device_pointer = 0; };
vector<char> local_data;
};
/* Remote procedure call Send */
class RPCSend {
public:
RPCSend(tcp::socket& socket_, const string& name_ = "")
: name(name_), socket(socket_), archive(archive_stream)
: name(name_), socket(socket_), archive(archive_stream), sent(false)
{
archive & name_;
}
~RPCSend()
{
if(!sent)
fprintf(stderr, "Error: RPC %s not sent\n", name.c_str());
}
void add(const device_memory& mem)
{
archive & mem.data_type & mem.data_elements & mem.data_size;
archive & mem.data_width & mem.data_height & mem.device_pointer;
}
template<typename T> void add(const T& data)
{
archive & data;
}
void add(const DeviceTask& task)
{
int type = (int)task.type;
archive & type & task.x & task.y & task.w & task.h;
archive & task.rgba & task.buffer & task.sample & task.num_samples;
archive & task.resolution & task.offset & task.stride;
archive & task.shader_input & task.shader_output & task.shader_eval_type;
archive & task.shader_x & task.shader_w;
}
void add(const RenderTile& tile)
{
archive & tile.x & tile.y & tile.w & tile.h;
archive & tile.start_sample & tile.num_samples & tile.sample;
archive & tile.resolution & tile.offset & tile.stride;
archive & tile.buffer & tile.rng_state & tile.rgba;
}
void write()
{
boost::system::error_code error;
@ -84,6 +136,8 @@ typedef struct RPCSend {
if(error.value())
cout << "Network send error: " << error.message() << "\n";
sent = true;
}
void write_buffer(void *buffer, size_t size)
@ -98,13 +152,18 @@ typedef struct RPCSend {
cout << "Network send error: " << error.message() << "\n";
}
protected:
string name;
tcp::socket& socket;
ostringstream archive_stream;
boost::archive::text_oarchive archive;
} RPCSend;
bool sent;
};
typedef struct RPCReceive {
/* Remote procedure call Receive */
class RPCReceive {
public:
RPCReceive(tcp::socket& socket_)
: socket(socket_), archive_stream(NULL), archive(NULL)
{
@ -151,6 +210,19 @@ typedef struct RPCReceive {
delete archive_stream;
}
void read(network_device_memory& mem)
{
*archive & mem.data_type & mem.data_elements & mem.data_size;
*archive & mem.data_width & mem.data_height & mem.device_pointer;
mem.data_pointer = 0;
}
template<typename T> void read(T& data)
{
*archive & data;
}
void read_buffer(void *buffer, size_t size)
{
size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size));
@ -159,12 +231,39 @@ typedef struct RPCReceive {
cout << "Network receive error: buffer size doesn't match expected size\n";
}
void read(DeviceTask& task)
{
int type;
*archive & type & task.x & task.y & task.w & task.h;
*archive & task.rgba & task.buffer & task.sample & task.num_samples;
*archive & task.resolution & task.offset & task.stride;
*archive & task.shader_input & task.shader_output & task.shader_eval_type;
*archive & task.shader_x & task.shader_w;
task.type = (DeviceTask::Type)type;
}
void read(RenderTile& tile)
{
*archive & tile.x & tile.y & tile.w & tile.h;
*archive & tile.start_sample & tile.num_samples & tile.sample;
*archive & tile.resolution & tile.offset & tile.stride;
*archive & tile.buffer & tile.rng_state & tile.rgba;
tile.buffers = NULL;
}
string name;
protected:
tcp::socket& socket;
string archive_str;
istringstream *archive_stream;
boost::archive::text_iarchive *archive;
} RPCReceive;
};
/* Server auto discovery */
class ServerDiscovery {
public: