From 441246cf3786dfbb7a6d26f4f75a174ede0c8fec Mon Sep 17 00:00:00 2001 From: simleek Date: Mon, 14 Oct 2019 20:25:43 -0700 Subject: [PATCH] added unit tests for frame and window --- displayarray/callbacks.py | 2 +- displayarray/effects/select_channels.py | 2 +- displayarray/frame/frame_updater.py | 4 +- displayarray/window/subscriber_windows.py | 85 +-- .../{require_graphics => effects}/__init__.py | 0 tests/{unit => frame}/__init__.py | 0 tests/frame/test_frame_publishing.py | 120 ++++ tests/{unit => }/frame/test_frame_updater.py | 22 +- tests/{unit => }/frame/test_get_frame_ids.py | 0 tests/{unit => }/frame/test_np_to_cv.py | 0 .../frame/test_subscriber_dictionary.py | 0 tests/require_graphics/test_pub_cam.py | 22 - tests/require_graphics/test_sub_win.py | 152 ------ tests/unit/frame/test_frame_publishing.py | 120 ---- tests/{unit/frame => window}/__init__.py | 0 tests/window/subscriber_windows.py | 514 ++++++++++++++++++ 16 files changed, 695 insertions(+), 348 deletions(-) rename tests/{require_graphics => effects}/__init__.py (100%) rename tests/{unit => frame}/__init__.py (100%) create mode 100644 tests/frame/test_frame_publishing.py rename tests/{unit => }/frame/test_frame_updater.py (86%) rename tests/{unit => }/frame/test_get_frame_ids.py (100%) rename tests/{unit => }/frame/test_np_to_cv.py (100%) rename tests/{unit => }/frame/test_subscriber_dictionary.py (100%) delete mode 100644 tests/require_graphics/test_pub_cam.py delete mode 100644 tests/require_graphics/test_sub_win.py delete mode 100644 tests/unit/frame/test_frame_publishing.py rename tests/{unit/frame => window}/__init__.py (100%) create mode 100644 tests/window/subscriber_windows.py diff --git a/displayarray/callbacks.py b/displayarray/callbacks.py index c4b1d5d..de728ca 100644 --- a/displayarray/callbacks.py +++ b/displayarray/callbacks.py @@ -15,7 +15,7 @@ def global_cv_display_callback(frame: np.ndarray, cam_id: Union[int, str]): """ from displayarray.window import SubscriberWindows - SubscriberWindows.FRAME_DICT[str(cam_id) + "frame"] = frame + SubscriberWindows.FRAME_DICT[str(cam_id)] = frame class function_display_callback(object): # NOSONAR diff --git a/displayarray/effects/select_channels.py b/displayarray/effects/select_channels.py index 1a03993..709ab8f 100644 --- a/displayarray/effects/select_channels.py +++ b/displayarray/effects/select_channels.py @@ -14,7 +14,7 @@ class SelectChannels(object): def __call__(self, arr): self.num_input_channels = arr.shape[-1] - out_arr = [arr[..., min(max(0, x), arr.shape[-1]-1)] for x in self.selected_channels] + out_arr = [arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels] out_arr = np.stack(out_arr, axis=-1) return out_arr diff --git a/displayarray/frame/frame_updater.py b/displayarray/frame/frame_updater.py index a797530..7bcb3f4 100644 --- a/displayarray/frame/frame_updater.py +++ b/displayarray/frame/frame_updater.py @@ -46,7 +46,9 @@ class FrameUpdater(threading.Thread): if frame is not None: try: for c in self.callbacks: - frame = c(frame) + frame_c = c(frame) + if frame_c is not None: + frame = frame_c if frame.shape[-1] not in [1, 3] and len(frame.shape) != 2: print(f"Too many channels in output. (Got {frame.shape[-1]} instead of 1 or 3.) " f"Frame selection callback added.") diff --git a/displayarray/window/subscriber_windows.py b/displayarray/window/subscriber_windows.py index 7d3dd64..7173b9d 100644 --- a/displayarray/window/subscriber_windows.py +++ b/displayarray/window/subscriber_windows.py @@ -30,7 +30,7 @@ class SubscriberWindows(object): callbacks: Optional[List[Callable[[np.ndarray], Any]]] = None, ): self.source_names: List[Union[str, int]] = [] - self.close_threads: Optional[List[Thread]] = None + self.close_threads: Optional[List[Thread]] = [] self.frames: List[np.ndarray] = [] self.input_vid_global_names: List[str] = [] self.window_names: List[str] = [] @@ -60,7 +60,7 @@ class SubscriberWindows(object): """Add another source for this class to display.""" uid = uid_for_source(name) self.source_names.append(uid) - self.input_vid_global_names.append(uid + "frame") + self.input_vid_global_names.append(uid) self.input_cams.append(name) return self @@ -72,9 +72,6 @@ class SubscriberWindows(object): cv2.setMouseCallback(name + " (press ESC to quit)", m) return self - def del_window(self, name): - cv2.setMouseCallback(name + " (press ESC to quit)", lambda *args: None) - def add_callback(self, callback): """Add a callback for this class to apply to videos.""" self.callbacks.append(callback) @@ -100,11 +97,7 @@ class SubscriberWindows(object): window_commands.key_pub.publish(chr(key_input)) except ValueError: warnings.warn( - RuntimeWarning( - "Unknown key code: [{}]. Please report to cv_pubsubs issue page.".format( - key_input - ) - ) + RuntimeWarning(f"Unknown key code: [{key_input}]. Please report to the displayarray issue page.") ) def handle_mouse(self, event, x, y, flags, param): @@ -112,7 +105,7 @@ class SubscriberWindows(object): mousey = MouseEvent(event, x, y, flags, param) window_commands.mouse_pub.publish(mousey) - def _display_frames(self, frames, win_num, ids=None): + def _display_frames(self, frames, win_num=0, ids=None): if isinstance(frames, Exception): raise frames for f in range(len(frames)): @@ -120,51 +113,57 @@ class SubscriberWindows(object): if ( isinstance(frames[f], (list, tuple)) or frames[f].dtype.num == 17 - or len(frames[f].shape) > 3 + or (len(frames[f].shape) != 2 and (len(frames[f].shape) != 3 or frames[f].shape[-1] != 3)) ): win_num = self._display_frames(frames[f], win_num, ids) else: + if len(self.window_names) <= win_num: + self.add_window(str(win_num)) cv2.imshow( - self.window_names[win_num % len(self.window_names)] + self.window_names[win_num] + " (press ESC to quit)", frames[f], ) win_num += 1 return win_num + def __check_too_many_channels(self): + for f in range(len(self.frames)): + if isinstance(self.frames[f], Exception): + raise self.frames[f] + if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2: + print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) " + f"Frame selection callback added.") + print("Ctrl+scroll to change first channel.\n" + "Shift+scroll to change second channel.\n" + "Alt+scroll to change third channel.") + sel = SelectChannels() + sel.enable_mouse_control() + sel.mouse_print_channels = True + self.callbacks.append(sel) + for fr in range(len(self.frames)): + self.frames[fr] = self.callbacks[-1](self.frames[fr]) + break + def update_window_frames(self): """Update the windows with the newest data for all frames.""" - win_num = 0 + self.frames = [] for i in range(len(self.input_vid_global_names)): if self.input_vid_global_names[i] in self.FRAME_DICT and not isinstance( self.FRAME_DICT[self.input_vid_global_names[i]], NoData ): - self.frames = self.FRAME_DICT[self.input_vid_global_names[i]] + self.frames.append(self.FRAME_DICT[self.input_vid_global_names[i]]) if isinstance(self.frames, np.ndarray) and len(self.frames.shape) <= 3: self.frames = [self.frames] if len(self.callbacks) > 0: for c in self.callbacks: - for f in range(len(self.frames)): - frame = c(self.frames[f]) - if frame is not None: - self.frames[f] = frame - for f in range(len(self.frames)): - if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2: - print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) " - f"Frame selection callback added.") - print("Ctrl+scroll to change first channel.\n" - "Shift+scroll to change second channel.\n" - "Alt+scroll to change third channel.") - sel = SelectChannels() - sel.enable_mouse_control() - sel.mouse_print_channels = True - self.callbacks.append(sel) - for fr in range(len(self.frames)): - self.frames[fr] = self.callbacks[-1](self.frames[fr]) - break - win_num = self._display_frames(self.frames, win_num) + frame = c(self.frames[-1]) + if frame is not None: + self.frames[-1] = frame + self.__check_too_many_channels() + self._display_frames(self.frames) - def update(self, arr=None, id=None): + def update(self, arr:np.ndarray=None, id:str=None): """Update window frames once. Optionally add a new input and input id.""" if arr is not None and id is not None: global_cv_display_callback(arr, id) @@ -189,9 +188,8 @@ class SubscriberWindows(object): """Close all threads. Should be used with non-blocking mode.""" window_commands.quit(force_all_read=False) self.__stop_all_cams() - if self.close_threads is not None: - for t in self.close_threads: - t.join() + for t in self.close_threads: + t.join() def __enter__(self): return self @@ -203,7 +201,6 @@ class SubscriberWindows(object): self.end() def __delete__(self, instance): - del self.handle_mouse self.end() def loop(self): @@ -228,9 +225,15 @@ def _get_video_callback_dict_threads( v_name = uid_for_source(v) v_callbacks: List[Callable[[np.ndarray], Any]] = [] if v_name in callbacks: - v_callbacks.append(callbacks[v_name]) + if isinstance(callbacks[v_name], List): + v_callbacks.extend(callbacks[v_name]) + elif callable(callbacks[v_name]): + v_callbacks.append(callbacks[v_name]) if v in callbacks: - v_callbacks.append(callbacks[v]) + if isinstance(callbacks[v], List): + v_callbacks.extend(callbacks[v]) + elif callable(callbacks[v]): + v_callbacks.append(callbacks[v]) vid_threads.append(FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size)) return vid_threads diff --git a/tests/require_graphics/__init__.py b/tests/effects/__init__.py similarity index 100% rename from tests/require_graphics/__init__.py rename to tests/effects/__init__.py diff --git a/tests/unit/__init__.py b/tests/frame/__init__.py similarity index 100% rename from tests/unit/__init__.py rename to tests/frame/__init__.py diff --git a/tests/frame/test_frame_publishing.py b/tests/frame/test_frame_publishing.py new file mode 100644 index 0000000..e4637d7 --- /dev/null +++ b/tests/frame/test_frame_publishing.py @@ -0,0 +1,120 @@ +from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread +import displayarray +import mock +import pytest +import cv2 +from displayarray.frame.np_to_opencv import NpCam +import numpy as np +import displayarray.frame.subscriber_dictionary as subd +import displayarray.frame.frame_publishing as fpub + + +def test_pub_cam_loop_exit(): + not_a_camera = mock.MagicMock() + with pytest.raises(TypeError): + pub_cam_loop(not_a_camera) + + +def test_pub_cam_int(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \ + mock.patch.object(NpCam, "set"), \ + mock.patch.object(NpCam, "get") as mock_get, \ + mock.patch.object(NpCam, "release"), \ + mock.patch.object(displayarray.frame.frame_publishing.subscriber_dictionary, "register_cam") as reg_cam, \ + mock.patch.object(displayarray.frame.frame_publishing.subscriber_dictionary, "cam_cmd_sub") as cam_cmd_sub: + cap = NpCam(img) + mock_cv_capture.return_value = cap + mock_sub = cam_cmd_sub.return_value = mock.MagicMock() + mock_sub.get = mock.MagicMock() + mock_sub.get.side_effect = ["", "", "", "quit"] + mock_sub.release = mock.MagicMock() + mock_get.return_value = 2 + + cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') + with mock.patch.object(cam_0.frame_pub, "publish") as cam_pub: + pub_cam_loop(0, high_speed=False) + + cam_pub.assert_has_calls( + [mock.call(img)] * 4 + ) + + reg_cam.assert_called_once_with('0') + cam_cmd_sub.assert_called_once_with('0') + + cap.set.assert_has_calls( + [mock.call(cv2.CAP_PROP_FRAME_WIDTH, -1), + mock.call(cv2.CAP_PROP_FRAME_HEIGHT, -1)] + ) + cap.get.assert_has_calls( + [mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8 + ) + mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) + mock_sub.release.assert_called_once() + cap.release.assert_called_once() + + subd.CV_CAMS_DICT = {} + + +def test_pub_cam_fail(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \ + mock.patch.object(NpCam, "isOpened") as mock_is_open, \ + mock.patch.object(subd, "register_cam") as mock_reg: + cap = NpCam(img) + mock_cv_capture.side_effect = [cap] + + mock_is_open.return_value = False + subd.CV_CAMS_DICT['0'] = subd.Cam('0') + + with mock.patch.object(subd.CV_CAMS_DICT['0'].status_pub, "publish") as mock_fail_pub: + pub_cam_loop(0, high_speed=False) + + mock_fail_pub.assert_called_once_with("failed") + + subd.CV_CAMS_DICT = {} + + +def test_pub_cam_high_speed(): + img = np.zeros((30, 40)) + with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \ + mock.patch.object(NpCam, "isOpened") as mock_is_open, \ + mock.patch.object(NpCam, "set") as mock_cam_set: + cap = NpCam(img) + mock_cv_capture.side_effect = [cap] + + mock_is_open.return_value = False + + pub_cam_loop(0, request_size=(640, 480), high_speed=True) + + mock_cam_set.assert_has_calls( + [mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG), + mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640), + mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)] + ) + + +def test_pub_cam_numpy(): + with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs, \ + mock.patch.object(NpCam, "read")as mock_np_read, \ + mock.patch.object(subd, "register_cam"): + img = np.zeros((30, 40)) + mock_np_read.side_effect = [(True, img), (True, img), (True, img), (False, None)] + mock_uidfs.return_value = '0' + cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') + with mock.patch.object(cam_0.frame_pub, "publish") as cam_pub: + pub_cam_loop(img) + cam_pub.assert_has_calls( + [mock.call(img)] * 3 + ) + subd.CV_CAMS_DICT = {} + + +def test_pub_cam_thread(): + with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread: + thread_instance = mock_thread.return_value = mock.MagicMock() + + pub_cam_thread(5) + + mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (-1, -1), True, 240)) + thread_instance.start.assert_called_once() diff --git a/tests/unit/frame/test_frame_updater.py b/tests/frame/test_frame_updater.py similarity index 86% rename from tests/unit/frame/test_frame_updater.py rename to tests/frame/test_frame_updater.py index 51ba2b1..98fe1a8 100644 --- a/tests/unit/frame/test_frame_updater.py +++ b/tests/frame/test_frame_updater.py @@ -41,7 +41,6 @@ def test_loop(): pub_t = mock_pubcam_thread.return_value = mock.MagicMock() mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True]) sub_cam = mock_frame_sub.return_value = mock.MagicMock() - sub_cam.get = mock.MagicMock() frame = sub_cam.get.return_value = mock.MagicMock() transformed_frame = mock.MagicMock() mock_cbs[0].return_value = transformed_frame @@ -78,16 +77,15 @@ def test_callback_exception(): def test_display(): with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win: f = fup.FrameUpdater() - f.start = mock.MagicMock() - f.join = mock.MagicMock() - mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock() + with mock.patch.object(f, "start"), mock.patch.object(f, "join"): + mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock() - f.display() + f.display() - mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[]) - mock_sub_win_instance.loop.assert_called_once() - f.start.assert_called_once() - f.join.assert_called_once() + mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[]) + mock_sub_win_instance.loop.assert_called_once() + f.start.assert_called_once() + f.join.assert_called_once() def test_display_exception(): @@ -107,13 +105,17 @@ from displayarray.window.window_commands import win_cmd_pub def test_display_many_channels(): with mock.patch("displayarray.frame.frame_updater.pub_cam_thread"), \ - mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \ + mock.patch.object(fup.subscriber_dictionary, "CV_CAMS_DICT") as mock_cam_dict, \ + mock.patch.object(fup.subscriber_dictionary, "cam_frame_sub") as mock_sub_cam, \ mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub: mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True]) mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock() mock_sub_owner.get.side_effect = ["", "", "", "quit"] arr = np.ones((20, 20, 20)) + sub = mock.MagicMock() + sub.get.return_value = arr + mock_sub_cam.return_value = sub f = fup.FrameUpdater(arr) diff --git a/tests/unit/frame/test_get_frame_ids.py b/tests/frame/test_get_frame_ids.py similarity index 100% rename from tests/unit/frame/test_get_frame_ids.py rename to tests/frame/test_get_frame_ids.py diff --git a/tests/unit/frame/test_np_to_cv.py b/tests/frame/test_np_to_cv.py similarity index 100% rename from tests/unit/frame/test_np_to_cv.py rename to tests/frame/test_np_to_cv.py diff --git a/tests/unit/frame/test_subscriber_dictionary.py b/tests/frame/test_subscriber_dictionary.py similarity index 100% rename from tests/unit/frame/test_subscriber_dictionary.py rename to tests/frame/test_subscriber_dictionary.py diff --git a/tests/require_graphics/test_pub_cam.py b/tests/require_graphics/test_pub_cam.py deleted file mode 100644 index 1a9e1eb..0000000 --- a/tests/require_graphics/test_pub_cam.py +++ /dev/null @@ -1,22 +0,0 @@ -import displayarray.frame as w -import unittest as ut - - -class TestFrameHandler(ut.TestCase): - i = 0 - - def test_handler(self): - def test_frame_handler(frame, cam_id): - if self.i == 200: - w.subscriber_dictionary.stop_cam(cam_id) - if self.i % 100 == 0: - print(frame.shape) - self.i += 1 - - w.FrameUpdater( - 0, - [test_frame_handler], - request_size=(1280, 720), - high_speed=True, - fps_limit=240, - ) diff --git a/tests/require_graphics/test_sub_win.py b/tests/require_graphics/test_sub_win.py deleted file mode 100644 index 256b1f7..0000000 --- a/tests/require_graphics/test_sub_win.py +++ /dev/null @@ -1,152 +0,0 @@ -import unittest as ut - -import displayarray.frame as w -from displayarray.window import SubscriberWindows -from displayarray import display -from displayarray.input import mouse_loop, key_loop, MouseEvent - -import numpy as np - - -class TestSubWin(ut.TestCase): - def test_mouse_loop(self): - @mouse_loop - def print_mouse_thread(mouse_event): - print(mouse_event) - - display("fractal test.mp4", blocking=True) - - def test_key_loop(self): - @key_loop - def print_key_thread(key_chr): - print("key pressed: " + str(key_chr)) - - w.FrameUpdater().display() - - def test_sub(self): - w.FrameUpdater().display() - - def test_image(self): - img = np.random.uniform(0, 1, (300, 300, 3)) - w.FrameUpdater(video_source=img).display() - - def test_image_args(self): - img = np.random.uniform(0, 1, (30, 30, 3)) - w.FrameUpdater(video_source=img, request_size=(300, -1)).display() - - def test_sub_with_args(self): - video_thread = w.FrameUpdater( - video_source=0, request_size=(800, 600), high_speed=False, fps_limit=8 - ) - - video_thread.display() - - def test_sub_with_callback(self): - def redden_frame_print_spam(frame): - frame[:, :, 0] = 0 - frame[:, :, 2] = 0 - - w.FrameUpdater(callbacks=redden_frame_print_spam).display() - - def test_sub_with_callback_exception(self): - def redden_frame_print_spam(frame): - frame[:, :, 0] = 0 - frame[:, :, 2] = 1 / 0 - - with self.assertRaises(ZeroDivisionError) as e: - v = w.FrameUpdater(callbacks=redden_frame_print_spam) - v.display() - self.assertEqual(v.exception_raised, e) - - def test_multi_cams_one_source(self): - display(0, window_names=["cammy", "cammy2"], blocking=True) - - def test_multi_cams_multi_source(self): - display(0, np.random.uniform(0.0, 1.0, (500, 500)), blocking=True) - - def test_nested_frames(self): - def nest_frame(frame): - frame = np.asarray([[[[[[frame]]]]], [[[[[frame]]], [[[frame]]]]]]) - return frame - - display(0, callbacks=nest_frame, window_names=["1", "2", "3"], blocking=True) - - def test_nested_frames_exception(self): - def nest_frame(frame): - frame = np.asarray([[[[[[frame + 1 / 0]]]]], [[[[[frame]]], [[[frame]]]]]]) - return frame - - v = w.FrameUpdater(callbacks=[nest_frame]) - v.start() - - with self.assertRaises(ZeroDivisionError) as e: - SubscriberWindows( - window_names=[str(i) for i in range(3)], video_sources=[str(0)] - ).loop() - self.assertEqual(v.exception_raised, e) - - v.join() - - def test_conway_life(self): - from displayarray.frame import FrameUpdater - from displayarray.callbacks import function_display_callback - import numpy as np - import cv2 - - img = np.zeros((50, 50, 1)) - img[0:5, 0:5, :] = 1 - - def conway(array, coords, finished): - neighbors = np.sum( - array[ - max(coords[0] - 1, 0): min(coords[0] + 2, 50), - max(coords[1] - 1, 0): min(coords[1] + 2, 50), - ] - ) - neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0) - if array[coords] == 1.0: - if neighbors < 2 or neighbors > 3: - array[coords] = 0.0 - elif 2 <= neighbors <= 3: - array[coords] = 1.0 - else: - if neighbors == 3: - array[coords] = 1.0 - - @mouse_loop - def conway_add( - mouse_event # type:MouseEvent - ): - if 0 <= mouse_event.x < 50 and 0 <= mouse_event.y < 50: - if mouse_event.flags == cv2.EVENT_FLAG_LBUTTON: - img[ - mouse_event.y - 5: mouse_event.y + 10, - mouse_event.x - 5: mouse_event.x + 10, - :, - ] = 0.0 - elif mouse_event.flags == cv2.EVENT_FLAG_RBUTTON: - img[ - mouse_event.y - 5: mouse_event.y + 10, - mouse_event.x - 5: mouse_event.x + 10, - :, - ] = 1.0 - - FrameUpdater( - video_source=img, callbacks=function_display_callback(conway) - ).display() - - def test_double_win(self): - vid1 = np.ones((100, 100)) - vid2 = np.zeros((100, 100)) - t1 = w.FrameUpdater(vid1) - t2 = w.FrameUpdater(vid2) - t1.start() - t2.start() - SubscriberWindows( - window_names=["cammy", "cammy2"], video_sources=[vid1, vid2] - ).loop() - t1.join() - t1.join() - - def test_display(self): - display(np.ones((100, 100)), np.zeros((100, 100)), blocking=True) diff --git a/tests/unit/frame/test_frame_publishing.py b/tests/unit/frame/test_frame_publishing.py deleted file mode 100644 index db84116..0000000 --- a/tests/unit/frame/test_frame_publishing.py +++ /dev/null @@ -1,120 +0,0 @@ -from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread -import displayarray -import mock -import pytest -import cv2 -from displayarray.frame.np_to_opencv import NpCam -import numpy as np -import displayarray.frame.subscriber_dictionary as subd -import displayarray.frame.frame_publishing as fpub - - -def test_pub_cam_loop_exit(): - not_a_camera = mock.MagicMock() - with pytest.raises(TypeError): - pub_cam_loop(not_a_camera) - - -def test_pub_cam_int(): - img = np.zeros((30, 40)) - with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: - cap = NpCam(img) - mock_cv_capture.return_value = cap - reg_cam = displayarray.frame.frame_publishing.subscriber_dictionary.register_cam = mock.MagicMock() - cam_cmd_sub = displayarray.frame.frame_publishing.subscriber_dictionary.cam_cmd_sub = mock.MagicMock() - mock_sub = cam_cmd_sub.return_value = mock.MagicMock() - mock_sub.get = mock.MagicMock() - mock_sub.get.side_effect = ["", "", "", "quit"] - mock_sub.release = mock.MagicMock() - cap.set = mock.MagicMock() - cap.get = mock.MagicMock() - cap.get.return_value = 2 - cap.release = mock.MagicMock() - - cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') - cam_pub = cam_0.frame_pub.publish = mock.MagicMock() - - pub_cam_loop(0, high_speed=False) - - cam_pub.assert_has_calls( - [mock.call(img)] * 4 - ) - - reg_cam.assert_called_once_with('0') - cam_cmd_sub.assert_called_once_with('0') - - cap.set.assert_has_calls( - [mock.call(cv2.CAP_PROP_FRAME_WIDTH, 1280), - mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 720)] - ) - cap.get.assert_has_calls( - [mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8 - ) - mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) - mock_sub.release.assert_called_once() - cap.release.assert_called_once() - - -def test_pub_cam_fail(): - img = np.zeros((30, 40)) - with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: - cap = NpCam(img) - mock_cv_capture.side_effect = [cap] - - cap.isOpened = mock.MagicMock() - cap.isOpened.return_value = False - subd.register_cam = mock.MagicMock() - subd.CV_CAMS_DICT['0'] = subd.Cam('0') - - mock_fail_pub = \ - subd.CV_CAMS_DICT['0'].status_pub.publish = \ - mock.MagicMock() - - pub_cam_loop(0, high_speed=False) - - mock_fail_pub.assert_called_once_with("failed") - - -def test_pub_cam_high_speed(): - img = np.zeros((30, 40)) - with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture: - cap = NpCam(img) - mock_cv_capture.side_effect = [cap] - - cap.isOpened = mock.MagicMock() - cap.isOpened.return_value = False - cap.set = mock.MagicMock() - - pub_cam_loop(0, request_size=(640, 480), high_speed=True) - - cap.set.assert_has_calls( - [mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG), - mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640), - mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)] - ) - - -def test_pub_cam_numpy(): - with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs: - img = np.zeros((30, 40)) - NpCam.read = mock.MagicMock() - NpCam.read.side_effect = [(True, img), (True, img), (True, img), (False, None)] - subd.register_cam = mock.MagicMock() - mock_uidfs.return_value = '0' - cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0') - cam_pub = cam_0.frame_pub.publish = mock.MagicMock() - - pub_cam_loop(img) - cam_pub.assert_has_calls( - [mock.call(img)] * 3 - ) - - -def test_pub_cam_thread(): - with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread: - thread_instance = mock_thread.return_value = mock.MagicMock() - - pub_cam_thread(5) - - mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (1280, 720), True, 240)) - thread_instance.start.assert_called_once() diff --git a/tests/unit/frame/__init__.py b/tests/window/__init__.py similarity index 100% rename from tests/unit/frame/__init__.py rename to tests/window/__init__.py diff --git a/tests/window/subscriber_windows.py b/tests/window/subscriber_windows.py new file mode 100644 index 0000000..53574e6 --- /dev/null +++ b/tests/window/subscriber_windows.py @@ -0,0 +1,514 @@ +import displayarray.window.subscriber_windows as sub_win +from threading import Thread +import mock +import cv2 +import numpy as np +from displayarray.effects.select_channels import SelectChannels +import pytest + + +def test_init_defaults(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow") as window_mock: + sw = sub_win.SubscriberWindows() + + assert sw.source_names == ["0"] + assert sw.input_vid_global_names == ["0"] + assert sw.window_names == ["displayarray"] + assert sw.input_cams == [0] + assert sw.exited == False + + window_mock.assert_called_once_with("displayarray (press ESC to quit)") + + +def test_init(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow") as window_mock: + cb = mock.MagicMock() + sw = sub_win.SubscriberWindows(["test name"], [1], cb) + + assert sw.source_names == ["1"] + assert sw.input_vid_global_names == ["1"] + assert sw.window_names == ["test name"] + assert sw.input_cams == [1] + assert sw.exited == False + + window_mock.assert_called_once_with("test name (press ESC to quit)") + + +def test_bool(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "update") as mock_update: + sw = sub_win.SubscriberWindows() + + mock_update.assert_called_once() + mock_update.reset_mock() + + assert bool(sw) == True + + mock_update.assert_called_once() + mock_update.reset_mock() + + sw.exited = True + + assert bool(sw) == False + + +def test_block(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "update"), \ + mock.patch.object(sub_win.SubscriberWindows, "loop") as mock_loop: + sw = sub_win.SubscriberWindows() + sw.close_threads.append(mock.MagicMock()) + + sw.block() + + mock_loop.assert_called_once() + sw.close_threads[0].join.assert_called_once() + + +def test_add_source(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"): + sw = sub_win.SubscriberWindows().add_source(2) + + assert sw.source_names == ["0", "2"] + assert sw.input_vid_global_names == ["0", "2"] + assert sw.input_cams == [0, 2] + + +def test_add_window(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow") as mock_named_window, \ + mock.patch.object(cv2, "setMouseCallback") as mock_set_mouse, \ + mock.patch("displayarray.window.subscriber_windows.WeakMethod", new_callable=mock.MagicMock) as mock_weak: + weak_method = mock_weak.return_value = mock.MagicMock() + + sw = sub_win.SubscriberWindows().add_window("second window") + + mock_weak.assert_has_calls([ + mock.call(sw.handle_mouse), + mock.call(sw.handle_mouse) + ]) + assert sw.window_names == ['displayarray', 'second window'] + mock_named_window.assert_has_calls( + [ + mock.call('displayarray (press ESC to quit)'), + mock.call('second window (press ESC to quit)') + ] + ) + mock_set_mouse.assert_has_calls( + [ + mock.call('displayarray (press ESC to quit)', weak_method), + mock.call('second window (press ESC to quit)', weak_method), + ] + ) + + +def test_add_callback(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"): + mock_cb = mock.MagicMock() + + sw = sub_win.SubscriberWindows().add_callback(mock_cb) + + assert sw.callbacks[0] == mock_cb + + +def test_handle_keys(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch("displayarray.window.subscriber_windows.subscriber_dictionary.stop_cam") as mock_stop, \ + mock.patch("displayarray.window.subscriber_windows.warnings") as mock_warnings, \ + mock.patch("displayarray.window.subscriber_windows.RuntimeWarning") as mock_runtime, \ + mock.patch.object(cv2, "destroyWindow") as mock_destroy: + mock_runtime.return_value = mock_runtime + + # test ordinary + sw = sub_win.SubscriberWindows() + + sw.handle_keys(ord('h')) + + mock_win_cmd.key_pub.publish.assert_called_once_with('h') + + # test bad key + def bad_key(k): + raise ValueError("Bad Key") + + mock_win_cmd.key_pub.publish = bad_key + + sw.handle_keys(ord('b')) + + mock_runtime.assert_called_once_with( + f"Unknown key code: [{ord('b')}]. Please report to the displayarray issue page." + ) + mock_warnings.warn.assert_called_once_with(mock_runtime) + + # test exit key + assert sw.ESC_KEY_CODES == [27] + ret = sw.handle_keys(27) + + mock_destroy.assert_called_once_with("displayarray (press ESC to quit)") + assert sw.exited is True + mock_win_cmd.quit.assert_called_once() + mock_stop.assert_called_once_with("0") + assert ret == "quit" + + +def test_handle_mouse(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch("displayarray.window.subscriber_windows.MouseEvent") as mock_mouse_event: + mock_mousey = mock_mouse_event.return_value = mock.MagicMock() + + sw = sub_win.SubscriberWindows() + + sw.handle_mouse(1, 2, 3, 4, 5) + mock_mouse_event.assert_called_once_with(1, 2, 3, 4, 5) + mock_win_cmd.mouse_pub.publish.assert_called_once_with(mock_mousey) + + +def test_update_window_frames(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(cv2, "imshow") as mock_imshow: + sw = sub_win.SubscriberWindows() + + frame = np.ones((100, 100)) + sw.FRAME_DICT['0'] = frame + + sw.update_window_frames() + + assert sw.frames == [frame] + mock_imshow.assert_called_once_with("displayarray (press ESC to quit)", frame) + + +def test_update_window_frames_callback(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(cv2, "imshow") as mock_imshow: + cb = mock.MagicMock() + cb2 = mock.MagicMock() + frame = np.ones((100, 100)) + frame2 = np.ones((102, 102)) + frame3 = np.ones((103, 103)) + cb.return_value = frame2 + cb2.return_value = frame3 + + sw = sub_win.SubscriberWindows(window_names=["0", "1"], video_sources=[0, 1], callbacks=[cb, cb2]) + + sw.FRAME_DICT['0'] = frame + sw.FRAME_DICT['1'] = frame + + sw.update_window_frames() + + assert sw.frames == [frame3, frame3] + assert np.all(cb.mock_calls[0].args[0] == frame) + assert np.all(cb2.mock_calls[0].args[0] == frame2) + mock_imshow.assert_has_calls([ + mock.call("0 (press ESC to quit)", frame3), + mock.call("1 (press ESC to quit)", frame3), + ]) + + +def test_update_window_frames_too_many_channels(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(cv2, "imshow") as mock_imshow, \ + mock.patch("displayarray.window.subscriber_windows.print") as mock_print: + sw = sub_win.SubscriberWindows() + + frame = np.ones((100, 100, 100)) + sw.FRAME_DICT['0'] = frame + + sw.update_window_frames() + + mock_print.assert_has_calls([ + mock.call('Too many channels in output. (Got 100 instead of 1 or 3.) Frame selection callback added.'), + mock.call('Ctrl+scroll to change first channel.\n' + 'Shift+scroll to change second channel.\n' + 'Alt+scroll to change third channel.') + ]) + + assert isinstance(sw.callbacks[-1], SelectChannels) + assert sw.callbacks[-1].mouse_control is not None + assert sw.callbacks[-1].mouse_print_channels is True + assert sw.frames[0].shape[-1] == 3 + + +def test_update_window_frames_nested(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(cv2, "imshow") as mock_imshow, \ + mock.patch("displayarray.window.subscriber_windows.print"): + sw = sub_win.SubscriberWindows() + + frame = np.ones((20, 100, 100, 100)) + sw.FRAME_DICT['0'] = frame + + sw.update_window_frames() + + assert np.all(sw.frames[0] == np.ones((20, 100, 100, 3))) + assert len(sw.frames) == 1 + assert mock_imshow.mock_calls[0].args[0] == "displayarray (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[0].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[1].args[0] == "1 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[1].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[2].args[0] == "2 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[2].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[3].args[0] == "3 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[3].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[4].args[0] == "4 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[4].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[5].args[0] == "5 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[5].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[6].args[0] == "6 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[6].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[7].args[0] == "7 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[7].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[8].args[0] == "8 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[8].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[9].args[0] == "9 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[9].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[10].args[0] == "10 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[10].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[11].args[0] == "11 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[11].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[12].args[0] == "12 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[12].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[13].args[0] == "13 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[13].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[14].args[0] == "14 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[14].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[15].args[0] == "15 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[15].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[16].args[0] == "16 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[16].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[17].args[0] == "17 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[17].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[18].args[0] == "18 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[18].args[1] == np.ones((100, 100, 3))) + assert mock_imshow.mock_calls[19].args[0] == "19 (press ESC to quit)" + assert np.all(mock_imshow.mock_calls[19].args[1] == np.ones((100, 100, 3))) + + +def test_update_window_frames_exception(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(cv2, "imshow") as mock_imshow: + sw = sub_win.SubscriberWindows() + + frame = RuntimeError("Sent from FrameUpdater") + sw.FRAME_DICT['0'] = frame + + with pytest.raises(RuntimeError) as e: + sw.update_window_frames() + assert e.value == frame + + +def test_update(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "update_window_frames")as mock_update_win_frames, \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch.object(sub_win.SubscriberWindows, "handle_keys")as mock_handle_keys, \ + mock.patch.object(cv2, "waitKey") as key: + sub_cmd = mock_win_cmd.win_cmd_sub.return_value = mock.MagicMock() + key.return_value = 2 + mock_cmd = sub_cmd.get.return_value = mock.MagicMock() + mock_key = mock_handle_keys.return_value = mock.MagicMock() + + sw = sub_win.SubscriberWindows() + + cmd, key = sw.update() + + assert mock_win_cmd.win_cmd_sub.call_count == 2 + assert mock_update_win_frames.call_count == 2 + assert sub_cmd.get.call_count == 2 + assert cmd == mock_cmd + assert key == mock_key + + +def test_update_with_array(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "update_window_frames")as mock_update_win_frames, \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch.object(sub_win.SubscriberWindows, "handle_keys")as mock_handle_keys, \ + mock.patch.object(sub_win.SubscriberWindows, "add_source")as add_source, \ + mock.patch.object(sub_win.SubscriberWindows, "add_window")as add_window, \ + mock.patch("displayarray.window.subscriber_windows.global_cv_display_callback") as mock_cb, \ + mock.patch.object(cv2, "waitKey") as key: + sw = sub_win.SubscriberWindows() + + sw.update(arr=1, id=2) + + mock_cb.assert_called_once_with(1, 2) + add_source.assert_has_calls([mock.call(0), mock.call(2)]) + add_window.assert_has_calls([mock.call('displayarray'), mock.call(2)]) + + +def test_wait_for_init(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "update")as update: + sw = sub_win.SubscriberWindows() + + def mock_update(): + sw.frames = mock_update.frames[mock_update.i] + mock_update.i += 1 + return "", "" + + mock_update.frames = [[], [], [], [1]] + mock_update.i = 0 + + update.side_effect = mock_update + + sw.wait_for_init() + + assert mock_update.i == 4 + + +def test_end(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch("displayarray.window.subscriber_windows.subscriber_dictionary.stop_cam") as mock_stop: + sw = sub_win.SubscriberWindows() + + sw.close_threads = [mock.MagicMock(), mock.MagicMock()] + + sw.end() + + mock_win_cmd.quit.assert_called_once_with(force_all_read=False) + mock_stop.assert_called_once_with("0") + sw.close_threads[0].join.assert_called_once() + sw.close_threads[1].join.assert_called_once() + + +def test_enter_exit(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "end")as end: + with sub_win.SubscriberWindows() as sw: + assert isinstance(sw, sub_win.SubscriberWindows) + + end.assert_called_once() + + +def test_del(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch.object(sub_win.SubscriberWindows, "end")as end: + sw = sub_win.SubscriberWindows() + + del sw + + end.assert_called_once() + + +def test_loop(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch.object(cv2, "namedWindow"), \ + mock.patch("displayarray.window.subscriber_windows.window_commands") as mock_win_cmd, \ + mock.patch("displayarray.window.subscriber_windows.subscriber_dictionary.stop_cam") as mock_stop, \ + mock.patch.object(sub_win.SubscriberWindows, "update")as update: + sub_cmd = mock_win_cmd.win_cmd_sub.return_value = mock.MagicMock() + + sw = sub_win.SubscriberWindows() + + def mock_update(): + mock_update.i += 1 + return "", mock_update.keys[mock_update.i] + + mock_update.keys = ["", "", "", "quit"] + mock_update.i = 0 + + update.side_effect = mock_update + + sw.loop() + + assert mock_update.i == 3 + sub_cmd.release.assert_called_once() + mock_win_cmd.quit.assert_called_with(force_all_read=False) + mock_stop.assert_called_with("0") + + +def test_display(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch("displayarray.window.subscriber_windows.FrameUpdater") as fup, \ + mock.patch("displayarray.window.subscriber_windows.SubscriberWindows") as sws: + fup_inst = fup.return_value = mock.MagicMock() + sws_inst = sws.return_value = mock.MagicMock() + + d = sub_win.display(0, 1, size=(50, 50)) + + fup.assert_has_calls([ + mock.call(0, fps_limit=240, request_size=(50, 50)), + mock.call(1, fps_limit=240, request_size=(50, 50)) + ]) + assert fup_inst.start.call_count == 2 + sws.assert_called_once_with(window_names=["window 0", "window 1"], video_sources=(0, 1)) + assert sws_inst.close_threads == [fup_inst, fup_inst] + assert d == sws_inst + + +def test_display_blocking(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch("displayarray.window.subscriber_windows.FrameUpdater") as fup, \ + mock.patch("displayarray.window.subscriber_windows.SubscriberWindows") as sws: + fup_inst = fup.return_value = mock.MagicMock() + sws_inst = sws.return_value = mock.MagicMock() + + sub_win.display(0, 1, blocking=True) + + assert fup_inst.start.call_count == 2 + sws.assert_called_once_with(window_names=["window 0", "window 1"], video_sources=(0, 1)) + sws_inst.loop.assert_called_once() + assert fup_inst.join.call_count == 2 + + +def test_display_callbacks(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch("displayarray.window.subscriber_windows.FrameUpdater") as fup, \ + mock.patch("displayarray.window.subscriber_windows.SubscriberWindows") as sws: + cb = mock.MagicMock() + + sub_win.display(0, 1, callbacks=cb) + + fup.assert_has_calls([ + mock.call(0, callbacks=[cb], fps_limit=240, request_size=(-1, -1)), + mock.call(1, callbacks=[cb], fps_limit=240, request_size=(-1, -1)) + ]) + + fup.reset_mock() + + cb2 = mock.MagicMock() + + sub_win.display(0, 1, callbacks=[cb, cb2], fps_limit=60) + + fup.assert_has_calls([ + mock.call(0, callbacks=[cb, cb2], fps_limit=60, request_size=(-1, -1)), + mock.call(1, callbacks=[cb, cb2], fps_limit=60, request_size=(-1, -1)) + ]) + + +def test_display_callbacks_dict(): + sub_win.SubscriberWindows.FRAME_DICT = {} + with mock.patch("displayarray.window.subscriber_windows.FrameUpdater") as fup, \ + mock.patch("displayarray.window.subscriber_windows.SubscriberWindows") as sws: + cb1 = mock.MagicMock() + cb2 = mock.MagicMock() + cb3 = mock.MagicMock() + + sub_win.display(0, 1, 2, callbacks={0: cb1, 1: [cb1, cb2], "2": [cb3]}) + + fup.assert_has_calls([ + mock.call(0, callbacks=[cb1], fps_limit=240, request_size=(-1, -1)), + mock.call(1, callbacks=[cb1, cb2], fps_limit=240, request_size=(-1, -1)), + mock.call(2, callbacks=[cb3], fps_limit=240, request_size=(-1, -1)) + ])