Initial working zmq pub sub and topic integration

This commit is contained in:
SimLeek
2021-07-04 22:27:00 -07:00
parent 2b2b5d353d
commit 405ce8f547
11 changed files with 153 additions and 126 deletions

View File

@ -6,6 +6,5 @@ display is a function that displays these in their own windows.
__version__ = "1.1.1"
from .window.subscriber_windows import display, breakpoint_display, read_updates
from .frame.frame_publishing import publish_updates_zero_mq, publish_updates_ros
from .window.subscriber_windows import display, breakpoint_display, read_updates, publish_updates
from . import effects

View File

@ -27,6 +27,7 @@ import numpy as np
from displayarray.frame import subscriber_dictionary
from .np_to_opencv import NpCam
from .zmq_to_opencv import ZmqCam
from displayarray._uid import uid_for_source
from typing import Union, Tuple, Optional, Dict, Any, List, Callable
@ -127,7 +128,10 @@ def pub_cam_loop_opencv(
name = uid_for_source(cam_id)
if isinstance(cam_id, (int, str)):
cam: Union[NpCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
if isinstance(cam_id, str) and cam_id.startswith('tcp'):
cam = ZmqCam(cam_id)
else:
cam: Union[NpCam,ZmqCam, cv2.VideoCapture] = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
cam = NpCam(cam_id)
else:
@ -216,124 +220,3 @@ def pub_cam_thread(
uid_dict[name] = t
t.start()
return t
async def publish_updates_zero_mq(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
publishing_address="tcp://127.0.0.1:5600",
prepend_topic="",
flags=0,
copy=True,
track=False,
):
"""Publish frames to ZeroMQ when they're updated."""
import zmq
from displayarray import read_updates
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind(publishing_address)
if not blocking:
flags |= zmq.NOBLOCK
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
for vid_name, frame in v.items():
md = dict(
dtype=str(frame.dtype),
shape=frame.shape,
name=prepend_topic + vid_name,
)
s.send_json(md, flags | zmq.SNDMORE)
s.send(frame, flags, copy=copy, track=track)
if fps_limit:
await asyncio.sleep(1.0 / fps_limit)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
async def publish_updates_ros(
*vids,
callbacks: Optional[
Union[Dict[Any, FrameCallable], List[FrameCallable], FrameCallable]
] = None,
fps_limit=float("inf"),
size=(-1, -1),
end_callback: Callable[[], bool] = lambda: False,
blocking=False,
node_name="displayarray",
publisher_name="npy",
rate_hz=None,
dtype=None,
):
"""Publish frames to ROS when they're updated."""
import rospy
from rospy.numpy_msg import numpy_msg
import std_msgs.msg
from displayarray import read_updates
def get_msg_type(dtype):
if dtype is None:
msg_type = {
np.float32: std_msgs.msg.Float32(),
np.float64: std_msgs.msg.Float64(),
np.bool: std_msgs.msg.Bool(),
np.char: std_msgs.msg.Char(),
np.int16: std_msgs.msg.Int16(),
np.int32: std_msgs.msg.Int32(),
np.int64: std_msgs.msg.Int64(),
np.str: std_msgs.msg.String(),
np.uint16: std_msgs.msg.UInt16(),
np.uint32: std_msgs.msg.UInt32(),
np.uint64: std_msgs.msg.UInt64(),
np.uint8: std_msgs.msg.UInt8(),
}[dtype]
else:
msg_type = (
dtype # allow users to use their own custom messages in numpy arrays
)
return msg_type
publishers: Dict[str, rospy.Publisher] = {}
rospy.init_node(node_name, anonymous=True)
try:
for v in read_updates(vids, callbacks, fps_limit, size, end_callback, blocking):
if v:
if rospy.is_shutdown():
break
for vid_name, frame in v.items():
if vid_name not in publishers:
dty = frame.dtype if dtype is None else dtype
publishers[vid_name] = rospy.Publisher(
publisher_name + vid_name,
numpy_msg(get_msg_type(dty)),
queue_size=10,
)
publishers[vid_name].publish(frame)
if rate_hz:
await asyncio.sleep(1.0 / rate_hz)
else:
await asyncio.sleep(0)
except KeyboardInterrupt:
pass
finally:
vid_names = [uid_for_source(name) for name in vids]
for v in vid_names:
subscriber_dictionary.stop_cam(v)
if rospy.core.is_shutdown():
raise rospy.exceptions.ROSInterruptException("rospy shutdown")

View File

@ -0,0 +1,46 @@
import cv2
import zmq
from tensorcom.tenbin import decode_buffer
class ZmqCam(object):
"""Add OpenCV camera controls to a numpy array."""
def __init__(self, img):
"""Create a fake camera for OpenCV based on the initial array."""
assert isinstance(img, str)
s = img.split('#')
self.__ctx = zmq.Context()
self.__addr = s[0]
self.__sub = self.__ctx.socket(zmq.SUB)
if len(s)>1:
self.__topic = bytes(s[1], 'ascii')
self.__sub.setsockopt(zmq.SUBSCRIBE, self.__topic)
else:
self.__topic = b""
self.__sub.connect(self.__addr)
self.__is_opened = True
def set(self, *args, **kwargs):
"""Set CAP_PROP_FRAME_WIDTH or CAP_PROP_FRAME_HEIGHT to scale a numpy array to that size."""
pass
@staticmethod
def get(*args, **kwargs):
"""Get OpenCV args. Currently only a fake CAP_PROP_FRAME_COUNT to fix detecting video ends."""
if args[0] == cv2.CAP_PROP_FRAME_COUNT:
return float("inf")
def read(self):
"""Read back the numpy array in standard "did it work", "the array", OpenCV format."""
r = self.__sub.recv_multipart()
arrs = decode_buffer(r[1])[0]
return True, arrs
def isOpened(self): # NOSONAR
"""Hack to tell OpenCV we're opened until we call release."""
return self.__is_opened
def release(self):
"""Let OpenCV know we're finished."""
self.__is_opened = False

View File

@ -29,6 +29,12 @@ try:
except:
get_bus_info_from_camera = None
try:
import zmq
from tensorcom.tenbin import encode_buffer
except:
warnings.warn("Could not import ZMQ and tensorcom. Cannot send messages between programs.")
class SubscriberWindows(object):
"""Windows that subscribe to updates to cameras, videos, and arrays."""
@ -52,6 +58,9 @@ class SubscriberWindows(object):
self.input_cams: List[str] = []
self.exited = False
self.silent = silent
self.ctx = None
self.sock_list = []
self.top_list = []
if callbacks is None:
callbacks = []
@ -237,6 +246,11 @@ class SubscriberWindows(object):
self.update_frames()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
if self.sock_list:
for s, t in zip(self.sock_list, self.top_list):
f = list(self.frames.values())
if f:
s.send_multipart([t, encode_buffer(f[0])])
return msg_cmd, key
def wait_for_init(self):
@ -277,6 +291,13 @@ class SubscriberWindows(object):
window_commands.quit(force_all_read=False)
self.__stop_all_cams()
def publish_to(self, address, topic=b""):
if self.ctx==None:
self.ctx = zmq.Context()
self.sock_list.append(self.ctx.socket(zmq.PUB))
self.sock_list[-1].bind(address)
self.top_list.append(topic)
@property
def cams(self):
"""Get the camera instances. Can be used for OpenCV or V4L2 functions, depending on backend."""
@ -432,3 +453,20 @@ def breakpoint_display(*args, **kwargs):
def read_updates(*args, **kwargs):
"""Read back all frame updates and yield a list of frames. List is empty if no frames were read."""
return display(*args, **kwargs, silent=True)
def publish_updates(*args, address="tcp://127.0.0.1:7880", topic=b"", **kwargs):
"""Publish all the updates to the given address and topic"""
if kwargs['blocking']:
block = True
kwargs['blocking'] = False
else:
block = False
r = read_updates(*args, **kwargs)
r.publish_to(address, topic)
if block:
r.loop()
for vt in r.close_threads:
vt.join()

View File

@ -21,7 +21,7 @@ i = 0
while d:
if len(d.frames) > 0:
i += 1
frame = d.frames[0]
frame = next(iter(d.frames.values()))
center_sin = [(m.sin(m.pi * (i / 70.0))), (m.cos(m.pi * (i / 120.0)))]
pre_crop_callback.center = [
center_sin[0] * 720 / 2 + 720 / 2,

View File

View File

@ -0,0 +1,15 @@
import zmq
from displayarray import display
from tensorcom.tenbin import decode_buffer
ctx = zmq.Context()
s = ctx.socket(zmq.SUB)
s.setsockopt(zmq.SUBSCRIBE, b"topic")
s.connect("tcp://127.0.0.1:7880")
d = display()
while True:
r = s.recv_multipart()
# r[0]=="topic"
arr = decode_buffer(r[1])
d.update(arr[0], '0')

View File

@ -0,0 +1,10 @@
from displayarray import display
import time
t0 = time.time()
for up in display("tcp://127.0.0.1:7880#topic"):
if up:
t1 = time.time()
u = next(iter(up.values()))[0]
print(1.0 / (t1 - t0))
t0 = t1

26
examples/networking/to.py Normal file
View File

@ -0,0 +1,26 @@
from displayarray import read_updates
import numpy as np
import zmq
from tensorcom.tenbin import encode_buffer
def black_and_white(arr):
return (np.sum(arr, axis=-1) / 3).astype(np.uint8)
import time
t0 = t1 = time.time()
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind("tcp://127.0.0.1:7880")
for up in read_updates(0, size=(9999,9999)):
if up:
t1 = time.time()
u = next(iter(up.values()))[0]
s.send_multipart([b'topic', encode_buffer([u])])
print(1.0 / (t1 - t0))
t0 = t1

View File

@ -0,0 +1,9 @@
from displayarray import publish_updates
publish_updates(
0,
size=(9999, 9999),
address="tcp://127.0.0.1:7880",
topic=b'topic',
blocking=True
)

View File

@ -44,7 +44,8 @@ setup(
"localpubsub==0.0.4",
"numpy>=1.17.0",
"opencv-python==4.*,>=4.0.0",
"pyzmq==18.1.0",
"pyzmq>=22.0.3",
"tensorcom"
],
extras_require={
# pypi doesn't allow direct dependencies for security reasons,