Updated major version due to breaking changes. Reformatted files and imports. Removed unused ctrl commands. Made all pubsub channels camel case. changed sub_win_loop to class. Removed implementation specific code. Removed unreliable 'x to quit' code. Made windows quit from pubsub. Added WinCtrl class for window specific commands. Added threaded interactive key press test. Removed testt file.

This commit is contained in:
SimLeek
2018-04-08 14:12:02 -07:00
parent 496a5eaffe
commit 2b7899377e
12 changed files with 143 additions and 105 deletions
View File
+48 -12
View File
@@ -1,22 +1,36 @@
import threading
import unittest as ut
import cv_pubsubs.webcam_pub as w
from cv_pubsubs.window_sub import frame_dict, sub_win_loop
def subscribe_to_key_command(cam_id, # type: Union[int, str]
frame_handler, # type: Callable[[int, np.ndarray], Any]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
): # type: (...) -> threading.Thread
t = threading.Thread(target=frame_handler_loop, args=(cam_id, frame_handler, request_size, high_speed, fps_limit))
import pubsub
import cv_pubsubs.webcam_pub as w
from cv_pubsubs.listen_default import listen_default
from cv_pubsubs.window_sub import SubscriberWindows
def print_keys_thread():
sub_key = pubsub.subscribe("CVKeyStroke")
sub_cmd = pubsub.subscribe("CVWinCmd")
msg_cmd = ''
while msg_cmd != 'quit':
key_chr = listen_default(sub_key, timeout=.1) # type: np.ndarray
if key_chr is not None:
print("key pressed: " + str(key_chr))
msg_cmd = listen_default(sub_cmd, block=False, empty='')
pubsub.publish("CVWinCmd", 'quit')
def start_print_keys_thread(): # type: (...) -> threading.Thread
t = threading.Thread(target=print_keys_thread, args=())
t.start()
return t
class TestSubWin(ut.TestCase):
def test_sub(self):
def cam_handler(frame, cam_id):
frame_dict[str(cam_id) + "Frame"] = (frame, frame)
SubscriberWindows.frame_dict[str(cam_id) + "Frame"] = (frame, frame)
t = w.frame_handler_thread(0, cam_handler,
request_size=(1280, 720),
@@ -24,9 +38,31 @@ class TestSubWin(ut.TestCase):
fps_limit=240
)
sub_win_loop(names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"])
SubscriberWindows(window_names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"]
).loop()
w.CamCtrl.stop_cam(0)
t.join()
def test_key_sub(self):
def cam_handler(frame, cam_id):
SubscriberWindows.frame_dict[str(cam_id) + "Frame"] = (frame, frame)
t = w.frame_handler_thread(0, cam_handler,
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
kt = start_print_keys_thread()
SubscriberWindows(window_names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"]
).loop()
w.CamCtrl.stop_cam(0)
t.join()
kt.join()