From cb3f3d7b106d2966c538df32911f15985b8394e2 Mon Sep 17 00:00:00 2001 From: SimLeek Date: Mon, 25 Feb 2019 20:14:09 -0700 Subject: [PATCH] exceptions: Moved callbacks to its own location. Added VideoHandlerThread callback exception handling. Added exception tests. --- cvpubsubs/__init__.py | 2 +- cvpubsubs/callbacks.py | 67 ++++++++++++ cvpubsubs/webcam_pub/callbacks.py | 144 -------------------------- cvpubsubs/webcam_pub/frame_handler.py | 16 ++- cvpubsubs/window_sub/cv_window_sub.py | 2 + tests/test_sub_win.py | 67 +++++------- 6 files changed, 112 insertions(+), 186 deletions(-) create mode 100644 cvpubsubs/callbacks.py delete mode 100644 cvpubsubs/webcam_pub/callbacks.py diff --git a/cvpubsubs/__init__.py b/cvpubsubs/__init__.py index 2b8877c..ef7eb44 100644 --- a/cvpubsubs/__init__.py +++ b/cvpubsubs/__init__.py @@ -1 +1 @@ -__version__ = '0.5.0' +__version__ = '0.6.0' diff --git a/cvpubsubs/callbacks.py b/cvpubsubs/callbacks.py new file mode 100644 index 0000000..d95f835 --- /dev/null +++ b/cvpubsubs/callbacks.py @@ -0,0 +1,67 @@ +from cvpubsubs.window_sub.winctrl import WinCtrl +import numpy as np + +if False: + from typing import Union + + +def global_cv_display_callback(frame, # type: np.ndarray + cam_id # type: Union[int, str] + ): + from cvpubsubs.window_sub import SubscriberWindows + """Default callback for sending frames to the global frame dictionary. + + :param frame: The video or image frame + :type frame: np.ndarray + :param cam_id: The video or image source + :type cam_id: Union[int, str] + """ + SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame + + +class function_display_callback(object): # NOSONAR + def __init__(self, display_function, finish_function=None): + """Used for running arbitrary functions on pixels. + + >>> import random + >>> from cvpubsubs.webcam_pub import VideoHandlerThread + >>> img = np.zeros((300, 300, 3)) + >>> def fun(array, coords, finished): + ... r,g,b = random.random()/20.0, random.random()/20.0, random.random()/20.0 + ... array[coords[0:2]] = (array[coords[0:2]] + [r,g,b])%1.0 + >>> VideoHandlerThread(video_source=img, callbacks=function_display_callback(fun)).display() + + :param display_function: a function to run on the input image. + :param finish_function: a function to run on the input image when the other function finishes. + """ + self.looping = True + self.first_call = True + + def _run_finisher(self, frame, finished, *args, **kwargs): + if not callable(finish_function): + WinCtrl.quit() + else: + finished = finish_function(frame, Ellipsis, finished, *args, **kwargs) + if finished: + WinCtrl.quit() + + def _display_internal(self, frame, cam_id, *args, **kwargs): + finished = True + if self.first_call: + # return to display initial frame + self.first_call = False + return + if self.looping: + it = np.nditer(frame, flags=['multi_index']) + while not it.finished: + x, y, c = it.multi_index + finished = display_function(frame, (x, y, c), finished, *args, **kwargs) + it.iternext() + if finished: + self.looping = False + _run_finisher(self, frame, finished, *args, **kwargs) + + self.inner_function = _display_internal + + def __call__(self, *args, **kwargs): + return self.inner_function(self, *args, **kwargs) diff --git a/cvpubsubs/webcam_pub/callbacks.py b/cvpubsubs/webcam_pub/callbacks.py deleted file mode 100644 index e972172..0000000 --- a/cvpubsubs/webcam_pub/callbacks.py +++ /dev/null @@ -1,144 +0,0 @@ -from cvpubsubs.window_sub.winctrl import WinCtrl -import numpy as np - -if False: - from typing import Union - - -def global_cv_display_callback(frame, # type: np.ndarray - cam_id # type: Union[int, str] - ): - from cvpubsubs.window_sub import SubscriberWindows - """Default callback for sending frames to the global frame dictionary. - - :param frame: The video or image frame - :type frame: np.ndarray - :param cam_id: The video or image source - :type cam_id: Union[int, str] - """ - SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame - - -class function_display_callback(object): # NOSONAR - def __init__(self, display_function, finish_function=None): - """Used for running arbitrary functions on pixels. - - >>> import random - >>> from cvpubsubs.webcam_pub import VideoHandlerThread - >>> img = np.zeros((300, 300, 3)) - >>> def fun(array, coords, finished): - ... r,g,b = random.random()/20.0, random.random()/20.0, random.random()/20.0 - ... array[coords[0:2]] = (array[coords[0:2]] + [r,g,b])%1.0 - >>> VideoHandlerThread(video_source=img, callbacks=function_display_callback(fun)).display() - - :param display_function: - :param finish_function: - """ - self.looping = True - self.first_call = True - - def _run_finisher(self, frame, finished, *args, **kwargs): - if not callable(finish_function): - WinCtrl.quit() - else: - finished = finish_function(frame, Ellipsis, finished, *args, **kwargs) - if finished: - WinCtrl.quit() - - def _display_internal(self, frame, cam_id, *args, **kwargs): - finished = True - if self.first_call: - # return to display initial frame - self.first_call = False - return - if self.looping: - it = np.nditer(frame, flags=['multi_index']) - while not it.finished: - x, y, c = it.multi_index - finished = display_function(frame, (x, y, c), finished, *args, **kwargs) - it.iternext() - if finished: - self.looping = False - _run_finisher(self, frame, finished, *args, **kwargs) - - self.inner_function = _display_internal - - def __call__(self, *args, **kwargs): - return self.inner_function(self, *args, **kwargs) - - -class pytorch_function_display_callback(object): # NOSONAR - def __init__(self, display_function, finish_function=None): - """Used for running arbitrary functions on pixels. - - >>> import random - >>> import torch - >>> from cvpubsubs.webcam_pub import VideoHandlerThread - >>> img = np.zeros((300, 300, 3)) - >>> def fun(array, coords, finished): - ... rgb = torch.empty(array.shape).uniform_(0,1).type(torch.DoubleTensor).to(array.device)/150.0 - ... trans = np.zeros_like(coords) - ... trans[0,...] = 1 - ... array[coords] = (array[coords+trans] + rgb[coords])%1.0 - >>> VideoHandlerThread(video_source=img, callbacks=pytorch_function_display_callback(fun)).display() - - thanks: https://medium.com/@awildtaber/building-a-rendering-engine-in-tensorflow-262438b2e062 - - :param display_function: - :param finish_function: - """ - - import torch - from torch.autograd import Variable - - self.looping = True - self.first_call = True - - def _run_finisher(self, frame, finished, *args, **kwargs): - if not callable(finish_function): - WinCtrl.quit() - else: - finished = finish_function(frame, Ellipsis, finished, *args, **kwargs) - if finished: - WinCtrl.quit() - - def _setup(self, frame, cam_id, *args, **kwargs): - - if "device" in kwargs: - self.device = torch.device(kwargs["device"]) - else: - if torch.cuda.is_available(): - self.device = torch.device("cuda") - else: - self.device = torch.device("cpu") - - self.min_bounds = [0 for _ in frame.shape] - self.max_bounds = list(frame.shape) - grid_slices = [slice(self.min_bounds[d], self.max_bounds[d]) for d in range(len(frame.shape))] - self.space_grid = np.mgrid[grid_slices] - x_tens = torch.LongTensor(self.space_grid[0, ...]).to(self.device) - y_tens = torch.LongTensor(self.space_grid[1, ...]).to(self.device) - c_tens = torch.LongTensor(self.space_grid[2, ...]).to(self.device) - self.x = Variable(x_tens, requires_grad=False) - self.y = Variable(y_tens, requires_grad=False) - self.c = Variable(c_tens, requires_grad=False) - - def _display_internal(self, frame, cam_id, *args, **kwargs): - finished = True - if self.first_call: - # return to display initial frame - _setup(self, frame, finished, *args, **kwargs) - self.first_call = False - return - if self.looping: - tor_frame = torch.from_numpy(frame).to(self.device) - finished = display_function(tor_frame, (self.x, self.y, self.c), finished, *args, **kwargs) - frame[...] = tor_frame.cpu().numpy()[...] - if finished: - self.looping = False - _run_finisher(self, frame, finished, *args, **kwargs) - - self.inner_function = _display_internal - - def __call__(self, *args, **kwargs): - return self.inner_function(self, *args, **kwargs) diff --git a/cvpubsubs/webcam_pub/frame_handler.py b/cvpubsubs/webcam_pub/frame_handler.py index efed326..c218a82 100644 --- a/cvpubsubs/webcam_pub/frame_handler.py +++ b/cvpubsubs/webcam_pub/frame_handler.py @@ -4,13 +4,15 @@ import numpy as np from cvpubsubs.webcam_pub.pub_cam import pub_cam_thread from cvpubsubs.webcam_pub.camctrl import CamCtrl +from cvpubsubs.window_sub.winctrl import WinCtrl + if False: from typing import Union, Tuple, Any, Callable, List, Optional FrameCallable = Callable[[np.ndarray, int], Optional[np.ndarray]] -from cvpubsubs.webcam_pub.callbacks import global_cv_display_callback +from cvpubsubs.callbacks import global_cv_display_callback display_callbacks = [global_cv_display_callback] @@ -53,6 +55,7 @@ class VideoHandlerThread(threading.Thread): self.request_size = request_size self.high_speed = high_speed self.fps_limit = fps_limit + self.exception_raised = None def loop(self): """Continually gets frames from the video publisher, runs callbacks on them, and listens to commands.""" @@ -66,7 +69,14 @@ class VideoHandlerThread(threading.Thread): frame = sub_cam.get(blocking=True, timeout=1.0) # type: np.ndarray if frame is not None: for c in self.callbacks: - frame_c = c(frame, self.cam_id) + try: + frame_c = c(frame, self.cam_id) + except Exception as e: + import traceback + CamCtrl.stop_cam(self.cam_id) + WinCtrl.quit() + self.exception_raised = e + frame_c = self.exception_raised if frame_c is not None: frame = frame_c msg_owner = sub_owner.get() @@ -93,3 +103,5 @@ class VideoHandlerThread(threading.Thread): self.start() SubscriberWindows(video_sources=[self.cam_id], callbacks=callbacks).loop() self.join() + if self.exception_raised is not None: + raise self.exception_raised diff --git a/cvpubsubs/window_sub/cv_window_sub.py b/cvpubsubs/window_sub/cv_window_sub.py index 6e0c4cf..561fb6a 100644 --- a/cvpubsubs/window_sub/cv_window_sub.py +++ b/cvpubsubs/window_sub/cv_window_sub.py @@ -68,6 +68,8 @@ class SubscriberWindows(object): ) def _display_frames(self, frames, win_num): + if isinstance(frames, Exception): + raise frames for f in range(len(frames)): # detect nested: if isinstance(frames[f], (list, tuple)) or frames[f].dtype.num == 17 or len(frames[f].shape) > 3: diff --git a/tests/test_sub_win.py b/tests/test_sub_win.py index addfedc..8f51eae 100644 --- a/tests/test_sub_win.py +++ b/tests/test_sub_win.py @@ -1,8 +1,6 @@ import threading import unittest as ut -import numpy as np - import cvpubsubs.webcam_pub as w from cvpubsubs.window_sub import SubscriberWindows from cvpubsubs.window_sub.winctrl import WinCtrl @@ -61,6 +59,16 @@ class TestSubWin(ut.TestCase): w.VideoHandlerThread(callbacks=redden_frame_print_spam).display() + def test_sub_with_callback_exception(self): + def redden_frame_print_spam(frame, cam_id): + frame[:, :, 0] = 0 + frame[:, :, 2] = 1 / 0 + + with self.assertRaises(ZeroDivisionError) as e: + v = w.VideoHandlerThread(callbacks=redden_frame_print_spam) + v.display() + self.assertEqual(v.exception_raised, e) + def test_multi_cams_one_source(self): def cam_handler(frame, cam_id): SubscriberWindows.set_global_frame_dict(cam_id, frame, frame) @@ -108,9 +116,26 @@ class TestSubWin(ut.TestCase): v.join() + def test_nested_frames_exception(self): + def nest_frame(frame, cam_id): + frame = np.asarray([[[[[[frame + 1 / 0]]]]], [[[[[frame]]], [[[frame]]]]]]) + return frame + + v = w.VideoHandlerThread(callbacks=[nest_frame] + w.display_callbacks) + v.start() + + with self.assertRaises(ZeroDivisionError) as e: + SubscriberWindows(window_names=[str(i) for i in range(3)], + video_sources=[str(0)] + ).loop() + self.assertEqual(v.exception_raised, e) + + v.join() + def test_conway_life(self): from cvpubsubs.webcam_pub import VideoHandlerThread - from cvpubsubs.webcam_pub.callbacks import function_display_callback + from cvpubsubs.callbacks import function_display_callback + import numpy as np img = np.zeros((50, 50, 1)) img[0:5, 0:5, :] = 1 @@ -128,39 +153,3 @@ class TestSubWin(ut.TestCase): array[coords] = 1.0 VideoHandlerThread(video_source=img, callbacks=function_display_callback(conway)).display() - - def test_conway_life_pytorch(self): - import torch - from torch import functional as F - from cvpubsubs.webcam_pub import VideoHandlerThread - from cvpubsubs.webcam_pub.callbacks import pytorch_function_display_callback - - img = np.ones((600, 800, 1)) - img[10:590, 10:790, :] = 0 - - def fun(frame, coords, finished): - array = frame - neighbor_weights = torch.ones(torch.Size([3, 3])) - neighbor_weights[1, 1, ...] = 0 - neighbor_weights = torch.Tensor(neighbor_weights).type_as(array).to(array.device) - neighbor_weights = neighbor_weights.squeeze()[None, None, :, :] - array = array.permute(2, 1, 0)[None, ...] - neighbors = torch.nn.functional.conv2d(array, neighbor_weights, stride=1, padding=1) - live_array = torch.where((neighbors < 2) | (neighbors > 3), - torch.zeros_like(array), - torch.where((2 <= neighbors) & (neighbors <= 3), - torch.ones_like(array), - array - ) - ) - dead_array = torch.where(neighbors == 3, - torch.ones_like(array), - array) - array = torch.where(array == 1.0, - live_array, - dead_array - ) - array = array.squeeze().permute(1, 0)[...,None] - frame[...] = array[...] - - VideoHandlerThread(video_source=img, callbacks=pytorch_function_display_callback(fun)).display()