diff --git a/displayarray/callbacks.py b/displayarray/callbacks.py index 1d3ecb9..c4b1d5d 100644 --- a/displayarray/callbacks.py +++ b/displayarray/callbacks.py @@ -23,12 +23,12 @@ class function_display_callback(object): # NOSONAR Used for running arbitrary functions on pixels. >>> import random - >>> from displayarray.frame import VideoHandlerThread + >>> from displayarray.frame import FrameUpdater >>> img = np.zeros((300, 300, 3)) >>> def fun(array, coords, finished): ... r,g,b = random.random()/20.0, random.random()/20.0, random.random()/20.0 ... array[coords[0:2]] = (array[coords[0:2]] + [r,g,b])%1.0 - >>> VideoHandlerThread(video_source=img, callbacks=function_display_callback(fun)).display() + >>> FrameUpdater(video_source=img, callbacks=function_display_callback(fun)).display() :param display_function: a function to run on the input image. :param finish_function: a function to run on the input image when the other function finishes. diff --git a/displayarray/effects/select_channels.py b/displayarray/effects/select_channels.py new file mode 100644 index 0000000..1a03993 --- /dev/null +++ b/displayarray/effects/select_channels.py @@ -0,0 +1,53 @@ +import numpy as np +from ..input import mouse_loop +import cv2 + + +class SelectChannels(object): + def __init__(self, selected_channels=None): + if selected_channels is None: + selected_channels = [0, 0, 0] + self.selected_channels = selected_channels + self.mouse_control = None + self.mouse_print_channels = False + self.num_input_channels = None + + def __call__(self, arr): + self.num_input_channels = arr.shape[-1] + out_arr = [arr[..., min(max(0, x), arr.shape[-1]-1)] for x in self.selected_channels] + out_arr = np.stack(out_arr, axis=-1) + return out_arr + + def enable_mouse_control(self): + @mouse_loop + def m_loop(me): + if me.event == cv2.EVENT_MOUSEWHEEL: + if me.flags & cv2.EVENT_FLAG_CTRLKEY: + if me.flags > 0: + self.selected_channels[0] += 1 + self.selected_channels[0] = min(self.selected_channels[0], self.num_input_channels - 1) + else: + self.selected_channels[0] -= 1 + self.selected_channels[0] = max(self.selected_channels[0], 0) + if self.mouse_print_channels: + print(f"Channel 0 now maps to {self.selected_channels[0]}.") + elif me.flags & cv2.EVENT_FLAG_SHIFTKEY: + if me.flags > 0: + self.selected_channels[1] += 1 + self.selected_channels[1] = min(self.selected_channels[1], self.num_input_channels - 1) + else: + self.selected_channels[1] -= 1 + self.selected_channels[1] = max(self.selected_channels[1], 0) + if self.mouse_print_channels: + print(f"Channel 1 now maps to {self.selected_channels[1]}.") + elif me.flags & cv2.EVENT_FLAG_ALTKEY: + if me.flags > 0: + self.selected_channels[2] += 1 + self.selected_channels[2] = min(self.selected_channels[2], self.num_input_channels - 1) + else: + self.selected_channels[2] -= 1 + self.selected_channels[2] = max(self.selected_channels[2], 0) + if self.mouse_print_channels: + print(f"Channel 2 now maps to {self.selected_channels[2]}.") + + self.mouse_control = m_loop diff --git a/displayarray/frame/__init__.py b/displayarray/frame/__init__.py index d1af476..5a14093 100644 --- a/displayarray/frame/__init__.py +++ b/displayarray/frame/__init__.py @@ -9,7 +9,7 @@ np_cam simulates numpy arrays as OpenCV cameras """ from . import subscriber_dictionary -from .frame_update_thread import VideoHandlerThread +from .frame_updater import FrameUpdater from .get_frame_ids import get_cam_ids from .np_to_opencv import NpCam from .frame_publishing import pub_cam_thread diff --git a/displayarray/frame/frame_publishing.py b/displayarray/frame/frame_publishing.py index e95c98c..20f20d0 100644 --- a/displayarray/frame/frame_publishing.py +++ b/displayarray/frame/frame_publishing.py @@ -10,11 +10,10 @@ from displayarray.uid import uid_for_source from typing import Union, Tuple - def pub_cam_loop( - cam_id: Union[int, str], - request_size: Tuple[int, int] = (1280, 720), - high_speed: bool = False, + cam_id: Union[int, str, np.ndarray], + request_size: Tuple[int, int] = (-1, -1), + high_speed: bool = True, fps_limit: float = 240, ) -> bool: """ @@ -82,11 +81,11 @@ def pub_cam_loop( def pub_cam_thread( cam_id: Union[int, str], - request_ize: Tuple[int, int] = (1280, 720), - high_speed: bool = False, + request_ize: Tuple[int, int] = (-1, -1), + high_speed: bool = True, fps_limit: float = 240, ) -> threading.Thread: - """Run pub_cam_loop in a new thread.""" + """Run pub_cam_loop in a new thread. Starts on creation.""" t = threading.Thread( target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit) ) diff --git a/displayarray/frame/frame_update_thread.py b/displayarray/frame/frame_updater.py similarity index 61% rename from displayarray/frame/frame_update_thread.py rename to displayarray/frame/frame_updater.py index 5e95211..a797530 100644 --- a/displayarray/frame/frame_update_thread.py +++ b/displayarray/frame/frame_updater.py @@ -8,22 +8,23 @@ from displayarray.uid import uid_for_source from displayarray.frame import subscriber_dictionary from displayarray.frame.frame_publishing import pub_cam_thread from displayarray.window import window_commands +from ..effects.select_channels import SelectChannels FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]] -class VideoHandlerThread(threading.Thread): - """Thread for publishing frames from a video source.""" +class FrameUpdater(threading.Thread): + """Thread for updating frames from a video source.""" def __init__( - self, - video_source: Union[int, str, np.ndarray] = 0, - callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None, - request_size: Tuple[int, int] = (-1, -1), - high_speed: bool = True, - fps_limit: float = 240, + self, + video_source: Union[int, str, np.ndarray] = 0, + callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None, + request_size: Tuple[int, int] = (-1, -1), + high_speed: bool = True, + fps_limit: float = 240, ): - super(VideoHandlerThread, self).__init__(target=self.loop, args=()) + super(FrameUpdater, self).__init__(target=self.loop, args=()) self.cam_id = uid_for_source(video_source) self.video_source = video_source if callbacks is None: @@ -43,20 +44,27 @@ class VideoHandlerThread(threading.Thread): def __apply_callbacks_to_frame(self, frame): if frame is not None: - frame_c = None - for c in self.callbacks: - try: - frame_c = c(frame) - except Exception as e: - self.exception_raised = e - frame = frame_c = self.exception_raised - subscriber_dictionary.stop_cam(self.cam_id) - window_commands.quit() - raise e - if frame_c is not None: - global_cv_display_callback(frame_c, self.cam_id) - else: - global_cv_display_callback(frame, self.cam_id) + try: + for c in self.callbacks: + frame = c(frame) + if frame.shape[-1] not in [1, 3] and len(frame.shape) != 2: + print(f"Too many channels in output. (Got {frame.shape[-1]} instead of 1 or 3.) " + f"Frame selection callback added.") + print("Ctrl+scroll to change first channel.\n" + "Shift+scroll to change second channel.\n" + "Alt+scroll to change third channel.") + sel = SelectChannels() + sel.enable_mouse_control() + sel.mouse_print_channels = True + self.callbacks.append(sel) + frame = self.callbacks[-1](frame) + except Exception as e: + self.exception_raised = e + frame = self.exception_raised + subscriber_dictionary.stop_cam(self.cam_id) + window_commands.quit() + raise e + global_cv_display_callback(frame, self.cam_id) def loop(self): """Continually get frames from the video publisher, run callbacks on them, and listen to commands.""" diff --git a/displayarray/util.py b/displayarray/util.py new file mode 100644 index 0000000..cb4eff6 --- /dev/null +++ b/displayarray/util.py @@ -0,0 +1,14 @@ +import weakref + + +class WeakMethod(weakref.WeakMethod): + """Pass any method to OpenCV without it keeping a reference forever.""" + + def __call__(self, *args, **kwargs): + """Call the actual method this object was made with.""" + obj = super().__call__() + func = self._func_ref() + if obj is None or func is None: + return None + meth = self._meth_type(func, obj) + meth(*args, **kwargs) diff --git a/displayarray/window/subscriber_windows.py b/displayarray/window/subscriber_windows.py index 97da523..7d3dd64 100644 --- a/displayarray/window/subscriber_windows.py +++ b/displayarray/window/subscriber_windows.py @@ -9,24 +9,12 @@ from localpubsub import NoData from displayarray.callbacks import global_cv_display_callback from displayarray.uid import uid_for_source from displayarray.frame import subscriber_dictionary -from displayarray.frame.frame_update_thread import FrameCallable -from displayarray.frame.frame_update_thread import VideoHandlerThread +from displayarray.frame.frame_updater import FrameCallable +from displayarray.frame.frame_updater import FrameUpdater from displayarray.input import MouseEvent from displayarray.window import window_commands -import weakref - - -class WeakMethod(weakref.WeakMethod): - """Pass any method to OpenCV without it keeping a reference forever.""" - - def __call__(self, *args, **kwargs): - """Call the actual method this object was made with.""" - obj = super().__call__() - func = self._func_ref() - if obj is None or func is None: - return None - meth = self._meth_type(func, obj) - meth(*args, **kwargs) +from ..util import WeakMethod +from ..effects.select_channels import SelectChannels class SubscriberWindows(object): @@ -160,6 +148,20 @@ class SubscriberWindows(object): frame = c(self.frames[f]) if frame is not None: self.frames[f] = frame + for f in range(len(self.frames)): + if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2: + print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) " + f"Frame selection callback added.") + print("Ctrl+scroll to change first channel.\n" + "Shift+scroll to change second channel.\n" + "Alt+scroll to change third channel.") + sel = SelectChannels() + sel.enable_mouse_control() + sel.mouse_print_channels = True + self.callbacks.append(sel) + for fr in range(len(self.frames)): + self.frames[fr] = self.callbacks[-1](self.frames[fr]) + break win_num = self._display_frames(self.frames, win_num) def update(self, arr=None, id=None): @@ -229,7 +231,7 @@ def _get_video_callback_dict_threads( v_callbacks.append(callbacks[v_name]) if v in callbacks: v_callbacks.append(callbacks[v]) - vid_threads.append(VideoHandlerThread(v, callbacks=v_callbacks, fps_limit=fps, request_size=size)) + vid_threads.append(FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size)) return vid_threads @@ -245,14 +247,14 @@ def _get_video_threads( vid_threads = _get_video_callback_dict_threads(*vids, callbacks=callbacks, fps=fps, size=size) elif isinstance(callbacks, List): for v in vids: - vid_threads.append(VideoHandlerThread(v, callbacks=callbacks, fps_limit=fps, request_size=size)) + vid_threads.append(FrameUpdater(v, callbacks=callbacks, fps_limit=fps, request_size=size)) elif callable(callbacks): for v in vids: - vid_threads.append(VideoHandlerThread(v, callbacks=[callbacks], fps_limit=fps, request_size=size)) + vid_threads.append(FrameUpdater(v, callbacks=[callbacks], fps_limit=fps, request_size=size)) else: for v in vids: if v is not None: - vid_threads.append(VideoHandlerThread(v, fps_limit=fps, request_size=size)) + vid_threads.append(FrameUpdater(v, fps_limit=fps, request_size=size)) return vid_threads diff --git a/examples/effects/select_channels.py b/examples/effects/select_channels.py new file mode 100644 index 0000000..cd3509c --- /dev/null +++ b/examples/effects/select_channels.py @@ -0,0 +1,9 @@ +from displayarray.effects import crop +from displayarray import display +import numpy as np + +# Scroll the mouse wheel and press ctrl, alt, or shift to select which channels are displayed as red, green, or blue. +arr = np.ones((250, 250, 250)) +for x in range(250): + arr[..., x] = x / 250.0 +display(arr).block() diff --git a/examples/looping/random_display.py b/examples/looping/random_display.py index 61beb24..5d7b67b 100644 --- a/examples/looping/random_display.py +++ b/examples/looping/random_display.py @@ -1,9 +1,9 @@ from displayarray import display import numpy as np -arr = np.random.normal(0.5, 0.1, (100, 100, 3)) +arr = np.random.normal(0.5, 0.1, (100, 100, 5)) with display(arr) as displayer: while displayer: - arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 3)) + arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 5)) arr %= 1.0 diff --git a/examples/tensorflow/denoising_autoencoder.py b/examples/tensorflow/denoising_autoencoder.py index 72a00eb..8c26c83 100644 --- a/examples/tensorflow/denoising_autoencoder.py +++ b/examples/tensorflow/denoising_autoencoder.py @@ -39,3 +39,9 @@ while displayer: autoencoder.fit(grab_noise, grab, steps_per_epoch=1, epochs=1) output_image = autoencoder.predict(grab, steps=1) displayer.update((output_image[0] * 255.0).astype(np.uint8), "uid for autoencoder output") + + get_3rd_layer_output = tf.keras.backend.function([autoencoder.layers[0].input], + [autoencoder.layers[1].output]) + layer_output = get_3rd_layer_output([grab_noise])[0] + + displayer.update(layer_output[0], "conv 1") \ No newline at end of file diff --git a/tests/require_graphics/test_pub_cam.py b/tests/require_graphics/test_pub_cam.py index 3490b82..1a9e1eb 100644 --- a/tests/require_graphics/test_pub_cam.py +++ b/tests/require_graphics/test_pub_cam.py @@ -13,7 +13,7 @@ class TestFrameHandler(ut.TestCase): print(frame.shape) self.i += 1 - w.VideoHandlerThread( + w.FrameUpdater( 0, [test_frame_handler], request_size=(1280, 720), diff --git a/tests/require_graphics/test_sub_win.py b/tests/require_graphics/test_sub_win.py index 741df27..256b1f7 100644 --- a/tests/require_graphics/test_sub_win.py +++ b/tests/require_graphics/test_sub_win.py @@ -21,21 +21,21 @@ class TestSubWin(ut.TestCase): def print_key_thread(key_chr): print("key pressed: " + str(key_chr)) - w.VideoHandlerThread().display() + w.FrameUpdater().display() def test_sub(self): - w.VideoHandlerThread().display() + w.FrameUpdater().display() def test_image(self): img = np.random.uniform(0, 1, (300, 300, 3)) - w.VideoHandlerThread(video_source=img).display() + w.FrameUpdater(video_source=img).display() def test_image_args(self): img = np.random.uniform(0, 1, (30, 30, 3)) - w.VideoHandlerThread(video_source=img, request_size=(300, -1)).display() + w.FrameUpdater(video_source=img, request_size=(300, -1)).display() def test_sub_with_args(self): - video_thread = w.VideoHandlerThread( + video_thread = w.FrameUpdater( video_source=0, request_size=(800, 600), high_speed=False, fps_limit=8 ) @@ -46,7 +46,7 @@ class TestSubWin(ut.TestCase): frame[:, :, 0] = 0 frame[:, :, 2] = 0 - w.VideoHandlerThread(callbacks=redden_frame_print_spam).display() + w.FrameUpdater(callbacks=redden_frame_print_spam).display() def test_sub_with_callback_exception(self): def redden_frame_print_spam(frame): @@ -54,7 +54,7 @@ class TestSubWin(ut.TestCase): frame[:, :, 2] = 1 / 0 with self.assertRaises(ZeroDivisionError) as e: - v = w.VideoHandlerThread(callbacks=redden_frame_print_spam) + v = w.FrameUpdater(callbacks=redden_frame_print_spam) v.display() self.assertEqual(v.exception_raised, e) @@ -76,7 +76,7 @@ class TestSubWin(ut.TestCase): frame = np.asarray([[[[[[frame + 1 / 0]]]]], [[[[[frame]]], [[[frame]]]]]]) return frame - v = w.VideoHandlerThread(callbacks=[nest_frame]) + v = w.FrameUpdater(callbacks=[nest_frame]) v.start() with self.assertRaises(ZeroDivisionError) as e: @@ -88,7 +88,7 @@ class TestSubWin(ut.TestCase): v.join() def test_conway_life(self): - from displayarray.frame import VideoHandlerThread + from displayarray.frame import FrameUpdater from displayarray.callbacks import function_display_callback import numpy as np import cv2 @@ -131,15 +131,15 @@ class TestSubWin(ut.TestCase): :, ] = 1.0 - VideoHandlerThread( + FrameUpdater( video_source=img, callbacks=function_display_callback(conway) ).display() def test_double_win(self): vid1 = np.ones((100, 100)) vid2 = np.zeros((100, 100)) - t1 = w.VideoHandlerThread(vid1) - t2 = w.VideoHandlerThread(vid2) + t1 = w.FrameUpdater(vid1) + t2 = w.FrameUpdater(vid2) t1.start() t2.start() SubscriberWindows( diff --git a/tests/unit/frame/test_frame_publishing.py b/tests/unit/frame/test_frame_publishing.py index 2a993f3..db84116 100644 --- a/tests/unit/frame/test_frame_publishing.py +++ b/tests/unit/frame/test_frame_publishing.py @@ -1,5 +1,120 @@ from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread +import displayarray +import mock +import pytest +import cv2 +from displayarray.frame.np_to_opencv import NpCam +import numpy as np +import displayarray.frame.subscriber_dictionary as subd +import displayarray.frame.frame_publishing as fpub -def test_pub_cam_loop(): - pub_cam_loop() \ No newline at end of file +def test_pub_cam_loop_exit(): + not_a_camera = mock.MagicMock() + with pytest.raises(TypeError): + pub_cam_loop(not_a_camera) + + +def test_pub_cam_int(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: + cap = NpCam(img) + mock_cv_capture.return_value = cap + reg_cam = displayarray.frame.frame_publishing.subscriber_dictionary.register_cam = mock.MagicMock() + cam_cmd_sub = displayarray.frame.frame_publishing.subscriber_dictionary.cam_cmd_sub = mock.MagicMock() + mock_sub = cam_cmd_sub.return_value = mock.MagicMock() + mock_sub.get = mock.MagicMock() + mock_sub.get.side_effect = ["", "", "", "quit"] + mock_sub.release = mock.MagicMock() + cap.set = mock.MagicMock() + cap.get = mock.MagicMock() + cap.get.return_value = 2 + cap.release = mock.MagicMock() + + cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') + cam_pub = cam_0.frame_pub.publish = mock.MagicMock() + + pub_cam_loop(0, high_speed=False) + + cam_pub.assert_has_calls( + [mock.call(img)] * 4 + ) + + reg_cam.assert_called_once_with('0') + cam_cmd_sub.assert_called_once_with('0') + + cap.set.assert_has_calls( + [mock.call(cv2.CAP_PROP_FRAME_WIDTH, 1280), + mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 720)] + ) + cap.get.assert_has_calls( + [mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8 + ) + mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) + mock_sub.release.assert_called_once() + cap.release.assert_called_once() + + +def test_pub_cam_fail(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: + cap = NpCam(img) + mock_cv_capture.side_effect = [cap] + + cap.isOpened = mock.MagicMock() + cap.isOpened.return_value = False + subd.register_cam = mock.MagicMock() + subd.CV_CAMS_DICT['0'] = subd.Cam('0') + + mock_fail_pub = \ + subd.CV_CAMS_DICT['0'].status_pub.publish = \ + mock.MagicMock() + + pub_cam_loop(0, high_speed=False) + + mock_fail_pub.assert_called_once_with("failed") + + +def test_pub_cam_high_speed(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: + cap = NpCam(img) + mock_cv_capture.side_effect = [cap] + + cap.isOpened = mock.MagicMock() + cap.isOpened.return_value = False + cap.set = mock.MagicMock() + + pub_cam_loop(0, request_size=(640, 480), high_speed=True) + + cap.set.assert_has_calls( + [mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG), + mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640), + mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)] + ) + + +def test_pub_cam_numpy(): + with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs: + img = np.zeros((30, 40)) + NpCam.read = mock.MagicMock() + NpCam.read.side_effect = [(True, img), (True, img), (True, img), (False, None)] + subd.register_cam = mock.MagicMock() + mock_uidfs.return_value = '0' + cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') + cam_pub = cam_0.frame_pub.publish = mock.MagicMock() + + pub_cam_loop(img) + cam_pub.assert_has_calls( + [mock.call(img)] * 3 + ) + + +def test_pub_cam_thread(): + with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread: + thread_instance = mock_thread.return_value = mock.MagicMock() + + pub_cam_thread(5) + + mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (1280, 720), True, 240)) + thread_instance.start.assert_called_once() diff --git a/tests/unit/frame/test_frame_updater.py b/tests/unit/frame/test_frame_updater.py new file mode 100644 index 0000000..51ba2b1 --- /dev/null +++ b/tests/unit/frame/test_frame_updater.py @@ -0,0 +1,123 @@ +import displayarray.frame.frame_updater as fup +import numpy as np +import mock +import pytest +import itertools +from displayarray.effects.select_channels import SelectChannels + + +def test_init_defaults(): + ud = fup.FrameUpdater() + + assert ud.video_source == 0 + assert ud.cam_id == "0" + assert ud.callbacks == [] + assert ud.request_size == (-1, -1) + assert ud.high_speed == True + assert ud.fps_limit == 240 + + +def test_init(): + cb = lambda x: np.zeros((1, 1)) + ud = fup.FrameUpdater("test", cb, (2, 2), False, 30) + + assert ud.video_source == "test" + assert ud.cam_id == "test" + assert ud.callbacks == [cb] + assert ud.request_size == (2, 2) + assert ud.high_speed == False + assert ud.fps_limit == 30 + + +def test_loop(): + with mock.patch("displayarray.frame.frame_updater.pub_cam_thread") as mock_pubcam_thread, \ + mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \ + mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.cam_frame_sub") as mock_frame_sub, \ + mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub, \ + mock.patch("displayarray.frame.frame_updater.global_cv_display_callback") as mock_global_cb: + mock_cbs = [mock.MagicMock(), mock.MagicMock()] + ud = fup.FrameUpdater(0, callbacks=mock_cbs) + + pub_t = mock_pubcam_thread.return_value = mock.MagicMock() + mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True]) + sub_cam = mock_frame_sub.return_value = mock.MagicMock() + sub_cam.get = mock.MagicMock() + frame = sub_cam.get.return_value = mock.MagicMock() + transformed_frame = mock.MagicMock() + mock_cbs[0].return_value = transformed_frame + mock_cbs[1].return_value = transformed_frame + transformed_frame.shape = [1, 2, 3] + mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock() + mock_sub_owner.get.side_effect = ["", "", "", "quit"] + + ud.loop() + + mock_pubcam_thread.assert_called_once_with(0, (-1, -1), True, 240) + mock_frame_sub.assert_called_once_with("0") + handler_cmd_sub.assert_called_once_with("0") + sub_cam.get.assert_has_calls([mock.call(blocking=True, timeout=1.0)] * 3) + mock_cbs[0].assert_has_calls([mock.call(frame)] * 4) + mock_cbs[1].assert_has_calls([mock.call(transformed_frame)] * 4) + mock_global_cb.assert_has_calls([mock.call(transformed_frame, '0')] * 4) + mock_sub_owner.release.assert_called_once() + sub_cam.release.assert_called_once() + pub_t.join.assert_called_once() + + +def test_callback_exception(): + def redden_frame_print_spam(frame): + frame[:, :, 0] = 0 + frame[:, :, 2] = 1 / 0 + + with pytest.raises(ZeroDivisionError) as e: + v = fup.FrameUpdater(np.zeros((1, 1, 3)), callbacks=redden_frame_print_spam) + v.loop() + assert e.errisinstance(ZeroDivisionError) + + +def test_display(): + with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win: + f = fup.FrameUpdater() + f.start = mock.MagicMock() + f.join = mock.MagicMock() + mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock() + + f.display() + + mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[]) + mock_sub_win_instance.loop.assert_called_once() + f.start.assert_called_once() + f.join.assert_called_once() + + +def test_display_exception(): + with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win: + def redden_frame_print_spam(frame): + frame[:, :, 0] = 0 + frame[:, :, 2] = 1 / 0 + + with pytest.raises(ZeroDivisionError) as e: + v = fup.FrameUpdater(np.zeros((1, 1, 3)), callbacks=redden_frame_print_spam) + v.display() + assert e.errisinstance(ZeroDivisionError) + + +from displayarray.window.window_commands import win_cmd_pub + + +def test_display_many_channels(): + with mock.patch("displayarray.frame.frame_updater.pub_cam_thread"), \ + mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \ + mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub: + mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True]) + mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock() + mock_sub_owner.get.side_effect = ["", "", "", "quit"] + + arr = np.ones((20, 20, 20)) + + f = fup.FrameUpdater(arr) + + f.loop() + + assert isinstance(f.callbacks[0], SelectChannels) + win_cmd_pub.publish("quit") diff --git a/tests/unit/frame/test_get_frame_ids.py b/tests/unit/frame/test_get_frame_ids.py new file mode 100644 index 0000000..ea7c649 --- /dev/null +++ b/tests/unit/frame/test_get_frame_ids.py @@ -0,0 +1,14 @@ +import displayarray.frame.get_frame_ids as gfi +import mock +import cv2 + + +def test_get_cam_ids(): + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: + cap = mock.MagicMock() + cap.isOpened.return_value = True + cap_end = mock.MagicMock() + cap_end.isOpened.return_value = False + mock_cv_capture.side_effect = [cap, cap, cap, cap_end] + ids = gfi.get_cam_ids() + assert ids == [0, 1, 2]