From 405ce8f5471fded6b0a87a2909dc2502bb8caca3 Mon Sep 17 00:00:00 2001 From: SimLeek Date: Sun, 4 Jul 2021 22:27:00 -0700 Subject: [PATCH] Initial working zmq pub sub and topic integration --- displayarray/__init__.py | 3 +- displayarray/frame/frame_publishing.py | 127 +--------------------- displayarray/frame/zmq_to_opencv.py | 46 ++++++++ displayarray/window/subscriber_windows.py | 38 +++++++ examples/effects/manual_control.py | 2 +- examples/networking/__init__.py | 0 examples/networking/get.py | 15 +++ examples/networking/get_easy.py | 10 ++ examples/networking/to.py | 26 +++++ examples/networking/to_easy.py | 9 ++ setup.py | 3 +- 11 files changed, 153 insertions(+), 126 deletions(-) create mode 100644 displayarray/frame/zmq_to_opencv.py create mode 100644 examples/networking/__init__.py create mode 100644 examples/networking/get.py create mode 100644 examples/networking/get_easy.py create mode 100644 examples/networking/to.py create mode 100644 examples/networking/to_easy.py diff --git a/displayarray/__init__.py b/displayarray/__init__.py index b9aec02..9b3ba10 100644 --- a/displayarray/__init__.py +++ b/displayarray/__init__.py @@ -6,6 +6,5 @@ display is a function that displays these in their own windows. __version__ = "1.1.1" -from .window.subscriber_windows import display, breakpoint_display, read_updates -from .frame.frame_publishing import publish_updates_zero_mq, publish_updates_ros +from .window.subscriber_windows import display, breakpoint_display, read_updates, publish_updates from . import effects diff --git a/displayarray/frame/frame_publishing.py b/displayarray/frame/frame_publishing.py index 17b0418..ef25d4e 100644 --- a/displayarray/frame/frame_publishing.py +++ b/displayarray/frame/frame_publishing.py @@ -27,6 +27,7 @@ import numpy as np from displayarray.frame import subscriber_dictionary from .np_to_opencv import NpCam +from .zmq_to_opencv import ZmqCam from displayarray._uid import uid_for_source from typing import Union, Tuple, Optional, Dict, Any, List, Callable @@ -127,7 +128,10 @@ def pub_cam_loop_opencv( name = uid_for_source(cam_id) if isinstance(cam_id, (int, str)): - cam: Union[NpCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id) + if isinstance(cam_id, str) and cam_id.startswith('tcp'): + cam = ZmqCam(cam_id) + else: + cam: Union[NpCam,ZmqCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id) elif isinstance(cam_id, np.ndarray): cam = NpCam(cam_id) else: @@ -216,124 +220,3 @@ def pub_cam_thread( uid_dict[name] = t t.start() return t - - -async def publish_updates_zero_mq( - *vids, - callbacks: Optional[ - Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable] - ] = None, - fps_limit=float("inf"), - size=(-1, -1), - end_callback: Callable[[], bool] = lambda: False, - blocking=False, - publishing_address="tcp://127.0.0.1:5600", - prepend_topic="", - flags=0, - copy=True, - track=False, -): - """Publish frames to ZeroMQ when they're updated.""" - import zmq - from displayarray import read_updates - - ctx = zmq.Context() - s = ctx.socket(zmq.PUB) - s.bind(publishing_address) - - if not blocking: - flags |= zmq.NOBLOCK - - try: - for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking): - if v: - for vid_name, frame in v.items(): - md = dict( - dtype=str(frame.dtype), - shape=frame.shape, - name=prepend_topic + vid_name, - ) - s.send_json(md, flags | zmq.SNDMORE) - s.send(frame, flags, copy=copy, track=track) - if fps_limit: - await asyncio.sleep(1.0 / fps_limit) - else: - await asyncio.sleep(0) - except KeyboardInterrupt: - pass - finally: - vid_names = [uid_for_source(name) for name in vids] - for v in vid_names: - subscriber_dictionary.stop_cam(v) - - -async def publish_updates_ros( - *vids, - callbacks: Optional[ - Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable] - ] = None, - fps_limit=float("inf"), - size=(-1, -1), - end_callback: Callable[[], bool] = lambda: False, - blocking=False, - node_name="displayarray", - publisher_name="npy", - rate_hz=None, - dtype=None, -): - """Publish frames to ROS when they're updated.""" - import rospy - from rospy.numpy_msg import numpy_msg - import std_msgs.msg - from displayarray import read_updates - - def get_msg_type(dtype): - if dtype is None: - msg_type = { - np.float32: std_msgs.msg.Float32(), - np.float64: std_msgs.msg.Float64(), - np.bool: std_msgs.msg.Bool(), - np.char: std_msgs.msg.Char(), - np.int16: std_msgs.msg.Int16(), - np.int32: std_msgs.msg.Int32(), - np.int64: std_msgs.msg.Int64(), - np.str: std_msgs.msg.String(), - np.uint16: std_msgs.msg.UInt16(), - np.uint32: std_msgs.msg.UInt32(), - np.uint64: std_msgs.msg.UInt64(), - np.uint8: std_msgs.msg.UInt8(), - }[dtype] - else: - msg_type = ( - dtype # allow users to use their own custom messages in numpy arrays - ) - return msg_type - - publishers: Dict[str, rospy.Publisher] = {} - rospy.init_node(node_name, anonymous=True) - try: - for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking): - if v: - if rospy.is_shutdown(): - break - for vid_name, frame in v.items(): - if vid_name not in publishers: - dty = frame.dtype if dtype is None else dtype - publishers[vid_name] = rospy.Publisher( - publisher_name + vid_name, - numpy_msg(get_msg_type(dty)), - queue_size=10, - ) - publishers[vid_name].publish(frame) - if rate_hz: - await asyncio.sleep(1.0 / rate_hz) - else: - await asyncio.sleep(0) - except KeyboardInterrupt: - pass - finally: - vid_names = [uid_for_source(name) for name in vids] - for v in vid_names: - subscriber_dictionary.stop_cam(v) - if rospy.core.is_shutdown(): - raise rospy.exceptions.ROSInterruptException("rospy shutdown") diff --git a/displayarray/frame/zmq_to_opencv.py b/displayarray/frame/zmq_to_opencv.py new file mode 100644 index 0000000..13e4e9b --- /dev/null +++ b/displayarray/frame/zmq_to_opencv.py @@ -0,0 +1,46 @@ +import cv2 +import zmq +from tensorcom.tenbin import decode_buffer + +class ZmqCam(object): + """Add OpenCV camera controls to a numpy array.""" + + def __init__(self, img): + """Create a fake camera for OpenCV based on the initial array.""" + assert isinstance(img, str) + s = img.split('#') + self.__ctx = zmq.Context() + self.__addr = s[0] + self.__sub = self.__ctx.socket(zmq.SUB) + if len(s)>1: + self.__topic = bytes(s[1], 'ascii') + self.__sub.setsockopt(zmq.SUBSCRIBE, self.__topic) + else: + self.__topic = b"" + self.__sub.connect(self.__addr) + + self.__is_opened = True + + def set(self, *args, **kwargs): + """Set CAP_PROP_FRAME_WIDTH or CAP_PROP_FRAME_HEIGHT to scale a numpy array to that size.""" + pass + + @staticmethod + def get(*args, **kwargs): + """Get OpenCV args. Currently only a fake CAP_PROP_FRAME_COUNT to fix detecting video ends.""" + if args[0] == cv2.CAP_PROP_FRAME_COUNT: + return float("inf") + + def read(self): + """Read back the numpy array in standard "did it work", "the array", OpenCV format.""" + r = self.__sub.recv_multipart() + arrs = decode_buffer(r[1])[0] + return True, arrs + + def isOpened(self): # NOSONAR + """Hack to tell OpenCV we're opened until we call release.""" + return self.__is_opened + + def release(self): + """Let OpenCV know we're finished.""" + self.__is_opened = False diff --git a/displayarray/window/subscriber_windows.py b/displayarray/window/subscriber_windows.py index 7139ed6..f7af17f 100644 --- a/displayarray/window/subscriber_windows.py +++ b/displayarray/window/subscriber_windows.py @@ -29,6 +29,12 @@ try: except: get_bus_info_from_camera = None +try: + import zmq + from tensorcom.tenbin import encode_buffer +except: + warnings.warn("Could not import ZMQ and tensorcom. Cannot send messages between programs.") + class SubscriberWindows(object): """Windows that subscribe to updates to cameras, videos, and arrays.""" @@ -52,6 +58,9 @@ class SubscriberWindows(object): self.input_cams: List[str] = [] self.exited = False self.silent = silent + self.ctx = None + self.sock_list = [] + self.top_list = [] if callbacks is None: callbacks = [] @@ -237,6 +246,11 @@ class SubscriberWindows(object): self.update_frames() msg_cmd = sub_cmd.get() key = self.handle_keys(cv2.waitKey(1)) + if self.sock_list: + for s, t in zip(self.sock_list, self.top_list): + f = list(self.frames.values()) + if f: + s.send_multipart([t, encode_buffer(f[0])]) return msg_cmd, key def wait_for_init(self): @@ -277,6 +291,13 @@ class SubscriberWindows(object): window_commands.quit(force_all_read=False) self.__stop_all_cams() + def publish_to(self, address, topic=b""): + if self.ctx==None: + self.ctx = zmq.Context() + self.sock_list.append(self.ctx.socket(zmq.PUB)) + self.sock_list[-1].bind(address) + self.top_list.append(topic) + @property def cams(self): """Get the camera instances. Can be used for OpenCV or V4L2 functions, depending on backend.""" @@ -432,3 +453,20 @@ def breakpoint_display(*args, **kwargs): def read_updates(*args, **kwargs): """Read back all frame updates and yield a list of frames. List is empty if no frames were read.""" return display(*args, **kwargs, silent=True) + + +def publish_updates(*args, address="tcp://127.0.0.1:7880", topic=b"", **kwargs): + """Publish all the updates to the given address and topic""" + if kwargs['blocking']: + block = True + kwargs['blocking'] = False + else: + block = False + + r = read_updates(*args, **kwargs) + r.publish_to(address, topic) + + if block: + r.loop() + for vt in r.close_threads: + vt.join() diff --git a/examples/effects/manual_control.py b/examples/effects/manual_control.py index 758ce86..7a6f69c 100644 --- a/examples/effects/manual_control.py +++ b/examples/effects/manual_control.py @@ -21,7 +21,7 @@ i = 0 while d: if len(d.frames) > 0: i += 1 - frame = d.frames[0] + frame = next(iter(d.frames.values())) center_sin = [(m.sin(m.pi * (i / 70.0))), (m.cos(m.pi * (i / 120.0)))] pre_crop_callback.center = [ center_sin[0] * 720 / 2 + 720 / 2, diff --git a/examples/networking/__init__.py b/examples/networking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/networking/get.py b/examples/networking/get.py new file mode 100644 index 0000000..d707a64 --- /dev/null +++ b/examples/networking/get.py @@ -0,0 +1,15 @@ +import zmq +from displayarray import display +from tensorcom.tenbin import decode_buffer + +ctx = zmq.Context() +s = ctx.socket(zmq.SUB) +s.setsockopt(zmq.SUBSCRIBE, b"topic") +s.connect("tcp://127.0.0.1:7880") + +d = display() +while True: + r = s.recv_multipart() + # r[0]=="topic" + arr = decode_buffer(r[1]) + d.update(arr[0], '0') diff --git a/examples/networking/get_easy.py b/examples/networking/get_easy.py new file mode 100644 index 0000000..1331c11 --- /dev/null +++ b/examples/networking/get_easy.py @@ -0,0 +1,10 @@ +from displayarray import display +import time + +t0 = time.time() +for up in display("tcp://127.0.0.1:7880#topic"): + if up: + t1 = time.time() + u = next(iter(up.values()))[0] + print(1.0 / (t1 - t0)) + t0 = t1 \ No newline at end of file diff --git a/examples/networking/to.py b/examples/networking/to.py new file mode 100644 index 0000000..a903ee1 --- /dev/null +++ b/examples/networking/to.py @@ -0,0 +1,26 @@ +from displayarray import read_updates +import numpy as np +import zmq +from tensorcom.tenbin import encode_buffer + +def black_and_white(arr): + return (np.sum(arr, axis=-1) / 3).astype(np.uint8) + + +import time + +t0 = t1 = time.time() + +ctx = zmq.Context() +s = ctx.socket(zmq.PUB) +s.bind("tcp://127.0.0.1:7880") + +for up in read_updates(0, size=(9999,9999)): + if up: + t1 = time.time() + u = next(iter(up.values()))[0] + s.send_multipart([b'topic', encode_buffer([u])]) + print(1.0 / (t1 - t0)) + t0 = t1 + + diff --git a/examples/networking/to_easy.py b/examples/networking/to_easy.py new file mode 100644 index 0000000..c2772d4 --- /dev/null +++ b/examples/networking/to_easy.py @@ -0,0 +1,9 @@ +from displayarray import publish_updates + +publish_updates( + 0, + size=(9999, 9999), + address="tcp://127.0.0.1:7880", + topic=b'topic', + blocking=True +) diff --git a/setup.py b/setup.py index 7fd9a58..a6556b3 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,8 @@ setup( "localpubsub==0.0.4", "numpy>=1.17.0", "opencv-python==4.*,>=4.0.0", - "pyzmq==18.1.0", + "pyzmq>=22.0.3", + "tensorcom" ], extras_require={ # pypi doesn't allow direct dependencies for security reasons,