Added ability to transmit tensors over zmq. Added zmq pub-sub topic handling. Added ndarray list handling. Renamed 'high_speed' mode to mjpg. Updated setup.py to handle more numpy versions.

This commit is contained in:
SimLeek
2021-08-10 12:31:53 -07:00
parent 405ce8f547
commit c0df2cab4e
6 changed files with 61 additions and 25 deletions

View File

@ -25,12 +25,18 @@ class SelectChannels(object):
def __call__(self, arr):
"""Run the channel selector."""
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
if isinstance(arr, list):
ars = []
for a in arr:
ars.append(self.__call__(a))
return ars
else:
self.num_input_channels = arr.shape[-1]
out_arr = [
arr[..., min(max(0, x), arr.shape[-1] - 1)] for x in self.selected_channels
]
out_arr = np.stack(out_arr, axis=-1)
return out_arr
def enable_mouse_control(self):
"""

View File

@ -38,7 +38,7 @@ FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]
def pub_cam_loop_pyv4l2(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
):
"""
@ -47,7 +47,7 @@ def pub_cam_loop_pyv4l2(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
@ -76,7 +76,7 @@ def pub_cam_loop_pyv4l2(
sub.return_on_no_data = ""
msg = ""
if high_speed and cam.pixel_format != "MJPEG":
if mjpg and cam.pixel_format != "MJPEG":
warnings.warn("Camera does not support high speed.")
now = time.time()
@ -109,7 +109,7 @@ def pub_cam_loop_pyv4l2(
def pub_cam_loop_opencv(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
extra: Optional[List[Tuple[int, int]]] = None,
) -> bool:
@ -119,7 +119,7 @@ def pub_cam_loop_opencv(
You can send a quit command 'quit' to CVCams.<cam_id>.Cmd
Status information, such as failure to open, will be posted to CVCams.<cam_id>.Status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param mjpg: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
@ -132,7 +132,7 @@ def pub_cam_loop_opencv(
cam = ZmqCam(cam_id)
else:
cam: Union[NpCam,ZmqCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
elif isinstance(cam_id, (np.ndarray)):
cam = NpCam(cam_id)
else:
raise TypeError(
@ -147,7 +147,7 @@ def pub_cam_loop_opencv(
sub.return_on_no_data = ""
msg = ""
if high_speed:
if mjpg:
try:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
except AttributeError:
@ -164,7 +164,7 @@ def pub_cam_loop_opencv(
time.sleep(1.0 / (fps_limit - (time.time() - now)))
now = time.time()
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
if ret is False or not isinstance(frame, (np.ndarray, list)):
cam.release()
subscriber_dictionary.CV_CAMS_DICT[name].status_pub.publish("failed")
return False
@ -187,7 +187,7 @@ uid_dict: Dict[str, threading.Thread] = {}
def pub_cam_thread(
cam_id: Union[int, str],
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
) -> threading.Thread:
@ -215,7 +215,7 @@ def pub_cam_thread(
pub_cam_loop = pub_cam_loop_opencv
t = threading.Thread(
target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit)
target=pub_cam_loop, args=(cam_id, request_ize, mjpg, fps_limit)
)
uid_dict[name] = t
t.start()

View File

@ -25,7 +25,7 @@ class FrameUpdater(threading.Thread):
video_source: Union[int, str, np.ndarray] = 0,
callbacks: Optional[Union[List[FrameCallable], FrameCallable]] = None,
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
mjpg: bool = True,
fps_limit: float = float("inf"),
force_backend="",
):
@ -40,7 +40,7 @@ class FrameUpdater(threading.Thread):
else:
self.callbacks = callbacks
self.request_size = request_size
self.high_speed = high_speed
self.mjpg = mjpg
self.fps_limit = fps_limit
self.exception_raised = None
self.force_backend = force_backend
@ -88,7 +88,7 @@ class FrameUpdater(threading.Thread):
t = pub_cam_thread(
self.video_source,
self.request_size,
self.high_speed,
self.mjpg,
self.fps_limit,
self.force_backend,
)

View File

@ -34,7 +34,7 @@ class ZmqCam(object):
def read(self):
"""Read back the numpy array in standard "did it work", "the array", OpenCV format."""
r = self.__sub.recv_multipart()
arrs = decode_buffer(r[1])[0]
arrs = [decode_buffer(ri) for ri in r[1:]]
return True, arrs
def isOpened(self): # NOSONAR

View File

@ -234,8 +234,11 @@ class SubscriberWindows(object):
if not self.silent:
self.display_frames(self.frames)
def update(self, arr: np.ndarray = None, id: str = None):
def update(self, arr: Union[List[np.ndarray], np.ndarray] = None, id: Union[List[str],str] = None):
"""Update window frames once. Optionally add a new input and input id."""
if isinstance(arr, list):
return self._update_multiple(arr, id)
if arr is not None and id is not None:
global_cv_display_callback(arr, id)
if id not in self.input_cams:
@ -250,7 +253,28 @@ class SubscriberWindows(object):
for s, t in zip(self.sock_list, self.top_list):
f = list(self.frames.values())
if f:
s.send_multipart([t, encode_buffer(f[0])])
s.send_multipart([t] + [encode_buffer(fr) for fr in f])
return msg_cmd, key
def _update_multiple(self, arr: Union[List[np.ndarray], np.ndarray] = None, id: Union[List[str],str] = None):
if arr is not None and id is not None:
for arr_i, id_i in zip(arr, id):
global_cv_display_callback(arr_i, id_i)
if id_i not in self.input_cams:
self.add_source(id_i)
if not self.silent:
self.add_window(id_i)
sub_cmd = window_commands.win_cmd_sub()
self.update_frames()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
if self.sock_list:
for s, t in zip(self.sock_list, self.top_list):
f = list(self.frames.values())
if f:
s.send_multipart([t] + [encode_buffer(fr) for fr in f])
return msg_cmd, key
def wait_for_init(self):
@ -358,6 +382,7 @@ def _get_video_threads(
fps=float("inf"),
size=(-1, -1),
force_backend="",
mjpg=True
):
vid_threads: List[Thread] = []
if isinstance(callbacks, Dict):
@ -373,6 +398,7 @@ def _get_video_threads(
fps_limit=fps,
request_size=size,
force_backend=force_backend,
mjpg=mjpg
)
)
elif callable(callbacks):
@ -384,6 +410,7 @@ def _get_video_threads(
fps_limit=fps,
request_size=size,
force_backend=force_backend,
mjpg=mjpg
)
)
else:
@ -391,7 +418,7 @@ def _get_video_threads(
if v is not None:
vid_threads.append(
FrameUpdater(
v, fps_limit=fps, request_size=size, force_backend=force_backend
v, fps_limit=fps, request_size=size, force_backend=force_backend, mjpg=mjpg
)
)
return vid_threads
@ -412,6 +439,7 @@ def display(
size=(-1, -1),
silent=False,
force_backend="",
mjpg=True
):
"""
Display all the arrays, cameras, and videos passed in.
@ -426,6 +454,7 @@ def display(
fps=fps_limit,
size=size,
force_backend=force_backend,
mjpg=mjpg
)
for v in vid_threads:
v.start()
@ -457,7 +486,7 @@ def read_updates(*args, **kwargs):
def publish_updates(*args, address="tcp://127.0.0.1:7880", topic=b"", **kwargs):
"""Publish all the updates to the given address and topic"""
if kwargs['blocking']:
if 'blocking' in kwargs and kwargs['blocking']:
block = True
kwargs['blocking'] = False
else:
@ -470,3 +499,4 @@ def publish_updates(*args, address="tcp://127.0.0.1:7880", topic=b"", **kwargs):
r.loop()
for vt in r.close_threads:
vt.join()
return r

View File

@ -42,7 +42,7 @@ setup(
install_requires=[
"docopt==0.6.2",
"localpubsub==0.0.4",
"numpy>=1.17.0",
"numpy>=1.14.5",
"opencv-python==4.*,>=4.0.0",
"pyzmq>=22.0.3",
"tensorcom"