added unit tests for frame and window
This commit is contained in:
@@ -15,7 +15,7 @@ def global_cv_display_callback(frame: np.ndarray, cam_id: Union[int, str]):
|
||||
"""
|
||||
from displayarray.window import SubscriberWindows
|
||||
|
||||
SubscriberWindows.FRAME_DICT[str(cam_id) + "frame"] = frame
|
||||
SubscriberWindows.FRAME_DICT[str(cam_id)] = frame
|
||||
|
||||
|
||||
class function_display_callback(object): # NOSONAR
|
||||
|
||||
@@ -14,7 +14,7 @@ class SelectChannels(object):
|
||||
|
||||
def __call__(self, arr):
|
||||
self.num_input_channels = arr.shape[-1]
|
||||
out_arr = [arr[..., min(max(0, x), arr.shape[-1]-1)] for x in self.selected_channels]
|
||||
out_arr = [arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels]
|
||||
out_arr = np.stack(out_arr, axis=-1)
|
||||
return out_arr
|
||||
|
||||
|
||||
@@ -46,7 +46,9 @@ class FrameUpdater(threading.Thread):
|
||||
if frame is not None:
|
||||
try:
|
||||
for c in self.callbacks:
|
||||
frame = c(frame)
|
||||
frame_c = c(frame)
|
||||
if frame_c is not None:
|
||||
frame = frame_c
|
||||
if frame.shape[-1] not in [1, 3] and len(frame.shape) != 2:
|
||||
print(f"Too many channels in output. (Got {frame.shape[-1]} instead of 1 or 3.) "
|
||||
f"Frame selection callback added.")
|
||||
|
||||
@@ -30,7 +30,7 @@ class SubscriberWindows(object):
|
||||
callbacks: Optional[List[Callable[[np.ndarray], Any]]] = None,
|
||||
):
|
||||
self.source_names: List[Union[str, int]] = []
|
||||
self.close_threads: Optional[List[Thread]] = None
|
||||
self.close_threads: Optional[List[Thread]] = []
|
||||
self.frames: List[np.ndarray] = []
|
||||
self.input_vid_global_names: List[str] = []
|
||||
self.window_names: List[str] = []
|
||||
@@ -60,7 +60,7 @@ class SubscriberWindows(object):
|
||||
"""Add another source for this class to display."""
|
||||
uid = uid_for_source(name)
|
||||
self.source_names.append(uid)
|
||||
self.input_vid_global_names.append(uid + "frame")
|
||||
self.input_vid_global_names.append(uid)
|
||||
self.input_cams.append(name)
|
||||
return self
|
||||
|
||||
@@ -72,9 +72,6 @@ class SubscriberWindows(object):
|
||||
cv2.setMouseCallback(name + " (press ESC to quit)", m)
|
||||
return self
|
||||
|
||||
def del_window(self, name):
|
||||
cv2.setMouseCallback(name + " (press ESC to quit)", lambda *args: None)
|
||||
|
||||
def add_callback(self, callback):
|
||||
"""Add a callback for this class to apply to videos."""
|
||||
self.callbacks.append(callback)
|
||||
@@ -100,11 +97,7 @@ class SubscriberWindows(object):
|
||||
window_commands.key_pub.publish(chr(key_input))
|
||||
except ValueError:
|
||||
warnings.warn(
|
||||
RuntimeWarning(
|
||||
"Unknown key code: [{}]. Please report to cv_pubsubs issue page.".format(
|
||||
key_input
|
||||
)
|
||||
)
|
||||
RuntimeWarning(f"Unknown key code: [{key_input}]. Please report to the displayarray issue page.")
|
||||
)
|
||||
|
||||
def handle_mouse(self, event, x, y, flags, param):
|
||||
@@ -112,7 +105,7 @@ class SubscriberWindows(object):
|
||||
mousey = MouseEvent(event, x, y, flags, param)
|
||||
window_commands.mouse_pub.publish(mousey)
|
||||
|
||||
def _display_frames(self, frames, win_num, ids=None):
|
||||
def _display_frames(self, frames, win_num=0, ids=None):
|
||||
if isinstance(frames, Exception):
|
||||
raise frames
|
||||
for f in range(len(frames)):
|
||||
@@ -120,51 +113,57 @@ class SubscriberWindows(object):
|
||||
if (
|
||||
isinstance(frames[f], (list, tuple))
|
||||
or frames[f].dtype.num == 17
|
||||
or len(frames[f].shape) > 3
|
||||
or (len(frames[f].shape) != 2 and (len(frames[f].shape) != 3 or frames[f].shape[-1] != 3))
|
||||
):
|
||||
win_num = self._display_frames(frames[f], win_num, ids)
|
||||
else:
|
||||
if len(self.window_names) <= win_num:
|
||||
self.add_window(str(win_num))
|
||||
cv2.imshow(
|
||||
self.window_names[win_num % len(self.window_names)]
|
||||
self.window_names[win_num]
|
||||
+ " (press ESC to quit)",
|
||||
frames[f],
|
||||
)
|
||||
win_num += 1
|
||||
return win_num
|
||||
|
||||
def __check_too_many_channels(self):
|
||||
for f in range(len(self.frames)):
|
||||
if isinstance(self.frames[f], Exception):
|
||||
raise self.frames[f]
|
||||
if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2:
|
||||
print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) "
|
||||
f"Frame selection callback added.")
|
||||
print("Ctrl+scroll to change first channel.\n"
|
||||
"Shift+scroll to change second channel.\n"
|
||||
"Alt+scroll to change third channel.")
|
||||
sel = SelectChannels()
|
||||
sel.enable_mouse_control()
|
||||
sel.mouse_print_channels = True
|
||||
self.callbacks.append(sel)
|
||||
for fr in range(len(self.frames)):
|
||||
self.frames[fr] = self.callbacks[-1](self.frames[fr])
|
||||
break
|
||||
|
||||
def update_window_frames(self):
|
||||
"""Update the windows with the newest data for all frames."""
|
||||
win_num = 0
|
||||
self.frames = []
|
||||
for i in range(len(self.input_vid_global_names)):
|
||||
if self.input_vid_global_names[i] in self.FRAME_DICT and not isinstance(
|
||||
self.FRAME_DICT[self.input_vid_global_names[i]], NoData
|
||||
):
|
||||
self.frames = self.FRAME_DICT[self.input_vid_global_names[i]]
|
||||
self.frames.append(self.FRAME_DICT[self.input_vid_global_names[i]])
|
||||
if isinstance(self.frames, np.ndarray) and len(self.frames.shape) <= 3:
|
||||
self.frames = [self.frames]
|
||||
if len(self.callbacks) > 0:
|
||||
for c in self.callbacks:
|
||||
for f in range(len(self.frames)):
|
||||
frame = c(self.frames[f])
|
||||
if frame is not None:
|
||||
self.frames[f] = frame
|
||||
for f in range(len(self.frames)):
|
||||
if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2:
|
||||
print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) "
|
||||
f"Frame selection callback added.")
|
||||
print("Ctrl+scroll to change first channel.\n"
|
||||
"Shift+scroll to change second channel.\n"
|
||||
"Alt+scroll to change third channel.")
|
||||
sel = SelectChannels()
|
||||
sel.enable_mouse_control()
|
||||
sel.mouse_print_channels = True
|
||||
self.callbacks.append(sel)
|
||||
for fr in range(len(self.frames)):
|
||||
self.frames[fr] = self.callbacks[-1](self.frames[fr])
|
||||
break
|
||||
win_num = self._display_frames(self.frames, win_num)
|
||||
frame = c(self.frames[-1])
|
||||
if frame is not None:
|
||||
self.frames[-1] = frame
|
||||
self.__check_too_many_channels()
|
||||
self._display_frames(self.frames)
|
||||
|
||||
def update(self, arr=None, id=None):
|
||||
def update(self, arr:np.ndarray=None, id:str=None):
|
||||
"""Update window frames once. Optionally add a new input and input id."""
|
||||
if arr is not None and id is not None:
|
||||
global_cv_display_callback(arr, id)
|
||||
@@ -189,9 +188,8 @@ class SubscriberWindows(object):
|
||||
"""Close all threads. Should be used with non-blocking mode."""
|
||||
window_commands.quit(force_all_read=False)
|
||||
self.__stop_all_cams()
|
||||
if self.close_threads is not None:
|
||||
for t in self.close_threads:
|
||||
t.join()
|
||||
for t in self.close_threads:
|
||||
t.join()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
@@ -203,7 +201,6 @@ class SubscriberWindows(object):
|
||||
self.end()
|
||||
|
||||
def __delete__(self, instance):
|
||||
del self.handle_mouse
|
||||
self.end()
|
||||
|
||||
def loop(self):
|
||||
@@ -228,9 +225,15 @@ def _get_video_callback_dict_threads(
|
||||
v_name = uid_for_source(v)
|
||||
v_callbacks: List[Callable[[np.ndarray], Any]] = []
|
||||
if v_name in callbacks:
|
||||
v_callbacks.append(callbacks[v_name])
|
||||
if isinstance(callbacks[v_name], List):
|
||||
v_callbacks.extend(callbacks[v_name])
|
||||
elif callable(callbacks[v_name]):
|
||||
v_callbacks.append(callbacks[v_name])
|
||||
if v in callbacks:
|
||||
v_callbacks.append(callbacks[v])
|
||||
if isinstance(callbacks[v], List):
|
||||
v_callbacks.extend(callbacks[v])
|
||||
elif callable(callbacks[v]):
|
||||
v_callbacks.append(callbacks[v])
|
||||
vid_threads.append(FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size))
|
||||
return vid_threads
|
||||
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread
|
||||
import displayarray
|
||||
import mock
|
||||
import pytest
|
||||
import cv2
|
||||
from displayarray.frame.np_to_opencv import NpCam
|
||||
import numpy as np
|
||||
import displayarray.frame.subscriber_dictionary as subd
|
||||
import displayarray.frame.frame_publishing as fpub
|
||||
|
||||
|
||||
def test_pub_cam_loop_exit():
|
||||
not_a_camera = mock.MagicMock()
|
||||
with pytest.raises(TypeError):
|
||||
pub_cam_loop(not_a_camera)
|
||||
|
||||
|
||||
def test_pub_cam_int():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \
|
||||
mock.patch.object(NpCam, "set"), \
|
||||
mock.patch.object(NpCam, "get") as mock_get, \
|
||||
mock.patch.object(NpCam, "release"), \
|
||||
mock.patch.object(displayarray.frame.frame_publishing.subscriber_dictionary, "register_cam") as reg_cam, \
|
||||
mock.patch.object(displayarray.frame.frame_publishing.subscriber_dictionary, "cam_cmd_sub") as cam_cmd_sub:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.return_value = cap
|
||||
mock_sub = cam_cmd_sub.return_value = mock.MagicMock()
|
||||
mock_sub.get = mock.MagicMock()
|
||||
mock_sub.get.side_effect = ["", "", "", "quit"]
|
||||
mock_sub.release = mock.MagicMock()
|
||||
mock_get.return_value = 2
|
||||
|
||||
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
with mock.patch.object(cam_0.frame_pub, "publish") as cam_pub:
|
||||
pub_cam_loop(0, high_speed=False)
|
||||
|
||||
cam_pub.assert_has_calls(
|
||||
[mock.call(img)] * 4
|
||||
)
|
||||
|
||||
reg_cam.assert_called_once_with('0')
|
||||
cam_cmd_sub.assert_called_once_with('0')
|
||||
|
||||
cap.set.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FRAME_WIDTH, -1),
|
||||
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, -1)]
|
||||
)
|
||||
cap.get.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8
|
||||
)
|
||||
mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()])
|
||||
mock_sub.release.assert_called_once()
|
||||
cap.release.assert_called_once()
|
||||
|
||||
subd.CV_CAMS_DICT = {}
|
||||
|
||||
|
||||
def test_pub_cam_fail():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \
|
||||
mock.patch.object(NpCam, "isOpened") as mock_is_open, \
|
||||
mock.patch.object(subd, "register_cam") as mock_reg:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.side_effect = [cap]
|
||||
|
||||
mock_is_open.return_value = False
|
||||
subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
|
||||
with mock.patch.object(subd.CV_CAMS_DICT['0'].status_pub, "publish") as mock_fail_pub:
|
||||
pub_cam_loop(0, high_speed=False)
|
||||
|
||||
mock_fail_pub.assert_called_once_with("failed")
|
||||
|
||||
subd.CV_CAMS_DICT = {}
|
||||
|
||||
|
||||
def test_pub_cam_high_speed():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture, \
|
||||
mock.patch.object(NpCam, "isOpened") as mock_is_open, \
|
||||
mock.patch.object(NpCam, "set") as mock_cam_set:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.side_effect = [cap]
|
||||
|
||||
mock_is_open.return_value = False
|
||||
|
||||
pub_cam_loop(0, request_size=(640, 480), high_speed=True)
|
||||
|
||||
mock_cam_set.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG),
|
||||
mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640),
|
||||
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)]
|
||||
)
|
||||
|
||||
|
||||
def test_pub_cam_numpy():
|
||||
with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs, \
|
||||
mock.patch.object(NpCam, "read")as mock_np_read, \
|
||||
mock.patch.object(subd, "register_cam"):
|
||||
img = np.zeros((30, 40))
|
||||
mock_np_read.side_effect = [(True, img), (True, img), (True, img), (False, None)]
|
||||
mock_uidfs.return_value = '0'
|
||||
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
with mock.patch.object(cam_0.frame_pub, "publish") as cam_pub:
|
||||
pub_cam_loop(img)
|
||||
cam_pub.assert_has_calls(
|
||||
[mock.call(img)] * 3
|
||||
)
|
||||
subd.CV_CAMS_DICT = {}
|
||||
|
||||
|
||||
def test_pub_cam_thread():
|
||||
with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread:
|
||||
thread_instance = mock_thread.return_value = mock.MagicMock()
|
||||
|
||||
pub_cam_thread(5)
|
||||
|
||||
mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (-1, -1), True, 240))
|
||||
thread_instance.start.assert_called_once()
|
||||
@@ -41,7 +41,6 @@ def test_loop():
|
||||
pub_t = mock_pubcam_thread.return_value = mock.MagicMock()
|
||||
mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True])
|
||||
sub_cam = mock_frame_sub.return_value = mock.MagicMock()
|
||||
sub_cam.get = mock.MagicMock()
|
||||
frame = sub_cam.get.return_value = mock.MagicMock()
|
||||
transformed_frame = mock.MagicMock()
|
||||
mock_cbs[0].return_value = transformed_frame
|
||||
@@ -78,16 +77,15 @@ def test_callback_exception():
|
||||
def test_display():
|
||||
with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win:
|
||||
f = fup.FrameUpdater()
|
||||
f.start = mock.MagicMock()
|
||||
f.join = mock.MagicMock()
|
||||
mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock()
|
||||
with mock.patch.object(f, "start"), mock.patch.object(f, "join"):
|
||||
mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock()
|
||||
|
||||
f.display()
|
||||
f.display()
|
||||
|
||||
mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[])
|
||||
mock_sub_win_instance.loop.assert_called_once()
|
||||
f.start.assert_called_once()
|
||||
f.join.assert_called_once()
|
||||
mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[])
|
||||
mock_sub_win_instance.loop.assert_called_once()
|
||||
f.start.assert_called_once()
|
||||
f.join.assert_called_once()
|
||||
|
||||
|
||||
def test_display_exception():
|
||||
@@ -107,13 +105,17 @@ from displayarray.window.window_commands import win_cmd_pub
|
||||
|
||||
def test_display_many_channels():
|
||||
with mock.patch("displayarray.frame.frame_updater.pub_cam_thread"), \
|
||||
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \
|
||||
mock.patch.object(fup.subscriber_dictionary, "CV_CAMS_DICT") as mock_cam_dict, \
|
||||
mock.patch.object(fup.subscriber_dictionary, "cam_frame_sub") as mock_sub_cam, \
|
||||
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub:
|
||||
mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True])
|
||||
mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock()
|
||||
mock_sub_owner.get.side_effect = ["", "", "", "quit"]
|
||||
|
||||
arr = np.ones((20, 20, 20))
|
||||
sub = mock.MagicMock()
|
||||
sub.get.return_value = arr
|
||||
mock_sub_cam.return_value = sub
|
||||
|
||||
f = fup.FrameUpdater(arr)
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
import displayarray.frame as w
|
||||
import unittest as ut
|
||||
|
||||
|
||||
class TestFrameHandler(ut.TestCase):
|
||||
i = 0
|
||||
|
||||
def test_handler(self):
|
||||
def test_frame_handler(frame, cam_id):
|
||||
if self.i == 200:
|
||||
w.subscriber_dictionary.stop_cam(cam_id)
|
||||
if self.i % 100 == 0:
|
||||
print(frame.shape)
|
||||
self.i += 1
|
||||
|
||||
w.FrameUpdater(
|
||||
0,
|
||||
[test_frame_handler],
|
||||
request_size=(1280, 720),
|
||||
high_speed=True,
|
||||
fps_limit=240,
|
||||
)
|
||||
@@ -1,152 +0,0 @@
|
||||
import unittest as ut
|
||||
|
||||
import displayarray.frame as w
|
||||
from displayarray.window import SubscriberWindows
|
||||
from displayarray import display
|
||||
from displayarray.input import mouse_loop, key_loop, MouseEvent
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class TestSubWin(ut.TestCase):
|
||||
def test_mouse_loop(self):
|
||||
@mouse_loop
|
||||
def print_mouse_thread(mouse_event):
|
||||
print(mouse_event)
|
||||
|
||||
display("fractal test.mp4", blocking=True)
|
||||
|
||||
def test_key_loop(self):
|
||||
@key_loop
|
||||
def print_key_thread(key_chr):
|
||||
print("key pressed: " + str(key_chr))
|
||||
|
||||
w.FrameUpdater().display()
|
||||
|
||||
def test_sub(self):
|
||||
w.FrameUpdater().display()
|
||||
|
||||
def test_image(self):
|
||||
img = np.random.uniform(0, 1, (300, 300, 3))
|
||||
w.FrameUpdater(video_source=img).display()
|
||||
|
||||
def test_image_args(self):
|
||||
img = np.random.uniform(0, 1, (30, 30, 3))
|
||||
w.FrameUpdater(video_source=img, request_size=(300, -1)).display()
|
||||
|
||||
def test_sub_with_args(self):
|
||||
video_thread = w.FrameUpdater(
|
||||
video_source=0, request_size=(800, 600), high_speed=False, fps_limit=8
|
||||
)
|
||||
|
||||
video_thread.display()
|
||||
|
||||
def test_sub_with_callback(self):
|
||||
def redden_frame_print_spam(frame):
|
||||
frame[:, :, 0] = 0
|
||||
frame[:, :, 2] = 0
|
||||
|
||||
w.FrameUpdater(callbacks=redden_frame_print_spam).display()
|
||||
|
||||
def test_sub_with_callback_exception(self):
|
||||
def redden_frame_print_spam(frame):
|
||||
frame[:, :, 0] = 0
|
||||
frame[:, :, 2] = 1 / 0
|
||||
|
||||
with self.assertRaises(ZeroDivisionError) as e:
|
||||
v = w.FrameUpdater(callbacks=redden_frame_print_spam)
|
||||
v.display()
|
||||
self.assertEqual(v.exception_raised, e)
|
||||
|
||||
def test_multi_cams_one_source(self):
|
||||
display(0, window_names=["cammy", "cammy2"], blocking=True)
|
||||
|
||||
def test_multi_cams_multi_source(self):
|
||||
display(0, np.random.uniform(0.0, 1.0, (500, 500)), blocking=True)
|
||||
|
||||
def test_nested_frames(self):
|
||||
def nest_frame(frame):
|
||||
frame = np.asarray([[[[[[frame]]]]], [[[[[frame]]], [[[frame]]]]]])
|
||||
return frame
|
||||
|
||||
display(0, callbacks=nest_frame, window_names=["1", "2", "3"], blocking=True)
|
||||
|
||||
def test_nested_frames_exception(self):
|
||||
def nest_frame(frame):
|
||||
frame = np.asarray([[[[[[frame + 1 / 0]]]]], [[[[[frame]]], [[[frame]]]]]])
|
||||
return frame
|
||||
|
||||
v = w.FrameUpdater(callbacks=[nest_frame])
|
||||
v.start()
|
||||
|
||||
with self.assertRaises(ZeroDivisionError) as e:
|
||||
SubscriberWindows(
|
||||
window_names=[str(i) for i in range(3)], video_sources=[str(0)]
|
||||
).loop()
|
||||
self.assertEqual(v.exception_raised, e)
|
||||
|
||||
v.join()
|
||||
|
||||
def test_conway_life(self):
|
||||
from displayarray.frame import FrameUpdater
|
||||
from displayarray.callbacks import function_display_callback
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
img = np.zeros((50, 50, 1))
|
||||
img[0:5, 0:5, :] = 1
|
||||
|
||||
def conway(array, coords, finished):
|
||||
neighbors = np.sum(
|
||||
array[
|
||||
max(coords[0] - 1, 0): min(coords[0] + 2, 50),
|
||||
max(coords[1] - 1, 0): min(coords[1] + 2, 50),
|
||||
]
|
||||
)
|
||||
neighbors = max(neighbors - np.sum(array[coords[0:2]]), 0.0)
|
||||
if array[coords] == 1.0:
|
||||
if neighbors < 2 or neighbors > 3:
|
||||
array[coords] = 0.0
|
||||
elif 2 <= neighbors <= 3:
|
||||
array[coords] = 1.0
|
||||
else:
|
||||
if neighbors == 3:
|
||||
array[coords] = 1.0
|
||||
|
||||
@mouse_loop
|
||||
def conway_add(
|
||||
mouse_event # type:MouseEvent
|
||||
):
|
||||
if 0 <= mouse_event.x < 50 and 0 <= mouse_event.y < 50:
|
||||
if mouse_event.flags == cv2.EVENT_FLAG_LBUTTON:
|
||||
img[
|
||||
mouse_event.y - 5: mouse_event.y + 10,
|
||||
mouse_event.x - 5: mouse_event.x + 10,
|
||||
:,
|
||||
] = 0.0
|
||||
elif mouse_event.flags == cv2.EVENT_FLAG_RBUTTON:
|
||||
img[
|
||||
mouse_event.y - 5: mouse_event.y + 10,
|
||||
mouse_event.x - 5: mouse_event.x + 10,
|
||||
:,
|
||||
] = 1.0
|
||||
|
||||
FrameUpdater(
|
||||
video_source=img, callbacks=function_display_callback(conway)
|
||||
).display()
|
||||
|
||||
def test_double_win(self):
|
||||
vid1 = np.ones((100, 100))
|
||||
vid2 = np.zeros((100, 100))
|
||||
t1 = w.FrameUpdater(vid1)
|
||||
t2 = w.FrameUpdater(vid2)
|
||||
t1.start()
|
||||
t2.start()
|
||||
SubscriberWindows(
|
||||
window_names=["cammy", "cammy2"], video_sources=[vid1, vid2]
|
||||
).loop()
|
||||
t1.join()
|
||||
t1.join()
|
||||
|
||||
def test_display(self):
|
||||
display(np.ones((100, 100)), np.zeros((100, 100)), blocking=True)
|
||||
@@ -1,120 +0,0 @@
|
||||
from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread
|
||||
import displayarray
|
||||
import mock
|
||||
import pytest
|
||||
import cv2
|
||||
from displayarray.frame.np_to_opencv import NpCam
|
||||
import numpy as np
|
||||
import displayarray.frame.subscriber_dictionary as subd
|
||||
import displayarray.frame.frame_publishing as fpub
|
||||
|
||||
|
||||
def test_pub_cam_loop_exit():
|
||||
not_a_camera = mock.MagicMock()
|
||||
with pytest.raises(TypeError):
|
||||
pub_cam_loop(not_a_camera)
|
||||
|
||||
|
||||
def test_pub_cam_int():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.return_value = cap
|
||||
reg_cam = displayarray.frame.frame_publishing.subscriber_dictionary.register_cam = mock.MagicMock()
|
||||
cam_cmd_sub = displayarray.frame.frame_publishing.subscriber_dictionary.cam_cmd_sub = mock.MagicMock()
|
||||
mock_sub = cam_cmd_sub.return_value = mock.MagicMock()
|
||||
mock_sub.get = mock.MagicMock()
|
||||
mock_sub.get.side_effect = ["", "", "", "quit"]
|
||||
mock_sub.release = mock.MagicMock()
|
||||
cap.set = mock.MagicMock()
|
||||
cap.get = mock.MagicMock()
|
||||
cap.get.return_value = 2
|
||||
cap.release = mock.MagicMock()
|
||||
|
||||
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
cam_pub = cam_0.frame_pub.publish = mock.MagicMock()
|
||||
|
||||
pub_cam_loop(0, high_speed=False)
|
||||
|
||||
cam_pub.assert_has_calls(
|
||||
[mock.call(img)] * 4
|
||||
)
|
||||
|
||||
reg_cam.assert_called_once_with('0')
|
||||
cam_cmd_sub.assert_called_once_with('0')
|
||||
|
||||
cap.set.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FRAME_WIDTH, 1280),
|
||||
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 720)]
|
||||
)
|
||||
cap.get.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8
|
||||
)
|
||||
mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()])
|
||||
mock_sub.release.assert_called_once()
|
||||
cap.release.assert_called_once()
|
||||
|
||||
|
||||
def test_pub_cam_fail():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.side_effect = [cap]
|
||||
|
||||
cap.isOpened = mock.MagicMock()
|
||||
cap.isOpened.return_value = False
|
||||
subd.register_cam = mock.MagicMock()
|
||||
subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
|
||||
mock_fail_pub = \
|
||||
subd.CV_CAMS_DICT['0'].status_pub.publish = \
|
||||
mock.MagicMock()
|
||||
|
||||
pub_cam_loop(0, high_speed=False)
|
||||
|
||||
mock_fail_pub.assert_called_once_with("failed")
|
||||
|
||||
|
||||
def test_pub_cam_high_speed():
|
||||
img = np.zeros((30, 40))
|
||||
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
|
||||
cap = NpCam(img)
|
||||
mock_cv_capture.side_effect = [cap]
|
||||
|
||||
cap.isOpened = mock.MagicMock()
|
||||
cap.isOpened.return_value = False
|
||||
cap.set = mock.MagicMock()
|
||||
|
||||
pub_cam_loop(0, request_size=(640, 480), high_speed=True)
|
||||
|
||||
cap.set.assert_has_calls(
|
||||
[mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG),
|
||||
mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640),
|
||||
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)]
|
||||
)
|
||||
|
||||
|
||||
def test_pub_cam_numpy():
|
||||
with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs:
|
||||
img = np.zeros((30, 40))
|
||||
NpCam.read = mock.MagicMock()
|
||||
NpCam.read.side_effect = [(True, img), (True, img), (True, img), (False, None)]
|
||||
subd.register_cam = mock.MagicMock()
|
||||
mock_uidfs.return_value = '0'
|
||||
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
|
||||
cam_pub = cam_0.frame_pub.publish = mock.MagicMock()
|
||||
|
||||
pub_cam_loop(img)
|
||||
cam_pub.assert_has_calls(
|
||||
[mock.call(img)] * 3
|
||||
)
|
||||
|
||||
|
||||
def test_pub_cam_thread():
|
||||
with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread:
|
||||
thread_instance = mock_thread.return_value = mock.MagicMock()
|
||||
|
||||
pub_cam_thread(5)
|
||||
|
||||
mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (1280, 720), True, 240))
|
||||
thread_instance.start.assert_called_once()
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user