"""Publish frames so any function within this program can find them."""
import threading
import time
import asyncio
import cv2
import numpy as np
from displayarray.frame import subscriber_dictionary
from .np_to_opencv import NpCam
from displayarray._uid import uid_for_source
from typing import Union, Tuple, Optional, Dict, Any, List, Callable
FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]
def pub_cam_loop(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = float("inf"),
) -> bool:
"""
Publish whichever camera you select to CVCams.<cam_id>.Vid.
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
:return: True if loop ended normally, False if it failed somehow.
"""
name = uid_for_source(cam_id)
if isinstance(cam_id, (int, str)):
cam: Union[NpCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
cam = NpCam(cam_id)
else:
raise TypeError(
"Only strings or ints representing cameras, or numpy arrays representing pictures supported."
)
subscriber_dictionary.register_cam(name)
frame_counter = 0
sub = subscriber_dictionary.cam_cmd_sub(name)
sub.return_on_no_data = ""
msg = ""
if high_speed:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, request_size[0])
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, request_size[1])
if not cam.isOpened():
subscriber_dictionary.CV_CAMS_DICT[name].status_pub.publish("failed")
return False
now = time.time()
while msg != "quit":
time.sleep(1.0 / (fps_limit - (time.time() - now)))
now = time.time()
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
cam.release()
subscriber_dictionary.CV_CAMS_DICT[name].status_pub.publish("failed")
return False
if cam.get(cv2.CAP_PROP_FRAME_COUNT) > 0:
frame_counter += 1
if frame_counter >= cam.get(cv2.CAP_PROP_FRAME_COUNT):
frame_counter = 0
cam = cv2.VideoCapture(cam_id)
subscriber_dictionary.CV_CAMS_DICT[name].frame_pub.publish(frame)
msg = sub.get()
sub.release()
cam.release()
return True
[docs]def pub_cam_thread(
cam_id: Union[int, str],
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = float("inf"),
) -> threading.Thread:
"""Run pub_cam_loop in a new thread. Starts on creation."""
t = threading.Thread(
target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit)
)
t.start()
return t
async def publish_updates_zero_mq(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
publishing_address="tcp://127.0.0.1:5600",
prepend_topic="",
flags=0,
copy=True,
track=False
):
"""Publish frames to ZeroMQ when they're updated."""
import zmq
from displayarray import read_updates
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind(publishing_address)
if not blocking:
flags |= zmq.NOBLOCK
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
for vid_name, frame in v.items():
md = dict(
dtype=str(frame.dtype),
shape=frame.shape,
name=prepend_topic + vid_name,
)
s.send_json(md, flags | zmq.SNDMORE)
s.send(frame, flags, copy=copy, track=track)
if fps_limit:
await asyncio.sleep(1.0 / fps_limit)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
async def publish_updates_ros(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
node_name="displayarray",
publisher_name="npy",
rate_hz=None,
dtype=None
):
"""Publish frames to ROS when they're updated."""
import rospy
from rospy.numpy_msg import numpy_msg
import std_msgs.msg
from displayarray import read_updates
def get_msg_type(dtype):
if dtype is None:
msg_type = {
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
np.uint8: std_msgs.msg.UInt8(),
}[dtype]
else:
msg_type = (
dtype # allow users to use their own custom messages in numpy arrays
)
return msg_type
publishers: Dict[str, rospy.Publisher] = {}
rospy.init_node(node_name, anonymous=True)
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
if rospy.is_shutdown():
break
for vid_name, frame in v.items():
if vid_name not in publishers:
dty = frame.dtype if dtype is None else dtype
publishers[vid_name] = rospy.Publisher(
publisher_name + vid_name,
numpy_msg(get_msg_type(dty)),
queue_size=10,
)
publishers[vid_name].publish(frame)
if rate_hz:
await asyncio.sleep(1.0 / rate_hz)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
if rospy.core.is_shutdown():
raise rospy.exceptions.ROSInterruptException("rospy shutdown")