Switched from pubsubs to my own localpubsub library. Set to specific version.

This commit is contained in:
SimLeek
2019-01-22 21:58:49 -07:00
parent 8825b3af11
commit 58eeb73e3d
9 changed files with 131 additions and 74 deletions

View File

@ -1,19 +0,0 @@
import queue
if False:
from typing import Any, Optional, queue
def listen_default(sub, # type: queue
block=True, # type: bool
timeout=None, # type: Optional[float]
empty=None # type: Any
): # type: (...)->Any
try:
msg = (sub.listen(block=block, timeout=timeout))
try:
msg = next(msg)['data']
except StopIteration:
msg = empty
except queue.Empty:
msg = empty
return msg

View File

@ -1,13 +1,68 @@
import pubsub
from threading import Lock
from localpubsub import VariablePub, VariableSub
if False:
from typing import Union
from typing import Union, Dict
class CamCtrl:
class CamHandler(object):
def __init__(self, name, sub):
self.name = name
self.cmd = None
self.sub = sub # type: VariableSub
self.pub = VariablePub()
self.cmd_pub = VariablePub()
class Cam(object):
def __init__(self, name):
self.name = name
self.cmd = None
self.frame_pub = VariablePub()
self.cmd_pub = VariablePub()
self.status_pub = VariablePub()
class CamCtrl(object):
cv_cam_handlers_dict = {} # type: Dict[str, CamHandler]
cv_cams_dict = {} # type: Dict[str, Cam]
@staticmethod
def register_cam(cam_id):
cam = Cam(str(cam_id))
CamCtrl.cv_cams_dict[str(cam_id)] = cam
CamCtrl.cv_cam_handlers_dict[str(cam_id)] = CamHandler(str(cam_id), cam.frame_pub.make_sub())
@staticmethod
def stop_cam(cam_id # type: Union[int, str]
):
pubsub.publish("CVCamHandlers." + str(cam_id) + ".Cmd", 'quit')
pubsub.publish("CVCams." + str(cam_id) + ".Cmd", 'quit')
CamCtrl.cv_cams_dict[str(cam_id)].cmd_pub.publish('quit', blocking=True)
CamCtrl.cv_cam_handlers_dict[str(cam_id)].cmd_pub.publish('quit', blocking=True)
@staticmethod
def cam_cmd_sub(cam_id, blocking=True):
if blocking:
while cam_id not in CamCtrl.cv_cams_dict:
continue
return CamCtrl.cv_cams_dict[str(cam_id)].cmd_pub.make_sub()
@staticmethod
def cam_frame_sub(cam_id, blocking=True):
if blocking:
while cam_id not in CamCtrl.cv_cams_dict:
continue
return CamCtrl.cv_cams_dict[str(cam_id)].frame_pub.make_sub()
@staticmethod
def cam_status_sub(cam_id, blocking=True):
if blocking:
while cam_id not in CamCtrl.cv_cams_dict:
continue
return CamCtrl.cv_cams_dict[str(cam_id)].status_pub.make_sub()
@staticmethod
def handler_cmd_sub(cam_id, blocking=True):
if blocking:
while cam_id not in CamCtrl.cv_cam_handlers_dict:
continue
return CamCtrl.cv_cam_handlers_dict[str(cam_id)].cmd_pub.make_sub()

View File

@ -1,9 +1,7 @@
import threading
import numpy as np
import pubsub
from cvpubsubs.listen_default import listen_default
from .pub_cam import pub_cam_thread
from cvpubsubs.webcam_pub.camctrl import CamCtrl
if False:
@ -22,7 +20,7 @@ def global_cv_display_callback(frame, # type: np.ndarray
:param cam_id: The video or image source
:type cam_id: Union[int, str]
"""
SubscriberWindows.frame_dict[str(cam_id) + "frame"] = (frame,)
SubscriberWindows.frame_dict[str(cam_id) + "frame"] = frame
display_callbacks = [global_cv_display_callback]
@ -65,16 +63,20 @@ class VideoHandlerThread(threading.Thread):
def loop(self):
"""Continually gets frames from the video publisher, runs callbacks on them, and listens to commands."""
t = pub_cam_thread(self.video_source, self.request_size, self.high_speed, self.fps_limit)
sub_cam = pubsub.subscribe("CVCams." + str(self.cam_id) + ".Vid")
sub_owner = pubsub.subscribe("CVCamHandlers." + str(self.cam_id) + ".Cmd")
msg_owner = ''
while str(self.cam_id) not in CamCtrl.cv_cams_dict:
continue
sub_cam = CamCtrl.cam_frame_sub(str(self.cam_id))
sub_owner = CamCtrl.handler_cmd_sub(str(self.cam_id))
msg_owner = sub_owner.return_on_no_data = ''
while msg_owner != 'quit':
frame = listen_default(sub_cam, timeout=.1) # type: np.ndarray
frame = sub_cam.get(blocking=True, timeout=1.0) # type: np.ndarray
if frame is not None:
frame = frame[0]
frame = frame
for c in self.callbacks:
c(frame, self.cam_id)
msg_owner = listen_default(sub_owner, block=False, empty='')
frame = c(frame, self.cam_id)
msg_owner = sub_owner.get()
sub_owner.release()
sub_cam.release()
CamCtrl.stop_cam(self.cam_id)
t.join()

View File

@ -1,11 +1,10 @@
import threading
import time
import cv2
import numpy as np
import pubsub
from cvpubsubs.listen_default import listen_default
from cvpubsubs.webcam_pub.camctrl import CamCtrl
from .np_cam import NpCam
@ -31,17 +30,26 @@ def pub_cam_loop(cam_id, # type: Union[int, str]
"""
if isinstance(cam_id, (int, str)):
cam = cv2.VideoCapture(cam_id)
name = str(cam_id)
elif isinstance(cam_id, np.ndarray):
cam = NpCam(cam_id) # type: NpCam
name = str(hash(str(cam_id)))
else:
raise TypeError("Only strings or ints representing cameras, or numpy arrays representing pictures supported.")
if isinstance(cam_id, (int, str)):
cam = cv2.VideoCapture(cam_id)
elif isinstance(cam_id, np.ndarray):
cam = NpCam(cam_id) # type: NpCam
else:
raise TypeError("Only strings or ints representing cameras, or numpy arrays representing pictures supported.")
CamCtrl.register_cam(name)
# cam.set(cv2.CAP_PROP_CONVERT_RGB, 0)
frame_counter = 0
sub = pubsub.subscribe("CVCams." + str(name) + ".Cmd")
sub = CamCtrl.cam_cmd_sub(name)
sub.return_on_no_data = ''
msg = ''
if high_speed:
@ -51,7 +59,7 @@ def pub_cam_loop(cam_id, # type: Union[int, str]
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, request_size[1])
if not cam.isOpened():
pubsub.publish("CVCams." + name + ".Status", "failed")
CamCtrl.cv_cams_dict[name].status_pub.publish("failed")
return False
now = time.time()
while msg != 'quit':
@ -60,15 +68,16 @@ def pub_cam_loop(cam_id, # type: Union[int, str]
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
cam.release()
pubsub.publish("CVCams." + name + ".Status", "failed")
CamCtrl.cv_cams_dict[name].status_pub.publish("failed")
return False
if cam.get(cv2.CAP_PROP_FRAME_COUNT) > 0:
frame_counter += 1
if frame_counter >= cam.get(cv2.CAP_PROP_FRAME_COUNT):
frame_counter = 0
cam = cv2.VideoCapture(cam_id)
pubsub.publish("CVCams." + name + ".Vid", (frame,))
msg = listen_default(sub, block=False, empty='')
CamCtrl.cv_cams_dict[name].frame_pub.publish(frame)
msg = sub.get()
sub.release()
cam.release()
return True

View File

@ -1,12 +1,11 @@
import warnings
import cv2
import pubsub
import numpy as np
from .winctrl import WinCtrl
from ..listen_default import listen_default
from ..webcam_pub.camctrl import CamCtrl
from localpubsub import NoData
if False:
from typing import List, Union, Callable, Any
@ -26,12 +25,12 @@ class SubscriberWindows(object):
self.window_names = window_names
self.source_names = []
for name in video_sources:
if len(str(name))<=1000:
self.source_names.append(str(name))
self.input_vid_global_names = [str(name) + "frame" for name in video_sources]
elif isinstance(name, np.ndarray):
if isinstance(name, np.ndarray):
self.source_names.append(str(hash(str(name))))
self.input_vid_global_names = [str(hash(str(name))) + "frame" for name in video_sources]
elif len(str(name))<=1000:
self.source_names.append(str(name))
self.input_vid_global_names = [str(name) + "frame" for name in video_sources]
else:
raise ValueError("Input window name too long.")
@ -64,7 +63,7 @@ class SubscriberWindows(object):
return 'quit'
elif key_input not in [-1, 0]:
try:
WinCtrl.key_stroke(chr(key_input))
WinCtrl.key_pub.publish(chr(key_input))
except ValueError:
warnings.warn(
RuntimeWarning("Unknown key code: [{}]. Please report to cv_pubsubs issue page.".format(key_input))
@ -73,24 +72,27 @@ class SubscriberWindows(object):
def update_window_frames(self):
win_num = 0
for i in range(len(self.input_vid_global_names)):
if self.input_vid_global_names[i] in self.frame_dict and self.frame_dict[
self.input_vid_global_names[i]] is not None:
if self.input_vid_global_names[i] in self.frame_dict and not isinstance(self.frame_dict[
self.input_vid_global_names[i]], NoData):
if len(self.callbacks)>0 and self.callbacks[i % len(self.callbacks)] is not None:
frames = self.callbacks[i % len(self.callbacks)](self.frame_dict[self.input_vid_global_names[i]])
else:
frames = self.frame_dict[self.input_vid_global_names[i]]
if isinstance(frames, np.ndarray) and len(frames.shape)<=3:
frames = [frames]
for f in range(len(frames)):
cv2.imshow(self.window_names[win_num % len(self.window_names)] + " (press ESC to quit)", frames[f])
win_num += 1
# todo: figure out how to get the red x button to work. Try: https://stackoverflow.com/a/37881722/782170
def loop(self):
sub_cmd = pubsub.subscribe("CVWinCmd")
sub_cmd = WinCtrl.win_cmd_sub()
msg_cmd = ''
while msg_cmd != 'quit':
key = ''
while msg_cmd != 'quit' and key != 'quit':
self.update_window_frames()
msg_cmd = self.handle_keys(cv2.waitKey(1))
if not msg_cmd:
msg_cmd = listen_default(sub_cmd, block=False, empty='')
WinCtrl.quit()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
sub_cmd.release()
WinCtrl.quit(force_all_read=False)
self.__stop_all_cams()

View File

@ -1,12 +1,17 @@
import pubsub
import threading
import logging
from localpubsub import VariablePub, VariableSub
class WinCtrl:
class WinCtrl(object):
key_pub = VariablePub()
win_cmd_pub = VariablePub()
@staticmethod
def key_stroke(key_entered):
pubsub.publish("CVKeyStroke", key_entered)
def quit(force_all_read=True):
WinCtrl.win_cmd_pub.publish('quit', force_all_read=force_all_read)
@staticmethod
def quit():
pubsub.publish("CVWinCmd", "quit")
def win_cmd_sub(): # type: ()->VariableSub
return WinCtrl.win_cmd_pub.make_sub() # type: VariableSub

View File

@ -1,3 +1,3 @@
pubsub
numpy
opencv_python
opencv_python
localpubsubs==0.0.1

View File

@ -13,7 +13,8 @@ with open('cvpubsubs/__init__.py', 'r') as f:
with open('README.md', 'r', encoding='utf-8') as f:
readme = f.read()
REQUIRES = ['pubsub', 'numpy', 'opencv_python']
with open('requirements.txt', 'r', encoding='utf-8') as f:
REQUIRES = f.readlines()
setup(
name='CVPubSubs',

View File

@ -1,27 +1,28 @@
import threading
import unittest as ut
import pubsub
import numpy as np
import cvpubsubs.webcam_pub as w
from cvpubsubs.listen_default import listen_default
from cvpubsubs.window_sub import SubscriberWindows
from cvpubsubs.window_sub.winctrl import WinCtrl
if False:
import numpy as np
def print_keys_thread():
sub_key = pubsub.subscribe("CVKeyStroke")
sub_cmd = pubsub.subscribe("CVWinCmd")
sub_key = WinCtrl.key_pub.make_sub()
sub_cmd = WinCtrl.win_cmd_pub.make_sub()
sub_cmd.return_on_no_data = ''
msg_cmd = ''
while msg_cmd != 'quit':
key_chr = listen_default(sub_key, timeout=.1) # type: np.ndarray
key_chr = sub_key.get(sub_key) # type: np.ndarray
WinCtrl.key_pub.publish(None) # consume data
if key_chr is not None:
print("key pressed: " + str(key_chr))
msg_cmd = listen_default(sub_cmd, block=False, empty='')
pubsub.publish("CVWinCmd", 'quit')
msg_cmd = sub_cmd.get()
WinCtrl.quit(force_all_read=False)
def start_print_keys_thread(): # type: (...) -> threading.Thread
@ -74,6 +75,7 @@ class TestSubWin(ut.TestCase):
t.join()
@ut.skip("I don't have stereo cams... :(")
def test_multi_cams_multi_source(self):
t1 = w.VideoHandlerThread(0, request_size=(1920, 1080))
t2 = w.VideoHandlerThread(1, request_size=(1920, 1080))