From b7baf00a37672c8d2710b382f382fbe6da65555f Mon Sep 17 00:00:00 2001 From: SimLeek Date: Tue, 22 Jan 2019 21:58:49 -0700 Subject: [PATCH] Switched from pubsubs to my own localpubsub library. Set to specific version. --- cvpubsubs/listen_default.py | 19 -------- cvpubsubs/webcam_pub/camctrl.py | 65 ++++++++++++++++++++++++--- cvpubsubs/webcam_pub/frame_handler.py | 22 ++++----- cvpubsubs/webcam_pub/pub_cam.py | 27 +++++++---- cvpubsubs/window_sub/cv_window_sub.py | 32 ++++++------- cvpubsubs/window_sub/winctrl.py | 17 ++++--- requirements.txt | 4 +- setup.py | 3 +- tests/test_sub_win.py | 16 ++++--- 9 files changed, 131 insertions(+), 74 deletions(-) delete mode 100644 cvpubsubs/listen_default.py diff --git a/cvpubsubs/listen_default.py b/cvpubsubs/listen_default.py deleted file mode 100644 index 31a050d..0000000 --- a/cvpubsubs/listen_default.py +++ /dev/null @@ -1,19 +0,0 @@ -import queue -if False: - from typing import Any, Optional, queue - - -def listen_default(sub, # type: queue - block=True, # type: bool - timeout=None, # type: Optional[float] - empty=None # type: Any - ): # type: (...)->Any - try: - msg = (sub.listen(block=block, timeout=timeout)) - try: - msg = next(msg)['data'] - except StopIteration: - msg = empty - except queue.Empty: - msg = empty - return msg diff --git a/cvpubsubs/webcam_pub/camctrl.py b/cvpubsubs/webcam_pub/camctrl.py index 289ebb0..dee6ea6 100644 --- a/cvpubsubs/webcam_pub/camctrl.py +++ b/cvpubsubs/webcam_pub/camctrl.py @@ -1,13 +1,68 @@ -import pubsub +from threading import Lock +from localpubsub import VariablePub, VariableSub if False: - from typing import Union + from typing import Union, Dict -class CamCtrl: +class CamHandler(object): + def __init__(self, name, sub): + self.name = name + self.cmd = None + self.sub = sub # type: VariableSub + self.pub = VariablePub() + self.cmd_pub = VariablePub() + + +class Cam(object): + def __init__(self, name): + self.name = name + self.cmd = None + self.frame_pub = VariablePub() + self.cmd_pub = VariablePub() + self.status_pub = VariablePub() + + +class CamCtrl(object): + cv_cam_handlers_dict = {} # type: Dict[str, CamHandler] + cv_cams_dict = {} # type: Dict[str, Cam] + + @staticmethod + def register_cam(cam_id): + cam = Cam(str(cam_id)) + CamCtrl.cv_cams_dict[str(cam_id)] = cam + CamCtrl.cv_cam_handlers_dict[str(cam_id)] = CamHandler(str(cam_id), cam.frame_pub.make_sub()) @staticmethod def stop_cam(cam_id # type: Union[int, str] ): - pubsub.publish("CVCamHandlers." + str(cam_id) + ".Cmd", 'quit') - pubsub.publish("CVCams." + str(cam_id) + ".Cmd", 'quit') + CamCtrl.cv_cams_dict[str(cam_id)].cmd_pub.publish('quit', blocking=True) + CamCtrl.cv_cam_handlers_dict[str(cam_id)].cmd_pub.publish('quit', blocking=True) + + @staticmethod + def cam_cmd_sub(cam_id, blocking=True): + if blocking: + while cam_id not in CamCtrl.cv_cams_dict: + continue + return CamCtrl.cv_cams_dict[str(cam_id)].cmd_pub.make_sub() + + @staticmethod + def cam_frame_sub(cam_id, blocking=True): + if blocking: + while cam_id not in CamCtrl.cv_cams_dict: + continue + return CamCtrl.cv_cams_dict[str(cam_id)].frame_pub.make_sub() + + @staticmethod + def cam_status_sub(cam_id, blocking=True): + if blocking: + while cam_id not in CamCtrl.cv_cams_dict: + continue + return CamCtrl.cv_cams_dict[str(cam_id)].status_pub.make_sub() + + @staticmethod + def handler_cmd_sub(cam_id, blocking=True): + if blocking: + while cam_id not in CamCtrl.cv_cam_handlers_dict: + continue + return CamCtrl.cv_cam_handlers_dict[str(cam_id)].cmd_pub.make_sub() \ No newline at end of file diff --git a/cvpubsubs/webcam_pub/frame_handler.py b/cvpubsubs/webcam_pub/frame_handler.py index e2a3b4f..87a61c6 100644 --- a/cvpubsubs/webcam_pub/frame_handler.py +++ b/cvpubsubs/webcam_pub/frame_handler.py @@ -1,9 +1,7 @@ import threading import numpy as np -import pubsub -from cvpubsubs.listen_default import listen_default from .pub_cam import pub_cam_thread from cvpubsubs.webcam_pub.camctrl import CamCtrl if False: @@ -22,7 +20,7 @@ def global_cv_display_callback(frame, # type: np.ndarray :param cam_id: The video or image source :type cam_id: Union[int, str] """ - SubscriberWindows.frame_dict[str(cam_id) + "frame"] = (frame,) + SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame display_callbacks = [global_cv_display_callback] @@ -65,16 +63,20 @@ class VideoHandlerThread(threading.Thread): def loop(self): """Continually gets frames from the video publisher, runs callbacks on them, and listens to commands.""" t = pub_cam_thread(self.video_source, self.request_size, self.high_speed, self.fps_limit) - sub_cam = pubsub.subscribe("CVCams." + str(self.cam_id) + ".Vid") - sub_owner = pubsub.subscribe("CVCamHandlers." + str(self.cam_id) + ".Cmd") - msg_owner = '' + while str(self.cam_id) not in CamCtrl.cv_cams_dict: + continue + sub_cam = CamCtrl.cam_frame_sub(str(self.cam_id)) + sub_owner = CamCtrl.handler_cmd_sub(str(self.cam_id)) + msg_owner = sub_owner.return_on_no_data = '' while msg_owner != 'quit': - frame = listen_default(sub_cam, timeout=.1) # type: np.ndarray + frame = sub_cam.get(blocking=True, timeout=1.0) # type: np.ndarray if frame is not None: - frame = frame[0] + frame = frame for c in self.callbacks: - c(frame, self.cam_id) - msg_owner = listen_default(sub_owner, block=False, empty='') + frame = c(frame, self.cam_id) + msg_owner = sub_owner.get() + sub_owner.release() + sub_cam.release() CamCtrl.stop_cam(self.cam_id) t.join() diff --git a/cvpubsubs/webcam_pub/pub_cam.py b/cvpubsubs/webcam_pub/pub_cam.py index 3e4718e..bc85149 100644 --- a/cvpubsubs/webcam_pub/pub_cam.py +++ b/cvpubsubs/webcam_pub/pub_cam.py @@ -1,11 +1,10 @@ import threading import time + import cv2 import numpy as np -import pubsub -from cvpubsubs.listen_default import listen_default from cvpubsubs.webcam_pub.camctrl import CamCtrl from .np_cam import NpCam @@ -31,17 +30,26 @@ def pub_cam_loop(cam_id, # type: Union[int, str] """ if isinstance(cam_id, (int, str)): - cam = cv2.VideoCapture(cam_id) name = str(cam_id) elif isinstance(cam_id, np.ndarray): - cam = NpCam(cam_id) # type: NpCam name = str(hash(str(cam_id))) else: raise TypeError("Only strings or ints representing cameras, or numpy arrays representing pictures supported.") + + if isinstance(cam_id, (int, str)): + cam = cv2.VideoCapture(cam_id) + elif isinstance(cam_id, np.ndarray): + cam = NpCam(cam_id) # type: NpCam + else: + raise TypeError("Only strings or ints representing cameras, or numpy arrays representing pictures supported.") + + CamCtrl.register_cam(name) + # cam.set(cv2.CAP_PROP_CONVERT_RGB, 0) frame_counter = 0 - sub = pubsub.subscribe("CVCams." + str(name) + ".Cmd") + sub = CamCtrl.cam_cmd_sub(name) + sub.return_on_no_data = '' msg = '' if high_speed: @@ -51,7 +59,7 @@ def pub_cam_loop(cam_id, # type: Union[int, str] cam.set(cv2.CAP_PROP_FRAME_HEIGHT, request_size[1]) if not cam.isOpened(): - pubsub.publish("CVCams." + name + ".Status", "failed") + CamCtrl.cv_cams_dict[name].status_pub.publish("failed") return False now = time.time() while msg != 'quit': @@ -60,15 +68,16 @@ def pub_cam_loop(cam_id, # type: Union[int, str] (ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ] if ret is False or not isinstance(frame, np.ndarray): cam.release() - pubsub.publish("CVCams." + name + ".Status", "failed") + CamCtrl.cv_cams_dict[name].status_pub.publish("failed") return False if cam.get(cv2.CAP_PROP_FRAME_COUNT) > 0: frame_counter += 1 if frame_counter >= cam.get(cv2.CAP_PROP_FRAME_COUNT): frame_counter = 0 cam = cv2.VideoCapture(cam_id) - pubsub.publish("CVCams." + name + ".Vid", (frame,)) - msg = listen_default(sub, block=False, empty='') + CamCtrl.cv_cams_dict[name].frame_pub.publish(frame) + msg = sub.get() + sub.release() cam.release() return True diff --git a/cvpubsubs/window_sub/cv_window_sub.py b/cvpubsubs/window_sub/cv_window_sub.py index 157d842..eede534 100644 --- a/cvpubsubs/window_sub/cv_window_sub.py +++ b/cvpubsubs/window_sub/cv_window_sub.py @@ -1,12 +1,11 @@ import warnings import cv2 -import pubsub import numpy as np from .winctrl import WinCtrl -from ..listen_default import listen_default from ..webcam_pub.camctrl import CamCtrl +from localpubsub import NoData if False: from typing import List, Union, Callable, Any @@ -26,12 +25,12 @@ class SubscriberWindows(object): self.window_names = window_names self.source_names = [] for name in video_sources: - if len(str(name))<=1000: - self.source_names.append(str(name)) - self.input_vid_global_names = [str(name) + "frame" for name in video_sources] - elif isinstance(name, np.ndarray): + if isinstance(name, np.ndarray): self.source_names.append(str(hash(str(name)))) self.input_vid_global_names = [str(hash(str(name))) + "frame" for name in video_sources] + elif len(str(name))<=1000: + self.source_names.append(str(name)) + self.input_vid_global_names = [str(name) + "frame" for name in video_sources] else: raise ValueError("Input window name too long.") @@ -64,7 +63,7 @@ class SubscriberWindows(object): return 'quit' elif key_input not in [-1, 0]: try: - WinCtrl.key_stroke(chr(key_input)) + WinCtrl.key_pub.publish(chr(key_input)) except ValueError: warnings.warn( RuntimeWarning("Unknown key code: [{}]. Please report to cv_pubsubs issue page.".format(key_input)) @@ -73,24 +72,27 @@ class SubscriberWindows(object): def update_window_frames(self): win_num = 0 for i in range(len(self.input_vid_global_names)): - if self.input_vid_global_names[i] in self.frame_dict and self.frame_dict[ - self.input_vid_global_names[i]] is not None: + if self.input_vid_global_names[i] in self.frame_dict and not isinstance(self.frame_dict[ + self.input_vid_global_names[i]], NoData): if len(self.callbacks)>0 and self.callbacks[i % len(self.callbacks)] is not None: frames = self.callbacks[i % len(self.callbacks)](self.frame_dict[self.input_vid_global_names[i]]) else: frames = self.frame_dict[self.input_vid_global_names[i]] + if isinstance(frames, np.ndarray) and len(frames.shape)<=3: + frames = [frames] for f in range(len(frames)): cv2.imshow(self.window_names[win_num % len(self.window_names)] + " (press ESC to quit)", frames[f]) win_num += 1 # todo: figure out how to get the red x button to work. Try: https://stackoverflow.com/a/37881722/782170 def loop(self): - sub_cmd = pubsub.subscribe("CVWinCmd") + sub_cmd = WinCtrl.win_cmd_sub() msg_cmd = '' - while msg_cmd != 'quit': + key = '' + while msg_cmd != 'quit' and key != 'quit': self.update_window_frames() - msg_cmd = self.handle_keys(cv2.waitKey(1)) - if not msg_cmd: - msg_cmd = listen_default(sub_cmd, block=False, empty='') - WinCtrl.quit() + msg_cmd = sub_cmd.get() + key = self.handle_keys(cv2.waitKey(1)) + sub_cmd.release() + WinCtrl.quit(force_all_read=False) self.__stop_all_cams() diff --git a/cvpubsubs/window_sub/winctrl.py b/cvpubsubs/window_sub/winctrl.py index d971664..d5790b7 100644 --- a/cvpubsubs/window_sub/winctrl.py +++ b/cvpubsubs/window_sub/winctrl.py @@ -1,12 +1,17 @@ -import pubsub +import threading +import logging + +from localpubsub import VariablePub, VariableSub -class WinCtrl: +class WinCtrl(object): + key_pub = VariablePub() + win_cmd_pub = VariablePub() @staticmethod - def key_stroke(key_entered): - pubsub.publish("CVKeyStroke", key_entered) + def quit(force_all_read=True): + WinCtrl.win_cmd_pub.publish('quit', force_all_read=force_all_read) @staticmethod - def quit(): - pubsub.publish("CVWinCmd", "quit") + def win_cmd_sub(): # type: ()->VariableSub + return WinCtrl.win_cmd_pub.make_sub() # type: VariableSub diff --git a/requirements.txt b/requirements.txt index d04d9fd..530014e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -pubsub numpy -opencv_python \ No newline at end of file +opencv_python +localpubsubs==0.0.1 \ No newline at end of file diff --git a/setup.py b/setup.py index 66e0580..187773c 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,8 @@ with open('cvpubsubs/__init__.py', 'r') as f: with open('README.md', 'r', encoding='utf-8') as f: readme = f.read() -REQUIRES = ['pubsub', 'numpy', 'opencv_python'] +with open('requirements.txt', 'r', encoding='utf-8') as f: + REQUIRES = f.readlines() setup( name='CVPubSubs', diff --git a/tests/test_sub_win.py b/tests/test_sub_win.py index efe732d..4a59bd6 100644 --- a/tests/test_sub_win.py +++ b/tests/test_sub_win.py @@ -1,27 +1,28 @@ import threading import unittest as ut -import pubsub import numpy as np import cvpubsubs.webcam_pub as w -from cvpubsubs.listen_default import listen_default from cvpubsubs.window_sub import SubscriberWindows +from cvpubsubs.window_sub.winctrl import WinCtrl if False: import numpy as np def print_keys_thread(): - sub_key = pubsub.subscribe("CVKeyStroke") - sub_cmd = pubsub.subscribe("CVWinCmd") + sub_key = WinCtrl.key_pub.make_sub() + sub_cmd = WinCtrl.win_cmd_pub.make_sub() + sub_cmd.return_on_no_data = '' msg_cmd = '' while msg_cmd != 'quit': - key_chr = listen_default(sub_key, timeout=.1) # type: np.ndarray + key_chr = sub_key.get(sub_key) # type: np.ndarray + WinCtrl.key_pub.publish(None) # consume data if key_chr is not None: print("key pressed: " + str(key_chr)) - msg_cmd = listen_default(sub_cmd, block=False, empty='') - pubsub.publish("CVWinCmd", 'quit') + msg_cmd = sub_cmd.get() + WinCtrl.quit(force_all_read=False) def start_print_keys_thread(): # type: (...) -> threading.Thread @@ -74,6 +75,7 @@ class TestSubWin(ut.TestCase): t.join() + @ut.skip("I don't have stereo cams... :(") def test_multi_cams_multi_source(self): t1 = w.VideoHandlerThread(0, request_size=(1920, 1080)) t2 = w.VideoHandlerThread(1, request_size=(1920, 1080))