Added actual tensor displaying. Added more tests. Added select channels.

This commit is contained in:
simleek
2019-10-11 22:58:42 -07:00
parent 60a29ddd9c
commit a3ba16d56c
15 changed files with 413 additions and 70 deletions

View File

@ -23,12 +23,12 @@ class function_display_callback(object): # NOSONAR
Used for running arbitrary functions on pixels.
>>> import random
>>> from displayarray.frame import VideoHandlerThread
>>> from displayarray.frame import FrameUpdater
>>> img = np.zeros((300, 300, 3))
>>> def fun(array, coords, finished):
... r,g,b = random.random()/20.0, random.random()/20.0, random.random()/20.0
... array[coords[0:2]] = (array[coords[0:2]] + [r,g,b])%1.0
>>> VideoHandlerThread(video_source=img, callbacks=function_display_callback(fun)).display()
>>> FrameUpdater(video_source=img, callbacks=function_display_callback(fun)).display()
:param display_function: a function to run on the input image.
:param finish_function: a function to run on the input image when the other function finishes.

View File

@ -0,0 +1,53 @@
import numpy as np
from ..input import mouse_loop
import cv2
class SelectChannels(object):
def __init__(self, selected_channels=None):
if selected_channels is None:
selected_channels = [0, 0, 0]
self.selected_channels = selected_channels
self.mouse_control = None
self.mouse_print_channels = False
self.num_input_channels = None
def __call__(self, arr):
self.num_input_channels = arr.shape[-1]
out_arr = [arr[..., min(max(0, x), arr.shape[-1]-1)] for x in self.selected_channels]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
def enable_mouse_control(self):
@mouse_loop
def m_loop(me):
if me.event == cv2.EVENT_MOUSEWHEEL:
if me.flags & cv2.EVENT_FLAG_CTRLKEY:
if me.flags > 0:
self.selected_channels[0] += 1
self.selected_channels[0] = min(self.selected_channels[0], self.num_input_channels - 1)
else:
self.selected_channels[0] -= 1
self.selected_channels[0] = max(self.selected_channels[0], 0)
if self.mouse_print_channels:
print(f"Channel 0 now maps to {self.selected_channels[0]}.")
elif me.flags & cv2.EVENT_FLAG_SHIFTKEY:
if me.flags > 0:
self.selected_channels[1] += 1
self.selected_channels[1] = min(self.selected_channels[1], self.num_input_channels - 1)
else:
self.selected_channels[1] -= 1
self.selected_channels[1] = max(self.selected_channels[1], 0)
if self.mouse_print_channels:
print(f"Channel 1 now maps to {self.selected_channels[1]}.")
elif me.flags & cv2.EVENT_FLAG_ALTKEY:
if me.flags > 0:
self.selected_channels[2] += 1
self.selected_channels[2] = min(self.selected_channels[2], self.num_input_channels - 1)
else:
self.selected_channels[2] -= 1
self.selected_channels[2] = max(self.selected_channels[2], 0)
if self.mouse_print_channels:
print(f"Channel 2 now maps to {self.selected_channels[2]}.")
self.mouse_control = m_loop

View File

@ -9,7 +9,7 @@ np_cam simulates numpy arrays as OpenCV cameras
"""
from . import subscriber_dictionary
from .frame_update_thread import VideoHandlerThread
from .frame_updater import FrameUpdater
from .get_frame_ids import get_cam_ids
from .np_to_opencv import NpCam
from .frame_publishing import pub_cam_thread

View File

@ -10,11 +10,10 @@ from displayarray.uid import uid_for_source
from typing import Union, Tuple
def pub_cam_loop(
cam_id: Union[int, str],
request_size: Tuple[int, int] = (1280, 720),
high_speed: bool = False,
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = 240,
) -> bool:
"""
@ -82,11 +81,11 @@ def pub_cam_loop(
def pub_cam_thread(
cam_id: Union[int, str],
request_ize: Tuple[int, int] = (1280, 720),
high_speed: bool = False,
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = 240,
) -> threading.Thread:
"""Run pub_cam_loop in a new thread."""
"""Run pub_cam_loop in a new thread. Starts on creation."""
t = threading.Thread(
target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit)
)

View File

@ -8,22 +8,23 @@ from displayarray.uid import uid_for_source
from displayarray.frame import subscriber_dictionary
from displayarray.frame.frame_publishing import pub_cam_thread
from displayarray.window import window_commands
from ..effects.select_channels import SelectChannels
FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]
class VideoHandlerThread(threading.Thread):
"""Thread for publishing frames from a video source."""
class FrameUpdater(threading.Thread):
"""Thread for updating frames from a video source."""
def __init__(
self,
video_source: Union[int, str, np.ndarray] = 0,
callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = 240,
self,
video_source: Union[int, str, np.ndarray] = 0,
callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = 240,
):
super(VideoHandlerThread, self).__init__(target=self.loop, args=())
super(FrameUpdater, self).__init__(target=self.loop, args=())
self.cam_id = uid_for_source(video_source)
self.video_source = video_source
if callbacks is None:
@ -43,20 +44,27 @@ class VideoHandlerThread(threading.Thread):
def __apply_callbacks_to_frame(self, frame):
if frame is not None:
frame_c = None
for c in self.callbacks:
try:
frame_c = c(frame)
except Exception as e:
self.exception_raised = e
frame = frame_c = self.exception_raised
subscriber_dictionary.stop_cam(self.cam_id)
window_commands.quit()
raise e
if frame_c is not None:
global_cv_display_callback(frame_c, self.cam_id)
else:
global_cv_display_callback(frame, self.cam_id)
try:
for c in self.callbacks:
frame = c(frame)
if frame.shape[-1] not in [1, 3] and len(frame.shape) != 2:
print(f"Too many channels in output. (Got {frame.shape[-1]} instead of 1 or 3.) "
f"Frame selection callback added.")
print("Ctrl+scroll to change first channel.\n"
"Shift+scroll to change second channel.\n"
"Alt+scroll to change third channel.")
sel = SelectChannels()
sel.enable_mouse_control()
sel.mouse_print_channels = True
self.callbacks.append(sel)
frame = self.callbacks[-1](frame)
except Exception as e:
self.exception_raised = e
frame = self.exception_raised
subscriber_dictionary.stop_cam(self.cam_id)
window_commands.quit()
raise e
global_cv_display_callback(frame, self.cam_id)
def loop(self):
"""Continually get frames from the video publisher, run callbacks on them, and listen to commands."""

14
displayarray/util.py Normal file
View File

@ -0,0 +1,14 @@
import weakref
class WeakMethod(weakref.WeakMethod):
"""Pass any method to OpenCV without it keeping a reference forever."""
def __call__(self, *args, **kwargs):
"""Call the actual method this object was made with."""
obj = super().__call__()
func = self._func_ref()
if obj is None or func is None:
return None
meth = self._meth_type(func, obj)
meth(*args, **kwargs)

View File

@ -9,24 +9,12 @@ from localpubsub import NoData
from displayarray.callbacks import global_cv_display_callback
from displayarray.uid import uid_for_source
from displayarray.frame import subscriber_dictionary
from displayarray.frame.frame_update_thread import FrameCallable
from displayarray.frame.frame_update_thread import VideoHandlerThread
from displayarray.frame.frame_updater import FrameCallable
from displayarray.frame.frame_updater import FrameUpdater
from displayarray.input import MouseEvent
from displayarray.window import window_commands
import weakref
class WeakMethod(weakref.WeakMethod):
"""Pass any method to OpenCV without it keeping a reference forever."""
def __call__(self, *args, **kwargs):
"""Call the actual method this object was made with."""
obj = super().__call__()
func = self._func_ref()
if obj is None or func is None:
return None
meth = self._meth_type(func, obj)
meth(*args, **kwargs)
from ..util import WeakMethod
from ..effects.select_channels import SelectChannels
class SubscriberWindows(object):
@ -160,6 +148,20 @@ class SubscriberWindows(object):
frame = c(self.frames[f])
if frame is not None:
self.frames[f] = frame
for f in range(len(self.frames)):
if self.frames[f].shape[-1] not in [1, 3] and len(self.frames[f].shape) != 2:
print(f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) "
f"Frame selection callback added.")
print("Ctrl+scroll to change first channel.\n"
"Shift+scroll to change second channel.\n"
"Alt+scroll to change third channel.")
sel = SelectChannels()
sel.enable_mouse_control()
sel.mouse_print_channels = True
self.callbacks.append(sel)
for fr in range(len(self.frames)):
self.frames[fr] = self.callbacks[-1](self.frames[fr])
break
win_num = self._display_frames(self.frames, win_num)
def update(self, arr=None, id=None):
@ -229,7 +231,7 @@ def _get_video_callback_dict_threads(
v_callbacks.append(callbacks[v_name])
if v in callbacks:
v_callbacks.append(callbacks[v])
vid_threads.append(VideoHandlerThread(v, callbacks=v_callbacks, fps_limit=fps, request_size=size))
vid_threads.append(FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size))
return vid_threads
@ -245,14 +247,14 @@ def _get_video_threads(
vid_threads = _get_video_callback_dict_threads(*vids, callbacks=callbacks, fps=fps, size=size)
elif isinstance(callbacks, List):
for v in vids:
vid_threads.append(VideoHandlerThread(v, callbacks=callbacks, fps_limit=fps, request_size=size))
vid_threads.append(FrameUpdater(v, callbacks=callbacks, fps_limit=fps, request_size=size))
elif callable(callbacks):
for v in vids:
vid_threads.append(VideoHandlerThread(v, callbacks=[callbacks], fps_limit=fps, request_size=size))
vid_threads.append(FrameUpdater(v, callbacks=[callbacks], fps_limit=fps, request_size=size))
else:
for v in vids:
if v is not None:
vid_threads.append(VideoHandlerThread(v, fps_limit=fps, request_size=size))
vid_threads.append(FrameUpdater(v, fps_limit=fps, request_size=size))
return vid_threads

View File

@ -0,0 +1,9 @@
from displayarray.effects import crop
from displayarray import display
import numpy as np
# Scroll the mouse wheel and press ctrl, alt, or shift to select which channels are displayed as red, green, or blue.
arr = np.ones((250, 250, 250))
for x in range(250):
arr[..., x] = x / 250.0
display(arr).block()

View File

@ -1,9 +1,9 @@
from displayarray import display
import numpy as np
arr = np.random.normal(0.5, 0.1, (100, 100, 3))
arr = np.random.normal(0.5, 0.1, (100, 100, 5))
with display(arr) as displayer:
while displayer:
arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 3))
arr[:] += np.random.normal(0.001, 0.0005, (100, 100, 5))
arr %= 1.0

View File

@ -39,3 +39,9 @@ while displayer:
autoencoder.fit(grab_noise, grab, steps_per_epoch=1, epochs=1)
output_image = autoencoder.predict(grab, steps=1)
displayer.update((output_image[0] * 255.0).astype(np.uint8), "uid for autoencoder output")
get_3rd_layer_output = tf.keras.backend.function([autoencoder.layers[0].input],
[autoencoder.layers[1].output])
layer_output = get_3rd_layer_output([grab_noise])[0]
displayer.update(layer_output[0], "conv 1")

View File

@ -13,7 +13,7 @@ class TestFrameHandler(ut.TestCase):
print(frame.shape)
self.i += 1
w.VideoHandlerThread(
w.FrameUpdater(
0,
[test_frame_handler],
request_size=(1280, 720),

View File

@ -21,21 +21,21 @@ class TestSubWin(ut.TestCase):
def print_key_thread(key_chr):
print("key pressed: " + str(key_chr))
w.VideoHandlerThread().display()
w.FrameUpdater().display()
def test_sub(self):
w.VideoHandlerThread().display()
w.FrameUpdater().display()
def test_image(self):
img = np.random.uniform(0, 1, (300, 300, 3))
w.VideoHandlerThread(video_source=img).display()
w.FrameUpdater(video_source=img).display()
def test_image_args(self):
img = np.random.uniform(0, 1, (30, 30, 3))
w.VideoHandlerThread(video_source=img, request_size=(300, -1)).display()
w.FrameUpdater(video_source=img, request_size=(300, -1)).display()
def test_sub_with_args(self):
video_thread = w.VideoHandlerThread(
video_thread = w.FrameUpdater(
video_source=0, request_size=(800, 600), high_speed=False, fps_limit=8
)
@ -46,7 +46,7 @@ class TestSubWin(ut.TestCase):
frame[:, :, 0] = 0
frame[:, :, 2] = 0
w.VideoHandlerThread(callbacks=redden_frame_print_spam).display()
w.FrameUpdater(callbacks=redden_frame_print_spam).display()
def test_sub_with_callback_exception(self):
def redden_frame_print_spam(frame):
@ -54,7 +54,7 @@ class TestSubWin(ut.TestCase):
frame[:, :, 2] = 1 / 0
with self.assertRaises(ZeroDivisionError) as e:
v = w.VideoHandlerThread(callbacks=redden_frame_print_spam)
v = w.FrameUpdater(callbacks=redden_frame_print_spam)
v.display()
self.assertEqual(v.exception_raised, e)
@ -76,7 +76,7 @@ class TestSubWin(ut.TestCase):
frame = np.asarray([[[[[[frame + 1 / 0]]]]], [[[[[frame]]], [[[frame]]]]]])
return frame
v = w.VideoHandlerThread(callbacks=[nest_frame])
v = w.FrameUpdater(callbacks=[nest_frame])
v.start()
with self.assertRaises(ZeroDivisionError) as e:
@ -88,7 +88,7 @@ class TestSubWin(ut.TestCase):
v.join()
def test_conway_life(self):
from displayarray.frame import VideoHandlerThread
from displayarray.frame import FrameUpdater
from displayarray.callbacks import function_display_callback
import numpy as np
import cv2
@ -131,15 +131,15 @@ class TestSubWin(ut.TestCase):
:,
] = 1.0
VideoHandlerThread(
FrameUpdater(
video_source=img, callbacks=function_display_callback(conway)
).display()
def test_double_win(self):
vid1 = np.ones((100, 100))
vid2 = np.zeros((100, 100))
t1 = w.VideoHandlerThread(vid1)
t2 = w.VideoHandlerThread(vid2)
t1 = w.FrameUpdater(vid1)
t2 = w.FrameUpdater(vid2)
t1.start()
t2.start()
SubscriberWindows(

View File

@ -1,5 +1,120 @@
from displayarray.frame.frame_publishing import pub_cam_loop, pub_cam_thread
import displayarray
import mock
import pytest
import cv2
from displayarray.frame.np_to_opencv import NpCam
import numpy as np
import displayarray.frame.subscriber_dictionary as subd
import displayarray.frame.frame_publishing as fpub
def test_pub_cam_loop():
pub_cam_loop()
def test_pub_cam_loop_exit():
not_a_camera = mock.MagicMock()
with pytest.raises(TypeError):
pub_cam_loop(not_a_camera)
def test_pub_cam_int():
img = np.zeros((30, 40))
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
cap = NpCam(img)
mock_cv_capture.return_value = cap
reg_cam = displayarray.frame.frame_publishing.subscriber_dictionary.register_cam = mock.MagicMock()
cam_cmd_sub = displayarray.frame.frame_publishing.subscriber_dictionary.cam_cmd_sub = mock.MagicMock()
mock_sub = cam_cmd_sub.return_value = mock.MagicMock()
mock_sub.get = mock.MagicMock()
mock_sub.get.side_effect = ["", "", "", "quit"]
mock_sub.release = mock.MagicMock()
cap.set = mock.MagicMock()
cap.get = mock.MagicMock()
cap.get.return_value = 2
cap.release = mock.MagicMock()
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
cam_pub = cam_0.frame_pub.publish = mock.MagicMock()
pub_cam_loop(0, high_speed=False)
cam_pub.assert_has_calls(
[mock.call(img)] * 4
)
reg_cam.assert_called_once_with('0')
cam_cmd_sub.assert_called_once_with('0')
cap.set.assert_has_calls(
[mock.call(cv2.CAP_PROP_FRAME_WIDTH, 1280),
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 720)]
)
cap.get.assert_has_calls(
[mock.call(cv2.CAP_PROP_FRAME_COUNT)] * 8
)
mock_sub.get.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()])
mock_sub.release.assert_called_once()
cap.release.assert_called_once()
def test_pub_cam_fail():
img = np.zeros((30, 40))
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
cap = NpCam(img)
mock_cv_capture.side_effect = [cap]
cap.isOpened = mock.MagicMock()
cap.isOpened.return_value = False
subd.register_cam = mock.MagicMock()
subd.CV_CAMS_DICT['0'] = subd.Cam('0')
mock_fail_pub = \
subd.CV_CAMS_DICT['0'].status_pub.publish = \
mock.MagicMock()
pub_cam_loop(0, high_speed=False)
mock_fail_pub.assert_called_once_with("failed")
def test_pub_cam_high_speed():
img = np.zeros((30, 40))
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
cap = NpCam(img)
mock_cv_capture.side_effect = [cap]
cap.isOpened = mock.MagicMock()
cap.isOpened.return_value = False
cap.set = mock.MagicMock()
pub_cam_loop(0, request_size=(640, 480), high_speed=True)
cap.set.assert_has_calls(
[mock.call(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG),
mock.call(cv2.CAP_PROP_FRAME_WIDTH, 640),
mock.call(cv2.CAP_PROP_FRAME_HEIGHT, 480)]
)
def test_pub_cam_numpy():
with mock.patch("displayarray.frame.frame_publishing.uid_for_source", new_callable=mock.MagicMock) as mock_uidfs:
img = np.zeros((30, 40))
NpCam.read = mock.MagicMock()
NpCam.read.side_effect = [(True, img), (True, img), (True, img), (False, None)]
subd.register_cam = mock.MagicMock()
mock_uidfs.return_value = '0'
cam_0 = subd.CV_CAMS_DICT['0'] = subd.Cam('0')
cam_pub = cam_0.frame_pub.publish = mock.MagicMock()
pub_cam_loop(img)
cam_pub.assert_has_calls(
[mock.call(img)] * 3
)
def test_pub_cam_thread():
with mock.patch("displayarray.frame.frame_publishing.threading.Thread", new_callable=mock.MagicMock) as mock_thread:
thread_instance = mock_thread.return_value = mock.MagicMock()
pub_cam_thread(5)
mock_thread.assert_called_once_with(target=fpub.pub_cam_loop, args=(5, (1280, 720), True, 240))
thread_instance.start.assert_called_once()

View File

@ -0,0 +1,123 @@
import displayarray.frame.frame_updater as fup
import numpy as np
import mock
import pytest
import itertools
from displayarray.effects.select_channels import SelectChannels
def test_init_defaults():
ud = fup.FrameUpdater()
assert ud.video_source == 0
assert ud.cam_id == "0"
assert ud.callbacks == []
assert ud.request_size == (-1, -1)
assert ud.high_speed == True
assert ud.fps_limit == 240
def test_init():
cb = lambda x: np.zeros((1, 1))
ud = fup.FrameUpdater("test", cb, (2, 2), False, 30)
assert ud.video_source == "test"
assert ud.cam_id == "test"
assert ud.callbacks == [cb]
assert ud.request_size == (2, 2)
assert ud.high_speed == False
assert ud.fps_limit == 30
def test_loop():
with mock.patch("displayarray.frame.frame_updater.pub_cam_thread") as mock_pubcam_thread, \
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.cam_frame_sub") as mock_frame_sub, \
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub, \
mock.patch("displayarray.frame.frame_updater.global_cv_display_callback") as mock_global_cb:
mock_cbs = [mock.MagicMock(), mock.MagicMock()]
ud = fup.FrameUpdater(0, callbacks=mock_cbs)
pub_t = mock_pubcam_thread.return_value = mock.MagicMock()
mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True])
sub_cam = mock_frame_sub.return_value = mock.MagicMock()
sub_cam.get = mock.MagicMock()
frame = sub_cam.get.return_value = mock.MagicMock()
transformed_frame = mock.MagicMock()
mock_cbs[0].return_value = transformed_frame
mock_cbs[1].return_value = transformed_frame
transformed_frame.shape = [1, 2, 3]
mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock()
mock_sub_owner.get.side_effect = ["", "", "", "quit"]
ud.loop()
mock_pubcam_thread.assert_called_once_with(0, (-1, -1), True, 240)
mock_frame_sub.assert_called_once_with("0")
handler_cmd_sub.assert_called_once_with("0")
sub_cam.get.assert_has_calls([mock.call(blocking=True, timeout=1.0)] * 3)
mock_cbs[0].assert_has_calls([mock.call(frame)] * 4)
mock_cbs[1].assert_has_calls([mock.call(transformed_frame)] * 4)
mock_global_cb.assert_has_calls([mock.call(transformed_frame, '0')] * 4)
mock_sub_owner.release.assert_called_once()
sub_cam.release.assert_called_once()
pub_t.join.assert_called_once()
def test_callback_exception():
def redden_frame_print_spam(frame):
frame[:, :, 0] = 0
frame[:, :, 2] = 1 / 0
with pytest.raises(ZeroDivisionError) as e:
v = fup.FrameUpdater(np.zeros((1, 1, 3)), callbacks=redden_frame_print_spam)
v.loop()
assert e.errisinstance(ZeroDivisionError)
def test_display():
with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win:
f = fup.FrameUpdater()
f.start = mock.MagicMock()
f.join = mock.MagicMock()
mock_sub_win_instance = mock_sub_win.return_value = mock.MagicMock()
f.display()
mock_sub_win.assert_called_once_with(video_sources=["0"], callbacks=[])
mock_sub_win_instance.loop.assert_called_once()
f.start.assert_called_once()
f.join.assert_called_once()
def test_display_exception():
with mock.patch("displayarray.window.SubscriberWindows", new_callable=mock.MagicMock) as mock_sub_win:
def redden_frame_print_spam(frame):
frame[:, :, 0] = 0
frame[:, :, 2] = 1 / 0
with pytest.raises(ZeroDivisionError) as e:
v = fup.FrameUpdater(np.zeros((1, 1, 3)), callbacks=redden_frame_print_spam)
v.display()
assert e.errisinstance(ZeroDivisionError)
from displayarray.window.window_commands import win_cmd_pub
def test_display_many_channels():
with mock.patch("displayarray.frame.frame_updater.pub_cam_thread"), \
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.CV_CAMS_DICT") as mock_cam_dict, \
mock.patch("displayarray.frame.frame_updater.subscriber_dictionary.handler_cmd_sub") as handler_cmd_sub:
mock_cam_dict.__contains__.side_effect = itertools.cycle([False, False, True])
mock_sub_owner = handler_cmd_sub.return_value = mock.MagicMock()
mock_sub_owner.get.side_effect = ["", "", "", "quit"]
arr = np.ones((20, 20, 20))
f = fup.FrameUpdater(arr)
f.loop()
assert isinstance(f.callbacks[0], SelectChannels)
win_cmd_pub.publish("quit")

View File

@ -0,0 +1,14 @@
import displayarray.frame.get_frame_ids as gfi
import mock
import cv2
def test_get_cam_ids():
with mock.patch.object(cv2, "VideoCapture", new_callable=mock.MagicMock) as mock_cv_capture:
cap = mock.MagicMock()
cap.isOpened.return_value = True
cap_end = mock.MagicMock()
cap_end.isOpened.return_value = False
mock_cv_capture.side_effect = [cap, cap, cap, cap_end]
ids = gfi.get_cam_ids()
assert ids == [0, 1, 2]