"""Get and handle updated frames."""
import threading
import asyncio
from typing import Union, Tuple, Any, Callable, List, Optional, Dict
import numpy as np
from displayarray.callbacks import global_cv_display_callback
from displayarray._uid import uid_for_source
from displayarray.frame import subscriber_dictionary
from displayarray.frame.frame_publishing import pub_cam_thread
from displayarray.window import window_commands
from displayarray.effects.select_channels import SelectChannels
from localpubsub import NoData
# Signature of a per-frame callback: it receives the current frame and may return
# a replacement frame, or None to keep the frame it was given.
FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]


class FrameUpdater(threading.Thread):
    """Thread for updating frames from a video source.

    The thread target is :meth:`loop`. An exception raised while running the
    callbacks is stored in ``exception_raised`` so that :meth:`display` can
    re-raise it in the calling thread.
    """

    def __init__(
        self,
        video_source: Union[int, str, np.ndarray] = 0,
        callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
        request_size: Tuple[int, int] = (-1, -1),
        high_speed: bool = True,
        fps_limit: float = float("inf"),
    ):
        """Create the frame updater thread.

        :param video_source: Source handed to ``uid_for_source`` and ``pub_cam_thread``.
        :param callbacks: A single callback or a sequence of callbacks applied, in order,
            to every frame. A sequence is copied, so the caller's object is never modified.
        :param request_size: Forwarded unchanged to ``pub_cam_thread``.
        :param high_speed: Forwarded unchanged to ``pub_cam_thread``.
        :param fps_limit: Forwarded unchanged to ``pub_cam_thread``.
        """
        super().__init__(target=self.loop, args=())
        self.cam_id = uid_for_source(video_source)
        self.video_source = video_source
        self.callbacks = self._normalize_callbacks(callbacks)
        self.request_size = request_size
        self.high_speed = high_speed
        self.fps_limit = fps_limit
        # Set by __apply_callbacks_to_frame when a callback fails.
        self.exception_raised: Optional[Exception] = None

    @staticmethod
    def _normalize_callbacks(
        callbacks: Optional[Union[List[FrameCallable], FrameCallable]]
    ) -> List[FrameCallable]:
        """Return ``callbacks`` as a fresh list owned by the caller of this helper.

        ``None`` becomes an empty list and a bare callable a one-element list. Any other
        value is copied, because the updater appends to its callback list at runtime and
        must not alter a list that the caller may share with other updaters.
        """
        if callbacks is None:
            return []
        if callable(callbacks):
            return [callbacks]
        return list(callbacks)

    def __wait_for_cam_id(self):
        """Block until this camera id appears in ``subscriber_dictionary.CV_CAMS_DICT``."""
        import time

        while str(self.cam_id) not in subscriber_dictionary.CV_CAMS_DICT:
            # Sleep briefly instead of spinning, so other threads get CPU time
            # while this one waits.
            time.sleep(0.001)

    def __apply_callbacks_to_frame(self, frame):
        """Run every callback on ``frame`` and pass the result to the display callback.

        ``None`` and ``NoData`` frames are ignored. If anything in the callback chain
        raises, the exception is recorded in ``exception_raised``, the camera and the
        windows are told to stop, and the exception is re-raised.
        """
        if frame is None or isinstance(frame, NoData):
            return
        try:
            for c in self.callbacks:
                frame_c = c(frame)
                # A callback returning None leaves the current frame in place.
                if frame_c is not None:
                    frame = frame_c
            if frame.shape[-1] not in [1, 3] and len(frame.shape) != 2:
                print(
                    f"Too many channels in output. (Got {frame.shape[-1]} instead of 1 or 3.) "
                    f"Frame selection callback added."
                )
                print(
                    "Ctrl+scroll to change first channel.\n"
                    "Shift+scroll to change second channel.\n"
                    "Alt+scroll to change third channel."
                )
                sel = SelectChannels()
                sel.enable_mouse_control()
                sel.mouse_print_channels = True
                # Keep the selector for all later frames, then apply it to this one.
                self.callbacks.append(sel)
                frame = sel(frame)
        except Exception as e:
            self.exception_raised = e
            subscriber_dictionary.stop_cam(self.cam_id)
            window_commands.quit()
            raise
        global_cv_display_callback(frame, self.cam_id)

    def loop(self):
        """Continually get frames from the video publisher, run callbacks on them, and listen to commands."""
        t = pub_cam_thread(
            self.video_source, self.request_size, self.high_speed, self.fps_limit
        )
        self.__wait_for_cam_id()

        sub_cam = subscriber_dictionary.cam_frame_sub(str(self.cam_id))
        sub_owner = subscriber_dictionary.handler_cmd_sub(str(self.cam_id))
        sub_owner.return_on_no_data = ""
        msg_owner = ""
        try:
            while msg_owner != "quit":
                frame = sub_cam.get(blocking=True, timeout=1.0)  # type: np.ndarray
                self.__apply_callbacks_to_frame(frame)
                msg_owner = sub_owner.get()
        finally:
            # Release the subscriptions even when a callback raised.
            sub_owner.release()
            sub_cam.release()
        subscriber_dictionary.stop_cam(self.cam_id)
        t.join()

    def display(self, callbacks: Optional[List[Callable[[np.ndarray], Any]]] = None):
        """
        Start default display operation.

        For multiple video sources, please use something outside of this class.

        :param callbacks: List of callbacks to be run on frames before displaying to the screen.
        :raises Exception: Re-raises whatever exception was recorded while applying callbacks.
        """
        from displayarray.window import SubscriberWindows

        if callbacks is None:
            callbacks = []
        self.start()
        SubscriberWindows(video_sources=[self.cam_id], callbacks=callbacks).loop()
        self.join()
        if self.exception_raised is not None:
            raise self.exception_raised
'''async def read_updates(
*vids,
callbacks: Optional[
Union[
Dict[Any, Union[FrameCallable, List[FrameCallable]]],
List[FrameCallable],
FrameCallable,
]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=True,
):
"""
Read back all updates from the requested videos.
Examp#le usage:
.. co#de-block:: python
>>#> from examples.videos import test_video
>>#> f = 0
>>#> for f, r in enumerate(read_updates(test_video, end_callback=lambda :f==2)):
..#. print(f"Frame:{f}. Array:{r}")
"""
from displayarray.window import SubscriberWindows
from displayarray.window.subscriber_windows import _get_video_threads
vid_names = [uid_for_source(name) for name in vids]
vid_threads = _get_video_threads(
*vids, callbacks=callbacks, fps=fps_limit, size=size
)
for v in vid_threads:
v.start()
while not end_callback():
vid_update_dict = {}
dict_was_updated = False
for i in range(len(vid_names)):
if vid_names[i] in SubscriberWindows.FRAME_DICT and not isinstance(
SubscriberWindows.FRAME_DICT[vid_names[i]], NoData
):
vid_update_dict[vid_names[i]] = SubscriberWindows.FRAME_DICT[
vid_names[i]
]
if (
isinstance(vid_update_dict[vid_names[i]], np.ndarray)
and len(vid_update_dict[vid_names[i]].shape) <= 3
):
vid_update_dict[vid_names[i]] = [vid_update_dict[vid_names[i]]]
dict_was_updated = True
if dict_was_updated or not blocking:
yield vid_update_dict
await asyncio.sleep(0)
for v in vid_names:
subscriber_dictionary.stop_cam(v)
for v in vid_threads:
v.join()'''
def _frame_from_zero_mq_message(md, msg):
    """Rebuild the array described by metadata ``md`` from the raw buffer ``msg``."""
    flat = np.frombuffer(memoryview(msg), dtype=md["dtype"])
    # ndarray.reshape returns a new array object; it does not reshape in place.
    return flat.reshape(md["shape"])


async def read_updates_zero_mq(
    *topic_names,
    address: str = "tcp://127.0.0.1:5600",
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    blocking: bool = False,
    end_callback: Callable[[Any], bool] = lambda x: False,
):
    """Read updated frames from ZeroMQ.

    Each update is read as a JSON metadata message followed by a raw buffer message.
    The metadata must provide the keys ``"dtype"``, ``"shape"`` and ``"name"``.

    :param topic_names: Topics to subscribe to, as ``bytes`` or ``str``. No subscription
        is made when none are given.
    :param address: Address the SUB socket connects to.
    :param flags: Flags passed to both receive calls.
    :param copy: Passed to the buffer receive call.
    :param track: Passed to the buffer receive call.
    :param blocking: When false, ``zmq.NOBLOCK`` is added to ``flags`` and an empty
        queue simply yields control back to the event loop.
    :param end_callback: Called with the metadata of every update. The generator stops
        after yielding the update for which it returns a true value.
    :return: Async generator of ``(name, array)`` tuples.
    """
    import zmq

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    try:
        s.connect(address)
        if not blocking:
            flags |= zmq.NOBLOCK
        for topic in topic_names:
            # The SUBSCRIBE option takes bytes; accept text topics as well.
            s.setsockopt(
                zmq.SUBSCRIBE, topic.encode() if isinstance(topic, str) else topic
            )

        done = False
        while not done:
            try:
                md = s.recv_json(flags=flags)
                msg = s.recv(flags=flags, copy=copy, track=track)
            except zmq.Again:
                pass  # no messages to receive
            else:
                arr = _frame_from_zero_mq_message(md, msg)
                done = end_callback(md)
                yield md["name"], arr
            # Give other tasks a chance to run between polls.
            await asyncio.sleep(0)
    finally:
        # Close the socket and context whether the generator finished, was
        # closed early, or failed.
        s.close(linger=0)
        ctx.term()
async def read_updates_ros(
    *topic_names,
    dtypes=None,
    listener_node_name=None,
    poll_rate_hz=None,
    end_callback: Callable[[Any], bool] = lambda x: False,
):
    """Read updated frames from ROS.

    :param topic_names: Topics to subscribe to.
    :param dtypes: One entry per topic, in the same order. An entry is a numpy scalar
        type, one of the type names listed in the error message below, or any other
        object, which is passed through unchanged as a custom message type.
    :param listener_node_name: Name for the ROS node; a random one is generated when omitted.
    :param poll_rate_hz: How often to check for new messages. When falsy, control is
        yielded to the event loop once per check without an added delay.
    :param end_callback: Called with every yielded message. The generator stops once it
        returns a true value.
    :return: Async generator of ``(topic, message)`` tuples.
    :raises ValueError: If ``dtypes`` is not given.
    :raises rospy.exceptions.ROSInterruptException: If rospy is shut down when the
        generator finishes.
    """
    # Validate first so that a bad call fails before any ROS import or node setup.
    if dtypes is None:
        raise ValueError(
            "ROS cannot automatically determine the types of incoming numpy arrays. Please specify.\n"
            "Options are: \n"
            "\tfloat32, float64, bool, char, int16, "
            "\tint32, int64, str, uint16, uint32, uint64, uint8"
        )

    import random
    import string

    import rospy
    import std_msgs.msg
    from rospy.client import _WFM
    from rospy.numpy_msg import numpy_msg

    if listener_node_name is None:
        # https://stackoverflow.com/a/2257449
        # Start with a letter: ROS base names may not begin with a digit.
        listener_node_name = random.choice(string.ascii_uppercase) + "".join(
            random.choices(string.ascii_uppercase + string.digits, k=7)
        )
    rospy.init_node(listener_node_name)

    ros = std_msgs.msg
    # numpy_msg() derives a new class from its argument, so the table holds message
    # classes rather than instances.
    by_name = {
        "float32": ros.Float32,
        "float64": ros.Float64,
        "bool": ros.Bool,
        "char": ros.Char,
        "int16": ros.Int16,
        "int32": ros.Int32,
        "int64": ros.Int64,
        "str": ros.String,
        "uint16": ros.UInt16,
        "uint32": ros.UInt32,
        "uint64": ros.UInt64,
        "uint8": ros.UInt8,
    }
    lookup = dict(by_name)
    for type_name in (
        "float32",
        "float64",
        "int16",
        "int32",
        "int64",
        "uint16",
        "uint32",
        "uint64",
        "uint8",
    ):
        lookup[getattr(np, type_name)] = by_name[type_name]
    # ``np.bool`` and ``np.str`` were plain aliases of the builtins and no longer exist
    # in current NumPy; accept both the builtins and the NumPy scalar types.
    lookup[bool] = lookup[np.bool_] = ros.Bool
    lookup[str] = lookup[np.str_] = ros.String
    legacy_char = getattr(np, "char", None)
    if legacy_char is not None:
        lookup[legacy_char] = ros.Char

    msg_types = [
        lookup.get(dtype, dtype)  # allow users to use their own custom messages in numpy arrays
        for dtype in dtypes
    ]

    subscribers = {}
    cb_val = False
    try:
        wfms = {t: _WFM() for t in topic_names}
        for i, t in enumerate(topic_names):
            # Registered one at a time so that a failure part-way through still lets
            # the finally clause unregister the subscribers already created.
            subscribers[t] = rospy.Subscriber(t, numpy_msg(msg_types[i]), wfms[t].cb)

        # A single loop, so either the end callback or a ROS shutdown ends the generator.
        while not cb_val and not rospy.core.is_shutdown():
            await asyncio.sleep(1.0 / poll_rate_hz if poll_rate_hz else 0)
            for t, w in wfms.items():
                received = w.msg
                if received is None:
                    continue
                # Clear the slot before yielding, so a message stored while the
                # consumer is busy is not overwritten with None afterwards.
                w.msg = None
                yield t, received
                cb_val = end_callback(received)
                if cb_val:
                    break
    except KeyboardInterrupt:
        pass
    finally:
        for sub in subscribers.values():
            sub.unregister()
        if rospy.core.is_shutdown():
            raise rospy.exceptions.ROSInterruptException("rospy shutdown")