moved quick start to front

This commit is contained in:
Josh Miklos
2018-03-14 16:35:18 -07:00
committed by GitHub
commit 7ebace2e64
20 changed files with 397 additions and 0 deletions

108
.gitignore vendored Normal file
View File

@ -0,0 +1,108 @@
# Created by .ignore support plugin (hsz.mobi)
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
.static_storage/
.media/
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.idea/

21
LICENSE.md Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 Josh Miklos
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

1
README.md Normal file
View File

@ -0,0 +1 @@
A threaded PubSub OpenCV interfaceREADME.md. Webcam and video feeds to multiple windows is supported.

1
README.txt Normal file
View File

@ -0,0 +1 @@
A threaded PubSub OpenCV interfaceREADME.md. Webcam and video feeds to multiple windows is supported.

1
__init__.py Normal file
View File

@ -0,0 +1 @@
from .cv_pubsubs import *

1
cv_pubsubs/__init__.py Normal file
View File

@ -0,0 +1 @@
from . import window_sub, webcam_pub

View File

@ -0,0 +1,5 @@
from .listen_default import listen_default
from .get_cam_ids import get_cam_ids
from .pub_cam import pub_cam_thread
from .frame_handler import frame_handler_thread
from .camctrl import CamCtrl

View File

@ -0,0 +1,11 @@
import pubsub
if False:
from typing import Union
class CamCtrl:
@staticmethod
def stop_cam(cam_id # type: Union[int, str]
):
pubsub.publish("cvcamhandlers." + str(cam_id) + ".cmd", 'q')

View File

@ -0,0 +1,39 @@
import pubsub
import numpy as np
import threading
from .listen_default import listen_default
from .pub_cam import pub_cam_thread
if False:
from typing import Union, Tuple, Any, Callable
def frame_handler_loop(cam_id, # type: Union[int, str]
frame_handler, # type: Callable[[np.ndarray, int], Any]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
):
t = pub_cam_thread(cam_id, request_size, high_speed, fps_limit)
sub_cam = pubsub.subscribe("cvcams." + str(cam_id) + ".vid")
sub_owner = pubsub.subscribe("cvcamhandlers." + str(cam_id) + ".cmd")
msg_owner = ''
while msg_owner != 'q':
frame = listen_default(sub_cam, timeout=.1) # type: np.ndarray
if frame is not None:
frame = frame[0]
frame_handler(frame, cam_id)
msg_owner = listen_default(sub_owner, block=False, empty='')
pubsub.publish("cvcams." + str(cam_id) + ".cmd", 'q')
t.join()
def frame_handler_thread(cam_id, # type: Union[int, str]
frame_handler, # type: Callable[[int, np.ndarray], Any]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
): # type: (...) -> threading.Thread
t = threading.Thread(target=frame_handler_loop, args=(cam_id, frame_handler, request_size, high_speed, fps_limit))
t.start()
return t

View File

@ -0,0 +1,16 @@
import cv2
if False:
from typing import List
def get_cam_ids(): # type: () -> List[int]
cam_list = []
while True:
cam = cv2.VideoCapture(len(cam_list))
if not cam.isOpened():
break
cam_list.append(len(cam_list))
return cam_list

View File

@ -0,0 +1,18 @@
if False:
from typing import Any, Optional, queue
def listen_default(sub, # type: queue
block=True, # type: bool
timeout=None, # type: Optional[float]
empty=None # type: Any
): # type: (...)->Any
try:
msg = (sub.listen(block=block, timeout=timeout))
try:
msg = next(msg)['data']
except StopIteration:
msg = empty
except queue.Empty:
msg = empty
return msg

View File

@ -0,0 +1,66 @@
import pubsub
import cv2
import numpy as np
import time
import threading
from .listen_default import listen_default
if False:
from typing import Union, Tuple
def pub_cam_loop(cam_id, # type: Union[int, str]
request_size=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
): # type: (...)->bool
"""Publishes whichever camera you select to cvcams.<cam_id>.vid
You can send a quit command 'q' to cvcams.<cam_id>.cmd
Status information, such as failure to open, will be posted to cvcams.<cam_id>.status
:param high_speed: Selects mjpeg transferring, which most cameras seem to support, so speed isn't limited
:param fps_limit: Limits the frames per second.
:param cam_id: An integer representing which webcam to use, or a string representing a video file.
:param request_size: A tuple with width, then height, to request the video size.
:return: True if loop ended normally, False if it failed somehow.
"""
sub = pubsub.subscribe("cvcams." + str(cam_id) + ".cmd")
msg = ''
cam = cv2.VideoCapture(cam_id)
# cam.set(cv2.CAP_PROP_CONVERT_RGB, 0)
if high_speed:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, request_size[0])
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, request_size[1])
if not cam.isOpened():
pubsub.publish("cvcams." + str(cam_id) + ".status", "failed")
return False
now = time.time()
while msg != 'q':
time.sleep(1. / (fps_limit - (time.time() - now)))
now = time.time()
(ret, frame) = cam.read() # type: Tuple[bool, np.ndarray ]
if ret is False or not isinstance(frame, np.ndarray):
cam.release()
pubsub.publish("cvcams." + str(cam_id) + ".status", "failed")
return False
pubsub.publish("cvcams." + str(cam_id) + ".vid", (frame,))
msg = listen_default(sub, block=False, empty='')
cam.release()
return True
def pub_cam_thread(cam_id, # type: Union[int, str]
request_ize=(1280, 720), # type: Tuple[int, int]
high_speed=False, # type: bool
fps_limit=240 # type: float
):
# type: (...) -> threading.Thread
t = threading.Thread(target=pub_cam_loop, args=(cam_id, request_ize, high_speed, fps_limit))
t.start()
return t

View File

@ -0,0 +1 @@
from .cv_window_sub import sub_win_loop, frame_dict

View File

@ -0,0 +1,42 @@
import cv2
from ..webcam_pub.camctrl import CamCtrl
if False:
from typing import List
frame_dict = {}
def destruct_windows(window_names, input_cams):
for name in window_names:
cv2.destroyWindow(name)
for c in input_cams:
CamCtrl.stop_cam(c)
def set_frames_from_callbacks(input_vid_global_names, callbacks, frame):
global frame_dict
if callbacks[frame % len(callbacks)] is not None:
frames = callbacks[frame % len(callbacks)](frame_dict[input_vid_global_names[frame]])
else:
frames = frame_dict[input_vid_global_names[frame]]
return frames
# todo: figure out how to get the red x button to work. Try: https://stackoverflow.com/a/37881722/782170
def sub_win_loop(
names, # type: List[str]
input_vid_global_names, # type: List[str]
callbacks=(None,),
input_cams=(0,)
):
global frame_dict
while True:
for i in range(len(input_vid_global_names)):
if input_vid_global_names[i] in frame_dict and frame_dict[input_vid_global_names[i]] is not None:
frames = set_frames_from_callbacks(input_vid_global_names, callbacks, i)
for f in range(len(frames)):
cv2.imshow(names[f % len(names)], frames[f])
if cv2.waitKey(1) & 0xFF == ord('q'):
destruct_windows(names, input_cams)
return

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
opencv-python
pubsub
numpy

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[metadata]
description-file = README.md

19
setup.py Normal file
View File

@ -0,0 +1,19 @@
from distutils.core import setup
setup(
name= 'cv_pubsubs',
packages = ['cv_pubsubs', 'cv_pubsubs.webcam_pub', 'cv_pubsubs.window_sub'],
version='0.1',
description='Pubsub interface for Python OpenCV',
author='Josh Miklos',
author_email='simulatorleek@gmail.com',
url='https://github.com/SimLeek/cv_pubsubs',
download_url='https://github.com/SimLeek/cv_pubsubs/archive/0.1.tar.gz',
keywords=['OpenCV', 'PubSub'],
license='MIT',
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
]
)

View File

View File

@ -0,0 +1,20 @@
from cv_pubsubs import webcam_pub as w
import unittest as ut
class TestFrameHandler(ut.TestCase):
i = 0
def test_handler(self):
def test_frame_handler(frame, cam_id):
if self.i == 200:
w.CamCtrl.stop_cam(cam_id)
if self.i % 100 == 0:
print(frame.shape)
self.i += 1
w.frame_handler_thread(0, test_frame_handler,
request_size=(1280, 720),
high_speed=True,
fps_limit=240)

View File

@ -0,0 +1,22 @@
import unittest as ut
from cv_pubsubs import window_sub as win
from cv_pubsubs import webcam_pub as cam
class TestSubWin(ut.TestCase):
def test_sub(self):
def cam_handler(frame, cam_id):
win.frame_dict[str(cam_id) + "Frame"] = (frame, frame)
t = cam.frame_handler_thread(0, cam_handler,
request_size=(1280, 720),
high_speed=True,
fps_limit=240
)
win.sub_win_loop(names=['cammy', 'cammy2'],
input_vid_global_names=[str(0) + "Frame"])
cam.CamCtrl.stop_cam(0)
t.join()