Changed frame_handler_thread to VideoHandlerThread class. Added display_callbacks. Added frame_handler documentation. Added display function. Added set_global_frames_dict. Update VideoHandlerThread to use callback list. Added tests. Used tests as examples in readme. Updated pypi readme.

This commit is contained in:
SimLeek
2018-08-09 21:48:12 -07:00
parent 356558a953
commit 256df87eaa
8 changed files with 209 additions and 66 deletions

View File

@ -12,6 +12,76 @@ Python 2.7/3.5+ and PyPy.
$ pip install CVPubSubs
Usage
-----------
###Video Editing and Publishing
####Display your webcam
import cvpubsubs.webcam_pub as w
w.VideoHandlerThread().display()
####Change Display Arguments
import cvpubsubs.webcam_pub as w
video_thread = w.VideoHandlerThread(video_source=0,
callbacks = w.display_callbacks,
request_size=(800, 600),
high_speed = False,
fps_limit = 8
)
video_thread.display()
####Run your own functions on the frames
import cvpubsubs.webcam_pub as w
def redden_frame_print_spam(frame, cam_id):
frame[:, :, 0] = 0
frame[:, :, 1] = 0
print("Spam!")
w.VideoHandlerThread(callbacks=[redden_frame_print_spam] + w.display_callbacks).display()
####Display multiple windows from one source
import cvpubsubs.webcam_pub as w
from cvpubsubs.window_sub import SubscriberWindows
def cam_handler(frame, cam_id):
SubscriberWindows.set_global_frame_dict(cam_id, frame, frame)
t = w.VideoHandlerThread(0, [cam_handler],
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
t.start()
SubscriberWindows(window_names=['cammy', 'cammy2'],
video_sources=[str(0)]
).loop()
t.join()
####Display multiple windows from multiple sources
iport cvpubsubs.webcam_pub as w
from cvpubsubs.window_sub import SubscriberWindows
t1 = w.VideoHandlerThread(0)
t2 = w.VideoHandlerThread(1)
t1.start()
t2.start()
SubscriberWindows(window_names=['cammy', 'cammy2'],
video_sources=[0,1]
).loop()
t1.join()
t1.join()
License
-------

View File

@ -0,0 +1 @@
__version__ = '0.2.0'

View File

@ -1,4 +1,4 @@
from .camctrl import CamCtrl
from .frame_handler import frame_handler_thread
from .frame_handler import VideoHandlerThread, display_callbacks
from .get_cam_ids import get_cam_ids
from .pub_cam import pub_cam_thread

View File

@ -7,35 +7,78 @@ from cvpubsubs.listen_default import listen_default
from .pub_cam import pub_cam_thread
if False:
from typing import Union, Tuple, Any, Callable
from typing import Union, Tuple, Any, Callable, List
from cvpubsubs.window_sub import SubscriberWindows
def frame_handler_loop(cam_id, # type: Union[int, str]
frame_handler, # type: Callable[[np.ndarray, int], Any]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
):
t = pub_cam_thread(cam_id, request_size, high_speed, fps_limit)
sub_cam = pubsub.subscribe("CVCams." + str(cam_id) + ".Vid")
sub_owner = pubsub.subscribe("CVCamHandlers." + str(cam_id) + ".Cmd")
msg_owner = ''
while msg_owner != 'quit':
frame = listen_default(sub_cam, timeout=.1) # type: np.ndarray
if frame is not None:
frame = frame[0]
frame_handler(frame, cam_id)
msg_owner = listen_default(sub_owner, block=False, empty='')
pubsub.publish("CVCams." + str(cam_id) + ".Cmd", 'quit')
t.join()
def global_cv_display_callback(frame, # type: np.ndarray
cam_id # type: Union[int, str]
):
"""Default callback for sending frames to the global frame dictionary.
:param frame: The video or image frame
:type frame: np.ndarray
:param cam_id: The video or image source
:type cam_id: Union[int, str]
"""
SubscriberWindows.frame_dict[str(cam_id) + "frame"] = (frame,)
def frame_handler_thread(cam_id, # type: Union[int, str]
frame_handler, # type: Callable[[int, np.ndarray], Any]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
): # type: (...) -> threading.Thread
t = threading.Thread(target=frame_handler_loop, args=(cam_id, frame_handler, request_size, high_speed, fps_limit))
t.start()
return t
display_callbacks = [global_cv_display_callback]
class VideoHandlerThread(threading.Thread):
"Thread for publishing frames from a video source."
def __init__(self, video_source=0, # type: Union[int, str]
callbacks=(global_cv_display_callback,), # type: List[Callable[[np.ndarray, int], Any]]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=True, # type: bool
fps_limit=240 # type: float
):
"""Sets up the main thread loop.
:param video_source: The video or image source. Integers typically access webcams, while strings access files.
:type video_source: Union[int, str]
:param callbacks: A list of operations to be performed on every frame, including publishing.
:type callbacks: List[Callable[[np.ndarray, int], Any]]
:param request_size: Requested video size. Actual size may vary, since this is requesting from the hardware.
:type request_size: Tuple[int, int]
:param high_speed: If true, use compression to increase speed.
:type high_speed: bool
:param fps_limit: Limits frames per second.
:type fps_limit: float
"""
super(VideoHandlerThread, self).__init__(target=self.loop, args=())
self.cam_id = video_source
self.callbacks = callbacks
self.request_size = request_size
self.high_speed = high_speed
self.fps_limit = fps_limit
def loop(self):
"""Continually gets frames from the video publisher, runs callbacks on them, and listens to commands."""
t = pub_cam_thread(self.cam_id, self.request_size, self.high_speed, self.fps_limit)
sub_cam = pubsub.subscribe("CVCams." + str(self.cam_id) + ".Vid")
sub_owner = pubsub.subscribe("CVCamHandlers." + str(self.cam_id) + ".Cmd")
msg_owner = ''
while msg_owner != 'quit':
frame = listen_default(sub_cam, timeout=.1) # type: np.ndarray
if frame is not None:
frame = frame[0]
for c in self.callbacks:
c(frame, self.cam_id)
msg_owner = listen_default(sub_owner, block=False, empty='')
pubsub.publish("CVCams." + str(self.cam_id) + ".Cmd", 'quit')
t.join()
def display(self,
callbacks=() # type: List[Callable[[List[np.ndarray]], Any]]
):
"""Default display operation. For multiple video sources, please use something outside of this class.
:param callbacks: List of callbacks to be run on frames before displaying to the screen.
:type callbacks: List[Callable[[List[np.ndarray]], Any]]
"""
self.start()
SubscriberWindows(video_sources=[self.cam_id], callbacks=callbacks).loop()
self.join()

View File

@ -8,7 +8,8 @@ from ..listen_default import listen_default
from ..webcam_pub.camctrl import CamCtrl
if False:
from typing import List
from typing import List, Union, Callable, Any
import numpy as np
class SubscriberWindows(object):
@ -17,15 +18,23 @@ class SubscriberWindows(object):
esc_key_codes = [27] # ESC key on most keyboards
def __init__(self,
window_names, # type: List[str]
input_vid_global_names, # type: List[str]
callbacks=(None,),
input_cams=(0,)
window_names=('cvpubsubs',), # type: List[str]
video_sources=(0,), # type: List[Union[str,int]]
callbacks=(None,), # type: List[Callable[[List[np.ndarray]], Any]]
):
self.window_names = window_names
self.input_vid_global_names = input_vid_global_names
self.input_vid_global_names = [str(name) + "frame" for name in video_sources]
self.callbacks = callbacks
self.input_cams = input_cams
self.input_cams = video_sources
@staticmethod
def set_global_frame_dict(name, *args):
SubscriberWindows.frame_dict[str(name)+"frame"] = [*args]
def __stop_all_cams(self):
for c in self.input_cams:
CamCtrl.stop_cam(c)
def handle_keys(self,
key_input, # type: int
@ -33,8 +42,7 @@ class SubscriberWindows(object):
if key_input in self.esc_key_codes:
for name in self.window_names:
cv2.destroyWindow(name + " (press ESC to quit)")
for c in self.input_cams:
CamCtrl.stop_cam(c)
self.__stop_all_cams()
WinCtrl.quit()
elif key_input not in [-1, 0]:
try:
@ -45,15 +53,17 @@ class SubscriberWindows(object):
)
def update_window_frames(self):
win_num = 0
for i in range(len(self.input_vid_global_names)):
if self.input_vid_global_names[i] in self.frame_dict and self.frame_dict[
self.input_vid_global_names[i]] is not None:
if self.callbacks[i % len(self.callbacks)] is not None:
if len(self.callbacks)>0 and self.callbacks[i % len(self.callbacks)] is not None:
frames = self.callbacks[i % len(self.callbacks)](self.frame_dict[self.input_vid_global_names[i]])
else:
frames = self.frame_dict[self.input_vid_global_names[i]]
for f in range(len(frames)):
cv2.imshow(self.window_names[f % len(self.window_names)] + " (press ESC to quit)", frames[f])
cv2.imshow(self.window_names[win_num % len(self.window_names)] + " (press ESC to quit)", frames[f])
win_num += 1
# todo: figure out how to get the red x button to work. Try: https://stackoverflow.com/a/37881722/782170
def loop(self):
@ -64,3 +74,4 @@ class SubscriberWindows(object):
self.handle_keys(cv2.waitKey(1))
msg_cmd = listen_default(sub_cmd, block=False, empty='')
pubsub.publish("CVWinCmd", 'quit')
self.__stop_all_cams()

View File

@ -20,6 +20,7 @@ setup(
version=version,
description='',
long_description=readme,
long_description_content_type='text/markdown',
author='SimLeek',
author_email='josh.miklos@gmail.com',
maintainer='SimLeek',

View File

@ -14,7 +14,7 @@ class TestFrameHandler(ut.TestCase):
print(frame.shape)
self.i += 1
w.frame_handler_thread(0, test_frame_handler,
w.VideoHandlerThread(0, [test_frame_handler],
request_size=(1280, 720),
high_speed=True,
fps_limit=240)

View File

@ -7,6 +7,9 @@ import cvpubsubs.webcam_pub as w
from cvpubsubs.listen_default import listen_default
from cvpubsubs.window_sub import SubscriberWindows
if False:
import numpy as np
def print_keys_thread():
sub_key = pubsub.subscribe("CVKeyStroke")
@ -29,40 +32,54 @@ def start_print_keys_thread(): # type: (...) -> threading.Thread
class TestSubWin(ut.TestCase):
def test_sub(self):
def cam_handler(frame, cam_id):
SubscriberWindows.frame_dict[str(cam_id) + "Frame"] = (frame, frame)
w.VideoHandlerThread().display()
t = w.frame_handler_thread(0, cam_handler,
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
def test_sub_with_args(self):
video_thread = w.VideoHandlerThread(video_source=0,
callbacks=w.display_callbacks,
request_size=(800, 600),
high_speed=False,
fps_limit=8
)
video_thread.display()
def test_sub_with_callback(self):
def redden_frame_print_spam(frame, cam_id):
frame[:, :, 0] = 0
frame[:, :, 1] = 0
print("Spam!")
w.VideoHandlerThread(callbacks=[redden_frame_print_spam] + w.display_callbacks).display()
def test_multi_cams_one_source(self):
def cam_handler(frame, cam_id):
SubscriberWindows.set_global_frame_dict(cam_id, frame, frame)
t = w.VideoHandlerThread(0, [cam_handler],
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
t.start()
SubscriberWindows(window_names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"]
video_sources=[str(0)]
).loop()
w.CamCtrl.stop_cam(0)
t.join()
def test_key_sub(self):
def cam_handler(frame, cam_id):
SubscriberWindows.frame_dict[str(cam_id) + "Frame"] = (frame, frame)
def test_multi_cams_multi_source(self):
t1 = w.VideoHandlerThread(0)
t2 = w.VideoHandlerThread(1)
t = w.frame_handler_thread(0, cam_handler,
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
kt = start_print_keys_thread()
t1.start()
t2.start()
SubscriberWindows(window_names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"]
video_sources=[0,1]
).loop()
w.CamCtrl.stop_cam(0)
t.join()
kt.join()
t1.join()
t1.join()