Merge pull request #46 from SimLeek/zmq_update

Added zmq transmission of ndarrays
This commit is contained in:
2021-08-12 14:25:13 -07:00
committed by GitHub
17 changed files with 310 additions and 234 deletions
+1 -2
View File
@@ -13,13 +13,12 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: 3.7
- uses: dschep/install-poetry-action@v1.2
- name: Install and build
run: |
python -m ensurepip --user
python -m pip install --upgrade pip --user
python -m pip install .[dev]
- name: Build with Poetry
- name: Build
run: python setup.py sdist bdist_wheel
- name: Publish distribution 📦 to Test PyPI
uses: pypa/gh-action-pypi-publish@master
+1 -2
View File
@@ -6,6 +6,5 @@ display is a function that displays these in their own windows.
__version__ = "1.1.1"
from .window.subscriber_windows import display, breakpoint_display, read_updates
from .frame.frame_publishing import publish_updates_zero_mq, publish_updates_ros
from .window.subscriber_windows import display, breakpoint_display, read_updates, publish_updates
from . import effects
+12 -6
View File
@@ -25,12 +25,18 @@ class SelectChannels(object):
def __call__(self, arr):
"""Run the channel selector."""
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
if isinstance(arr, list):
ars = []
for a in arr:
ars.append(self.__call__(a))
return ars
else:
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
def enable_mouse_control(self):
"""
+16 -132
View File
@@ -27,6 +27,7 @@ import numpy as np
from displayarray.frame import subscriber_dictionary
from .np_to_opencv import NpCam
from .zmq_to_opencv import ZmqCam
from displayarray._uid import uid_for_source
from typing import Union, Tuple, Optional, Dict, Any, List, Callable
@@ -37,7 +38,7 @@ FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]
def pub_cam_loop_pyv4l2(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
):
"""
@@ -46,7 +47,7 @@ def pub_cam_loop_pyv4l2(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
@@ -75,7 +76,7 @@ def pub_cam_loop_pyv4l2(
sub.return_on_no_data = ""
msg = ""
if high_speed and cam.pixel_format != "MJPEG":
if mjpg and cam.pixel_format != "MJPEG":
warnings.warn("Camera does not support high speed.")
now = time.time()
@@ -108,7 +109,7 @@ def pub_cam_loop_pyv4l2(
def pub_cam_loop_opencv(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
extra: Optional[List[Tuple[int, int]]] = None,
) -> bool:
@@ -118,7 +119,7 @@ def pub_cam_loop_opencv(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
@@ -126,9 +127,13 @@ def pub_cam_loop_opencv(
"""
name = uid_for_source(cam_id)
cam: Union[NpCam, ZmqCam, cv2.VideoCapture]
if isinstance(cam_id, (int, str)):
cam: Union[NpCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
if isinstance(cam_id, str) and cam_id.startswith('tcp'):
cam = ZmqCam(cam_id)
else:
cam = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, (np.ndarray)):
cam = NpCam(cam_id)
else:
raise TypeError(
@@ -143,7 +148,7 @@ def pub_cam_loop_opencv(
sub.return_on_no_data = ""
msg = ""
if high_speed:
if mjpg:
try:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
except AttributeError:
@@ -160,7 +165,7 @@ def pub_cam_loop_opencv(
time.sleep(1.0 / (fps_limit - (time.time() - now)))
now = time.time()
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
if ret is False or not isinstance(frame, (np.ndarray, list)):
cam.release()
subscriber_dictionary.CV_CAMS_DICT[name].status_pub.publish("failed")
return False
@@ -183,7 +188,7 @@ uid_dict: Dict[str, threading.Thread] = {}
def pub_cam_thread(
cam_id: Union[int, str],
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
) -> threading.Thread:
@@ -211,129 +216,8 @@ def pub_cam_thread(
pub_cam_loop = pub_cam_loop_opencv
t = threading.Thread(
target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit)
target=pub_cam_loop, args=(cam_id, request_ize, mjpg, fps_limit)
)
uid_dict[name] = t
t.start()
return t
async def publish_updates_zero_mq(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
publishing_address="tcp://127.0.0.1:5600",
prepend_topic="",
flags=0,
copy=True,
track=False,
):
"""Publish frames to ZeroMQ when they're updated."""
import zmq
from displayarray import read_updates
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind(publishing_address)
if not blocking:
flags |= zmq.NOBLOCK
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
for vid_name, frame in v.items():
md = dict(
dtype=str(frame.dtype),
shape=frame.shape,
name=prepend_topic + vid_name,
)
s.send_json(md, flags | zmq.SNDMORE)
s.send(frame, flags, copy=copy, track=track)
if fps_limit:
await asyncio.sleep(1.0 / fps_limit)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
async def publish_updates_ros(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
node_name="displayarray",
publisher_name="npy",
rate_hz=None,
dtype=None,
):
"""Publish frames to ROS when they're updated."""
import rospy
from rospy.numpy_msg import numpy_msg
import std_msgs.msg
from displayarray import read_updates
def get_msg_type(dtype):
if dtype is None:
msg_type = {
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
np.uint8: std_msgs.msg.UInt8(),
}[dtype]
else:
msg_type = (
dtype # allow users to use their own custom messages in numpy arrays
)
return msg_type
publishers: Dict[str, rospy.Publisher] = {}
rospy.init_node(node_name, anonymous=True)
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
if rospy.is_shutdown():
break
for vid_name, frame in v.items():
if vid_name not in publishers:
dty = frame.dtype if dtype is None else dtype
publishers[vid_name] = rospy.Publisher(
publisher_name + vid_name,
numpy_msg(get_msg_type(dty)),
queue_size=10,
)
publishers[vid_name].publish(frame)
if rate_hz:
await asyncio.sleep(1.0 / rate_hz)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
if rospy.core.is_shutdown():
raise rospy.exceptions.ROSInterruptException("rospy shutdown")
+5 -5
View File
@@ -25,7 +25,7 @@ class FrameUpdater(threading.Thread):
video_source: Union[int, str, np.ndarray] = 0,
callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
):
@@ -40,7 +40,7 @@ class FrameUpdater(threading.Thread):
else:
self.callbacks = callbacks
self.request_size = request_size
self.high_speed = high_speed
self.mjpg = mjpg
self.fps_limit = fps_limit
self.exception_raised = None
self.force_backend = force_backend
@@ -88,7 +88,7 @@ class FrameUpdater(threading.Thread):
t = pub_cam_thread(
self.video_source,
self.request_size,
self.high_speed,
self.mjpg,
self.fps_limit,
self.force_backend,
)
@@ -266,12 +266,12 @@ async def read_updates_ros(
{
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.bool: std_msgs.msg.Bool(), # type: ignore
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.str: std_msgs.msg.String(), # type: ignore
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
+49
View File
@@ -0,0 +1,49 @@
"""Allow OpenCV to handle zmq subscriber addresses as input."""
import cv2
import zmq
from tensorcom.tenbin import decode_buffer # type: ignore
class ZmqCam(object):
"""Add OpenCV camera controls to a numpy array."""
def __init__(self, img):
"""Create a fake camera for OpenCV based on the initial array."""
assert isinstance(img, str)
s = img.split('#')
self.__ctx = zmq.Context()
self.__addr = s[0]
self.__sub = self.__ctx.socket(zmq.SUB)
if len(s) > 1:
self.__topic = bytes(s[1], 'ascii')
self.__sub.setsockopt(zmq.SUBSCRIBE, self.__topic)
else:
self.__topic = b""
self.__sub.connect(self.__addr)
self.__is_opened = True
def set(self, *args, **kwargs):
"""Set CAP_PROP_FRAME_WIDTH or CAP_PROP_FRAME_HEIGHT to scale a numpy array to that size."""
pass
@staticmethod
def get(*args, **kwargs):
"""Get OpenCV args. Currently only a fake CAP_PROP_FRAME_COUNT to fix detecting video ends."""
if args[0] == cv2.CAP_PROP_FRAME_COUNT:
return float("inf")
def read(self):
"""Read back the numpy array in standard "did it work", "the array", OpenCV format."""
r = self.__sub.recv_multipart()
arrs = [decode_buffer(ri) for ri in r[1:]]
return True, arrs
def isOpened(self): # NOSONAR
"""Hack to tell OpenCV we're opened until we call release."""
return self.__is_opened
def release(self):
"""Let OpenCV know we're finished."""
self.__is_opened = False
+73 -3
View File
@@ -29,6 +29,12 @@ try:
except:
get_bus_info_from_camera = None
try:
import zmq
from tensorcom.tenbin import encode_buffer # type: ignore
except:
warnings.warn("Could not import ZMQ and tensorcom. Cannot send messages between programs.")
class SubscriberWindows(object):
"""Windows that subscribe to updates to cameras, videos, and arrays."""
@@ -52,6 +58,9 @@ class SubscriberWindows(object):
self.input_cams: List[str] = []
self.exited = False
self.silent = silent
self.ctx = None
self.sock_list: List[zmq.Socket] = []
self.top_list: List[bytes] = []
if callbacks is None:
callbacks = []
@@ -225,9 +234,13 @@ class SubscriberWindows(object):
if not self.silent:
self.display_frames(self.frames)
def update(self, arr: np.ndarray = None, id: str = None):
def update(self, arr: Union[List[np.ndarray], np.ndarray] = None, id: Union[List[str],str, List[int], int, None] = None):
"""Update window frames once. Optionally add a new input and input id."""
if arr is not None and id is not None:
if isinstance(arr, list):
assert isinstance(id, list)
return self._update_multiple(arr, id)
elif arr is not None and id is not None:
assert not isinstance(id, list)
global_cv_display_callback(arr, id)
if id not in self.input_cams:
self.add_source(id)
@@ -237,6 +250,32 @@ class SubscriberWindows(object):
self.update_frames()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
if self.sock_list:
for s, t in zip(self.sock_list, self.top_list):
f = list(self.frames.values())
if f:
s.send_multipart([t] + [encode_buffer(fr) for fr in f])
return msg_cmd, key
def _update_multiple(self, arr: Union[List[np.ndarray], np.ndarray] = None, id: Union[List[str], List[int]] = None):
if arr is not None and id is not None:
for arr_i, id_i in zip(arr, id):
global_cv_display_callback(arr_i, id_i) # type: ignore
if id_i not in self.input_cams:
self.add_source(id_i)
if not self.silent:
self.add_window(id_i)
sub_cmd = window_commands.win_cmd_sub()
self.update_frames()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
if self.sock_list:
for s, t in zip(self.sock_list, self.top_list):
f = list(self.frames.values())
if f:
s.send_multipart([t] + [encode_buffer(fr) for fr in f])
return msg_cmd, key
def wait_for_init(self):
@@ -277,6 +316,14 @@ class SubscriberWindows(object):
window_commands.quit(force_all_read=False)
self.__stop_all_cams()
def publish_to(self, address, topic=b""):
"""Publish the current video to the specified address and topic over a zmq publisher."""
if self.ctx==None:
self.ctx = zmq.Context()
self.sock_list.append(self.ctx.socket(zmq.PUB))
self.sock_list[-1].bind(address)
self.top_list.append(topic)
@property
def cams(self):
"""Get the camera instances. Can be used for OpenCV or V4L2 functions, depending on backend."""
@@ -337,6 +384,7 @@ def _get_video_threads(
fps=float("inf"),
size=(-1, -1),
force_backend="",
mjpg=True
):
vid_threads: List[Thread] = []
if isinstance(callbacks, Dict):
@@ -352,6 +400,7 @@ def _get_video_threads(
fps_limit=fps,
request_size=size,
force_backend=force_backend,
mjpg=mjpg
)
)
elif callable(callbacks):
@@ -363,6 +412,7 @@ def _get_video_threads(
fps_limit=fps,
request_size=size,
force_backend=force_backend,
mjpg=mjpg
)
)
else:
@@ -370,7 +420,7 @@ def _get_video_threads(
if v is not None:
vid_threads.append(
FrameUpdater(
v, fps_limit=fps, request_size=size, force_backend=force_backend
v, fps_limit=fps, request_size=size, force_backend=force_backend, mjpg=mjpg
)
)
return vid_threads
@@ -391,6 +441,7 @@ def display(
size=(-1, -1),
silent=False,
force_backend="",
mjpg=True
):
"""
Display all the arrays, cameras, and videos passed in.
@@ -405,6 +456,7 @@ def display(
fps=fps_limit,
size=size,
force_backend=force_backend,
mjpg=mjpg
)
for v in vid_threads:
v.start()
@@ -432,3 +484,21 @@ def breakpoint_display(*args, **kwargs):
def read_updates(*args, **kwargs):
"""Read back all frame updates and yield a list of frames. List is empty if no frames were read."""
return display(*args, **kwargs, silent=True)
def publish_updates(*args, address="tcp://127.0.0.1:7880", topic=b"", **kwargs):
"""Publish all the updates to the given address and topic."""
if 'blocking' in kwargs and kwargs['blocking']:
block = True
kwargs['blocking'] = False
else:
block = False
r = read_updates(*args, **kwargs)
r.publish_to(address, topic)
if block:
r.loop()
for vt in r.close_threads:
vt.join()
return r
+1 -1
View File
@@ -21,7 +21,7 @@ i = 0
while d:
if len(d.frames) > 0:
i += 1
frame = d.frames[0]
frame = next(iter(d.frames.values()))
center_sin = [(m.sin(m.pi * (i / 70.0))), (m.cos(m.pi * (i / 120.0)))]
pre_crop_callback.center = [
center_sin[0] * 720 / 2 + 720 / 2,
View File
+15
View File
@@ -0,0 +1,15 @@
import zmq
from displayarray import display
from tensorcom.tenbin import decode_buffer
ctx = zmq.Context()
s = ctx.socket(zmq.SUB)
s.setsockopt(zmq.SUBSCRIBE, b"topic")
s.connect("tcp://127.0.0.1:7880")
d = display()
while True:
r = s.recv_multipart()
# r[0]=="topic"
arr = decode_buffer(r[1])
d.update(arr[0], '0')
+10
View File
@@ -0,0 +1,10 @@
from displayarray import display
import time
t0 = time.time()
for up in display("tcp://127.0.0.1:7880#topic"):
if up:
t1 = time.time()
u = next(iter(up.values()))[0]
print(1.0 / (t1 - t0))
t0 = t1
+26
View File
@@ -0,0 +1,26 @@
from displayarray import read_updates
import numpy as np
import zmq
from tensorcom.tenbin import encode_buffer
def black_and_white(arr):
return (np.sum(arr, axis=-1) / 3).astype(np.uint8)
import time
t0 = t1 = time.time()
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind("tcp://127.0.0.1:7880")
for up in read_updates(0, size=(9999,9999)):
if up:
t1 = time.time()
u = next(iter(up.values()))[0]
s.send_multipart([b'topic', encode_buffer([u])])
print(1.0 / (t1 - t0))
t0 = t1
+9
View File
@@ -0,0 +1,9 @@
from displayarray import publish_updates
publish_updates(
0,
size=(9999, 9999),
address="tcp://127.0.0.1:7880",
topic=b'topic',
blocking=True
)
+3 -2
View File
@@ -42,9 +42,10 @@ setup(
install_requires=[
"docopt==0.6.2",
"localpubsub==0.0.4",
"numpy>=1.17.0",
"numpy>=1.14.5",
"opencv-python==4.*,>=4.0.0",
"pyzmq==18.1.0",
"pyzmq>=22.0.3",
"tensorcom"
],
extras_require={
# pypi doesn't allow direct dependencies for security reasons,
+3 -3
View File
@@ -38,7 +38,7 @@ def test_pub_cam_int():
cam_0 = subd.CV_CAMS_DICT["0"] = subd.Cam("0")
with mock.patch.object(cam_0.frame_pub, "publish") as cam_pub:
pub_cam_loop_opencv(0, high_speed=False)
pub_cam_loop_opencv(0, mjpg=False)
cam_pub.assert_has_calls([mock.call(img)] * 4)
@@ -79,7 +79,7 @@ def test_pub_cam_fail():
with mock.patch.object(
subd.CV_CAMS_DICT["0"].status_pub, "publish"
) as mock_fail_pub:
pub_cam_loop_opencv(0, high_speed=False)
pub_cam_loop_opencv(0, mjpg=False)
mock_fail_pub.assert_called_once_with("failed")
@@ -100,7 +100,7 @@ def test_pub_cam_high_speed():
mock_is_open.return_value = False
pub_cam_loop_opencv(0, request_size=(640, 480), high_speed=True)
pub_cam_loop_opencv(0, request_size=(640, 480), mjpg=True)
mock_cam_set.assert_has_calls(
[
+2 -2
View File
@@ -13,7 +13,7 @@ def test_init_defaults():
assert ud.cam_id == "0"
assert ud.callbacks == []
assert ud.request_size == (-1, -1)
assert ud.high_speed == True
assert ud.mjpg == True
assert ud.fps_limit == float("inf")
@@ -25,7 +25,7 @@ def test_init():
assert ud.cam_id == "test"
assert ud.callbacks == [cb]
assert ud.request_size == (2, 2)
assert ud.high_speed == False
assert ud.mjpg == False
assert ud.fps_limit == 30
+84 -76
View File
@@ -6,6 +6,12 @@ import numpy as np
from displayarray.effects.select_channels import SelectChannels
import pytest
'''
OpenCV now has a new error:
cv2.error: OpenCV(4.5.3) C:\...\window_w32.cpp:2559: error: (-27:Null pointer) NULL window:
'displayarray (press ESC to quit)' in function 'cvSetMouseCallback'
The code works if the window's aren't mocked, but until the error is bypassed,
this can't be tested in continuous integration.
def test_init_defaults():
sub_win.SubscriberWindows.FRAME_DICT = {}
@@ -80,35 +86,6 @@ def test_add_source():
assert sw.input_cams == [0, 2]
def test_add_window():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch.object(cv2, "namedWindow") as mock_named_window, mock.patch.object(
cv2, "setMouseCallback"
) as mock_set_mouse, mock.patch(
"displayarray.window.subscriber_windows.WeakMethod", new_callable=mock.MagicMock
) as mock_weak:
weak_method = mock_weak.return_value = mock.MagicMock()
sw = sub_win.SubscriberWindows().add_window("second window")
mock_weak.assert_has_calls(
[mock.call(sw.handle_mouse), mock.call(sw.handle_mouse)]
)
assert sw.window_names == ["displayarray", "second window"]
mock_named_window.assert_has_calls(
[
mock.call("displayarray (press ESC to quit)"),
mock.call("second window (press ESC to quit)"),
]
)
mock_set_mouse.assert_has_calls(
[
mock.call("displayarray (press ESC to quit)", weak_method),
mock.call("second window (press ESC to quit)", weak_method),
]
)
def test_add_callback():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch.object(cv2, "namedWindow"):
@@ -361,31 +338,6 @@ def test_update():
assert key == mock_key
def test_update_with_array():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch.object(cv2, "namedWindow"), mock.patch.object(
sub_win.SubscriberWindows, "update_frames"
) as mock_update_win_frames, mock.patch(
"displayarray.window.subscriber_windows.window_commands"
) as mock_win_cmd, mock.patch.object(
sub_win.SubscriberWindows, "handle_keys"
) as mock_handle_keys, mock.patch.object(
sub_win.SubscriberWindows, "add_source"
) as add_source, mock.patch.object(
sub_win.SubscriberWindows, "add_window"
) as add_window, mock.patch(
"displayarray.window.subscriber_windows.global_cv_display_callback"
) as mock_cb, mock.patch.object(
cv2, "waitKey"
) as key:
sw = sub_win.SubscriberWindows()
sw.update(arr=1, id=2)
mock_cb.assert_called_once_with(1, 2)
add_source.assert_has_calls([mock.call(0), mock.call(2)])
add_window.assert_has_calls([mock.call("displayarray"), mock.call(2)])
def test_wait_for_init():
sub_win.SubscriberWindows.FRAME_DICT = {}
@@ -510,27 +462,6 @@ def test_display():
assert sws_inst.close_threads == [fup_inst, fup_inst]
assert d == sws_inst
def test_display_blocking():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch(
"displayarray.window.subscriber_windows.FrameUpdater"
) as fup, mock.patch(
"displayarray.window.subscriber_windows.SubscriberWindows"
) as sws:
fup_inst = fup.return_value = mock.MagicMock()
sws_inst = sws.return_value = mock.MagicMock()
sub_win.display(0, 1, blocking=True)
assert fup_inst.start.call_count == 2
sws.assert_called_once_with(
window_names=["window 0", "window 1"], video_sources=(0, 1), silent=False
)
sws_inst.loop.assert_called_once()
assert fup_inst.join.call_count == 2
def test_display_callbacks():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch(
@@ -586,11 +517,88 @@ def test_display_callbacks():
]
)
'''
def test_add_window():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch.object(cv2, "namedWindow") as mock_named_window, mock.patch.object(
cv2, "setMouseCallback"
) as mock_set_mouse, mock.patch(
"displayarray.window.subscriber_windows.WeakMethod", new_callable=mock.MagicMock
) as mock_weak:
weak_method = mock_weak.return_value = mock.MagicMock()
sw = sub_win.SubscriberWindows().add_window("second window")
mock_weak.assert_has_calls(
[mock.call(sw.handle_mouse), mock.call(sw.handle_mouse)]
)
assert sw.window_names == ["displayarray", "second window"]
mock_named_window.assert_has_calls(
[
mock.call("displayarray (press ESC to quit)"),
mock.call("second window (press ESC to quit)"),
]
)
mock_set_mouse.assert_has_calls(
[
mock.call("displayarray (press ESC to quit)", weak_method),
mock.call("second window (press ESC to quit)", weak_method),
]
)
def test_update_with_array():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch.object(cv2, "namedWindow"), mock.patch.object(
sub_win.SubscriberWindows, "update_frames"
) as mock_update_win_frames, mock.patch(
"displayarray.window.subscriber_windows.window_commands"
) as mock_win_cmd, mock.patch.object(
sub_win.SubscriberWindows, "handle_keys"
) as mock_handle_keys, mock.patch.object(
sub_win.SubscriberWindows, "add_source"
) as add_source, mock.patch.object(
sub_win.SubscriberWindows, "add_window"
) as add_window, mock.patch(
"displayarray.window.subscriber_windows.global_cv_display_callback"
) as mock_cb, mock.patch.object(
cv2, "waitKey"
) as key:
sw = sub_win.SubscriberWindows()
sw.update(arr=1, id=2)
mock_cb.assert_called_once_with(1, 2)
add_source.assert_has_calls([mock.call(0), mock.call(2)])
add_window.assert_has_calls([mock.call("displayarray"), mock.call(2)])
def test_display_blocking():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch(
"displayarray.window.subscriber_windows.FrameUpdater"
) as fup, mock.patch(
"displayarray.window.subscriber_windows.SubscriberWindows"
) as sws:
fup_inst = fup.return_value = mock.MagicMock()
sws_inst = sws.return_value = mock.MagicMock()
sub_win.display(0, 1, blocking=True)
assert fup_inst.start.call_count == 2
sws.assert_called_once_with(
window_names=["window 0", "window 1"], video_sources=(0, 1), silent=False
)
sws_inst.loop.assert_called_once()
assert fup_inst.join.call_count == 2
def test_display_callbacks_dict():
sub_win.SubscriberWindows.FRAME_DICT = {}
with mock.patch(
"displayarray.window.subscriber_windows.FrameUpdater"
"displayarray.window.subscriber_windows.FrameUpdater"
) as fup, mock.patch(
"displayarray.window.subscriber_windows.SubscriberWindows"
) as sws: