Added bus_info for v4l2. Added optional forcing OpenCV or V4L2. Added non-linear transform. Added transparent overlay.

This commit is contained in:
Josh Miklos
2020-06-22 16:09:34 -07:00
parent 823bb3e173
commit b8453da881
24 changed files with 334 additions and 719 deletions

View File

@ -14,10 +14,10 @@ jobs:
with:
python-version: 3.7
- uses: dschep/install-poetry-action@v1.2
- name: Install dependencies
run: poetry install
- name: Install and build
run: pip install .
- name: Build with Poetry
run: poetry build
run: python setup.py sdist bdist_wheel
- name: Publish distribution 📦 to Test PyPI
uses: pypa/gh-action-pypi-publish@master
with:

View File

@ -18,8 +18,7 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- uses: dschep/install-poetry-action@v1.2
- name: Install dependencies
run: poetry install
run: pip install .
- name: Test with tox
run: poetry run tox -p auto -o
run: tox -p auto -o

View File

@ -1,3 +1,3 @@
"""Effects to run on numpy arrays to make data clearer."""
from . import crop, lens, select_channels
from . import crop, lens, select_channels, overlay, transform

View File

@ -0,0 +1,54 @@
"""Overlay functions."""
def overlay_transparent(background, overlay, x=None, y=None):
"""
Overlay a transparent image on top of a background image.
:param background: background rgb image to overlay on top of
:param overlay: rgba image to overlay on top of the background
:param x: leftmost part to overlay at on the background
:param y: topmost part to overlay at on the background
"""
# https://stackoverflow.com/a/54058766/782170
assert overlay.shape[2] == 4, "Overlay must be BGRA"
background_width = background.shape[1]
background_height = background.shape[0]
if (x is not None and x >= background_width) or (
y is not None and y >= background_height
):
return background
h, w = overlay.shape[0], overlay.shape[1]
if x is None:
x = int(background_width / 2 - w / 2)
if y is None:
y = int(background_height / 2 - h / 2)
if x < 0:
w += x
overlay = overlay[:, -x:]
if y < 0:
w += y
overlay = overlay[:, -y:]
if x + w > background_width:
w = background_width - x
overlay = overlay[:, :w]
if y + h > background_height:
h = background_height - y
overlay = overlay[:h]
overlay_image = overlay[..., :3]
mask = overlay[..., 3:] / 255.0
background[y : y + h, x : x + w] = (1.0 - mask) * background[
y : y + h, x : x + w
] + mask * overlay_image
return background

View File

@ -0,0 +1,45 @@
"""Transform functions."""
import numpy as np
import cv2
def transform_about_center(
arr, scale_multiplier=(1, 1), rotation_degrees=0, translation=(0, 0), skew=(0, 0)
):
"""
Transform an image about its center.
:param arr: numpy image to be transformed
:param scale_multiplier: grow/shrink in x and y
:param rotation_degrees: degrees to rotate, from 0 to 360
:param translation: pixels to translate the image
:param skew: mimics rotation along screen axes. In degrees. 90 degrees should give a line.
:return: transformed numpy image
"""
center_scale_xform = np.eye(3)
center_scale_xform[0, 0] = scale_multiplier[1]
center_scale_xform[1, 1] = scale_multiplier[0]
center_scale_xform[0:2, -1] = [arr.shape[1] // 2, arr.shape[0] // 2]
rotation_xform = np.eye(3)
theta = np.radians(rotation_degrees)
c, s = np.cos(theta), np.sin(theta)
R = np.array(((c, -s), (s, c)))
rotation_xform[0:2, 0:2] = R
skew = np.radians(skew)
skew = np.tan(skew)
rotation_xform[-1, 0:2] = [skew[1] / arr.shape[1], skew[0] / arr.shape[0]]
translation_skew_xform = np.eye(3)
translation_skew_xform[0:2, -1] = [
(-arr.shape[1] - translation[1]) // 2,
(-arr.shape[0] - translation[0]) // 2,
]
full_xform = center_scale_xform @ rotation_xform @ translation_skew_xform
xformd_arr = cv2.warpPerspective(
arr, full_xform, tuple(reversed(arr.shape[:2])), flags=0
)
return xformd_arr

View File

@ -11,8 +11,10 @@ import cv2
using_pyv4l2cam = False
try:
if sys.platform == "linux":
from PyV4L2Cam.camera import Camera as pyv4lcamera
from PyV4L2Cam.controls import ControlIDs as pyv4lcontrolids
from PyV4L2Cam.camera import Camera as pyv4lcamera # type: ignore
from PyV4L2Cam.controls import ControlIDs as pyv4lcontrolids # type: ignore
from PyV4L2Cam import convert_mjpeg, convert_rgb24 # type: ignore
from PyV4L2Cam.get_camera import get_camera_by_bus_info, get_camera_by_string # type: ignore
using_pyv4l2cam = True
except ImportError:
@ -32,25 +34,6 @@ from typing import Union, Tuple, Optional, Dict, Any, List, Callable
FrameCallable = Callable[[np.ndarray], Optional[np.ndarray]]
def _v4l2_convert_mjpeg(mjpeg: bytes) -> Optional[np.ndarray]:
# Thanks: https://stackoverflow.com/a/21844162
a = mjpeg.find(b"\xff\xd8")
b = mjpeg.find(b"\xff\xd9")
if a == -1 or b == -1:
return None
else:
jpg = mjpeg[a : b + 2]
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
return frame
def _v4l2_convert_rgb24(rgb24: bytes, width: int, height: int) -> Optional[np.ndarray]:
nparr = np.frombuffer(rgb24, np.uint8)
np_frame = nparr.reshape((height, width, 3))
return np_frame
def pub_cam_loop_pyv4l2(
cam_id: Union[int, str, np.ndarray],
request_size: Tuple[int, int] = (-1, -1),
@ -77,13 +60,16 @@ def pub_cam_loop_pyv4l2(
f"/dev/video{cam_id}", *request_size
)
else:
cam = pyv4lcamera(cam_id, *request_size) # type: ignore
if "usb" in cam_id:
cam = get_camera_by_bus_info(cam_id, *request_size)
else:
cam = get_camera_by_string(cam_id, *request_size) # type: ignore
else:
raise TypeError(
"Only strings or ints representing cameras are supported with v4l2."
)
subscriber_dictionary.register_cam(name)
subscriber_dictionary.register_cam(name, cam)
sub = subscriber_dictionary.cam_cmd_sub(name)
sub.return_on_no_data = ""
@ -99,9 +85,9 @@ def pub_cam_loop_pyv4l2(
frame_bytes = cam.get_frame() # type: bytes
if cam.pixel_format == "MJPEG":
nd_frame = _v4l2_convert_mjpeg(frame_bytes)
nd_frame = convert_mjpeg(frame_bytes)
elif cam.pixel_format == "RGB24":
nd_frame = _v4l2_convert_rgb24(frame_bytes, cam.width, cam.height)
nd_frame = convert_rgb24(frame_bytes, cam.width, cam.height)
else:
raise NotImplementedError(f"{cam.pixel_format} format not supported.")
@ -149,7 +135,7 @@ def pub_cam_loop_opencv(
"Only strings or ints representing cameras, or numpy arrays representing pictures supported."
)
subscriber_dictionary.register_cam(name)
subscriber_dictionary.register_cam(name, cam)
frame_counter = 0
@ -158,7 +144,10 @@ def pub_cam_loop_opencv(
msg = ""
if high_speed:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
try:
cam.set(cv2.CAP_PROP_FOURCC, cv2.CAP_OPENCV_MJPEG)
except AttributeError:
warnings.warn("Please update OpenCV")
cam.set(cv2.CAP_PROP_FRAME_WIDTH, request_size[0])
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, request_size[1])
@ -196,6 +185,7 @@ def pub_cam_thread(
request_ize: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = float("inf"),
force_backend="",
) -> threading.Thread:
"""Run pub_cam_loop in a new thread. Starts on creation."""
@ -203,15 +193,20 @@ def pub_cam_thread(
if name in uid_dict.keys():
t = uid_dict[name]
else:
if (
if "cv" in force_backend.lower():
pub_cam_loop = pub_cam_loop_opencv
elif (
sys.platform == "linux"
and using_pyv4l2cam
and (
isinstance(cam_id, int)
or (isinstance(cam_id, str) and "/dev/video" in cam_id)
or (
isinstance(cam_id, str)
and any(["/dev/video" in cam_id, "usb" in cam_id])
)
)
):
pub_cam_loop = pub_cam_loop_pyv4l2
) or "v4l2" in force_backend.lower():
pub_cam_loop = pub_cam_loop_pyv4l2 # type: ignore
else:
pub_cam_loop = pub_cam_loop_opencv

View File

@ -27,6 +27,7 @@ class FrameUpdater(threading.Thread):
request_size: Tuple[int, int] = (-1, -1),
high_speed: bool = True,
fps_limit: float = float("inf"),
force_backend="",
):
"""Create the frame updater thread."""
super(FrameUpdater, self).__init__(target=self.loop, args=())
@ -42,6 +43,7 @@ class FrameUpdater(threading.Thread):
self.high_speed = high_speed
self.fps_limit = fps_limit
self.exception_raised = None
self.force_backend = force_backend
def __wait_for_cam_id(self):
while str(self.cam_id) not in subscriber_dictionary.CV_CAMS_DICT:
@ -84,7 +86,11 @@ class FrameUpdater(threading.Thread):
def loop(self):
"""Continually get frames from the video publisher, run callbacks on them, and listen to commands."""
t = pub_cam_thread(
self.video_source, self.request_size, self.high_speed, self.fps_limit
self.video_source,
self.request_size,
self.high_speed,
self.fps_limit,
self.force_backend,
)
self.__wait_for_cam_id()

View File

@ -20,22 +20,23 @@ class CamHandler(object):
class Cam(object):
"""A camera publisher instance that will send frames, status, and commands out."""
def __init__(self, name):
def __init__(self, name, cam_instance=None):
"""Create the cam."""
self.name = name
self.cmd = None
self.frame_pub = VariablePub()
self.cmd_pub = VariablePub()
self.status_pub = VariablePub()
self.cam_instance = cam_instance
CV_CAM_HANDLERS_DICT: Dict[str, CamHandler] = {}
CV_CAMS_DICT: Dict[str, Cam] = {}
def register_cam(cam_id):
def register_cam(cam_id, cam_instance=None):
"""Register camera "cam_id" to a global list so it can be picked up."""
cam = Cam(str(cam_id))
cam = Cam(str(cam_id), cam_instance)
CV_CAMS_DICT[str(cam_id)] = cam
CV_CAM_HANDLERS_DICT[str(cam_id)] = CamHandler(
str(cam_id), cam.frame_pub.make_sub()

View File

@ -13,11 +13,22 @@ from displayarray._uid import uid_for_source
from displayarray.frame import subscriber_dictionary
from displayarray.frame.frame_updater import FrameCallable
from displayarray.frame.frame_updater import FrameUpdater
from displayarray.frame.subscriber_dictionary import CV_CAMS_DICT
from displayarray.input import MouseEvent
from displayarray.window import window_commands
from displayarray._util import WeakMethod
from displayarray.effects.select_channels import SelectChannels
try:
import sys
if sys.platform == "linux":
from PyV4L2Cam.get_camera import get_bus_info_from_camera # type: ignore
else:
get_bus_info_from_camera = None
except:
get_bus_info_from_camera = None
class SubscriberWindows(object):
"""Windows that subscribe to updates to cameras, videos, and arrays."""
@ -266,12 +277,26 @@ class SubscriberWindows(object):
window_commands.quit(force_all_read=False)
self.__stop_all_cams()
@property
def cams(self):
"""Get the camera instances. Can be used for OpenCV or V4L2 functions, depending on backend."""
return [CV_CAMS_DICT[v].cam_instance for v in self.input_vid_global_names]
@property
def busses(self):
"""Get the busses the cameras are plugged into. Can be used as UIDs. Requires V4L2 backend."""
if get_bus_info_from_camera is not None:
return [get_bus_info_from_camera(c) for c in self.cams]
else:
raise RuntimeError("Getting bus info not supported on this system")
def _get_video_callback_dict_threads(
*vids,
callbacks: Optional[Dict[Any, Union[FrameCallable, List[FrameCallable]]]] = None,
fps=float("inf"),
size=(-1, -1),
force_backend="",
):
assert callbacks is not None
vid_threads = []
@ -289,7 +314,13 @@ def _get_video_callback_dict_threads(
elif callable(callbacks[v]):
v_callbacks.append(callbacks[v]) # type: ignore
vid_threads.append(
FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size)
FrameUpdater(
v,
callbacks=v_callbacks,
fps_limit=fps,
request_size=size,
force_backend=force_backend,
)
)
return vid_threads
@ -305,6 +336,7 @@ def _get_video_threads(
] = None,
fps=float("inf"),
size=(-1, -1),
force_backend="",
):
vid_threads: List[Thread] = []
if isinstance(callbacks, Dict):
@ -314,17 +346,33 @@ def _get_video_threads(
elif isinstance(callbacks, List):
for v in vids:
vid_threads.append(
FrameUpdater(v, callbacks=callbacks, fps_limit=fps, request_size=size)
FrameUpdater(
v,
callbacks=callbacks,
fps_limit=fps,
request_size=size,
force_backend=force_backend,
)
)
elif callable(callbacks):
for v in vids:
vid_threads.append(
FrameUpdater(v, callbacks=[callbacks], fps_limit=fps, request_size=size)
FrameUpdater(
v,
callbacks=[callbacks],
fps_limit=fps,
request_size=size,
force_backend=force_backend,
)
)
else:
for v in vids:
if v is not None:
vid_threads.append(FrameUpdater(v, fps_limit=fps, request_size=size))
vid_threads.append(
FrameUpdater(
v, fps_limit=fps, request_size=size, force_backend=force_backend
)
)
return vid_threads
@ -342,6 +390,7 @@ def display(
fps_limit=float("inf"),
size=(-1, -1),
silent=False,
force_backend="",
):
"""
Display all the arrays, cameras, and videos passed in.
@ -351,7 +400,11 @@ def display(
Window names end up becoming the title of the windows
"""
vid_threads = _get_video_threads(
*vids, callbacks=callbacks, fps=fps_limit, size=size
*vids,
callbacks=callbacks,
fps=fps_limit,
size=size,
force_backend=force_backend,
)
for v in vid_threads:
v.start()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -9,7 +9,7 @@ def black_and_white(arr):
import time
t0 = t1 = time.time()
for up in display(0, size=(1, 1), callbacks=black_and_white):
for up in display("usb-0000:00:14.0-7"):
if up:
t1 = time.time()
print(1.0 / (t1 - t0))

View File

@ -9,25 +9,25 @@ test_video_3 = str(Path.joinpath(Path(__file__).parent, "fractal test 3.mp4"))
urls = {
"test_video": "https://www.youtube.com/watch?v=LpWhaBVIrZw",
"test_video_2": "https://www.youtube.com/watch?v=GASynpGr-c8",
"test_video_3": "https://www.youtube.com/watch?v=u_P83LcI8Oc"
"test_video_3": "https://www.youtube.com/watch?v=u_P83LcI8Oc",
}
def populate_videos(fps=60, res="720p", ext="mp4"):
from pytube import YouTube # Note: pip install pytube3, not pytube
from pathlib import Path
for n, v in globals().items():
if 'test_video' in n:
if "test_video" in n:
print(f"Checking if '{n}' is downloaded.")
if Path(v).exists():
print("Video already downloaded.")
else:
the_path = Path(v)
print("Downloading...")
YouTube(urls[n]) \
.streams \
.filter(fps=fps, res=res, file_extension=ext)[0] \
.download(output_path=the_path.parent, filename=the_path.stem)
YouTube(urls[n]).streams.filter(fps=fps, res=res, file_extension=ext)[
0
].download(output_path=the_path.parent, filename=the_path.stem)
if __name__ == "__main__":

599
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,36 +0,0 @@
[tool.poetry]
name = 'displayarray'
version = '1.1.1'
description = 'Tool for displaying numpy arrays.'
authors = ['SimLeek <simulator.leek@gmail.com>']
license = 'MIT'
readme = "README.rst"
repository = "https://github.com/simleek/displayarray"
[tool.poetry.dependencies]
python = "^3.6"
opencv_python = "^4*"
docopt = "0.6.2"
numpy = "1.16.1"
localpubsub = "0.0.4"
pyzmq = "18.1.0"
[tool.poetry.dev-dependencies]
pytest = "5.2.1"
typing = "3.7.4.1"
mock = "^3.0"
tox = "^3.14"
tox-gh-actions = "^0.3.0"
coverage = "^4.5"
sphinx = "^2.2"
black = {version = "^18.3-alpha.0", allows-prereleases = true}
mypy = "^0.740.0"
pydocstyle = "^4.0"
[tool.poetry.scripts]
displayarray = "displayarray.__main__:main"
[build-system]
requires = ['poetry']
build-backend = "poetry.masonry.api"

64
setup.py Normal file
View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# DO NOT EDIT THIS FILE!
# This file has been autogenerated by dephell <3
# https://github.com/dephell/dephell
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
import os.path
readme = ""
here = os.path.abspath(os.path.dirname(__file__))
readme_path = os.path.join(here, "README.rst")
if os.path.exists(readme_path):
with open(readme_path, "rb") as stream:
readme = stream.read().decode("utf8")
setup(
long_description=readme,
name="displayarray",
version="1.2.0",
description="Tool for displaying numpy arrays.",
python_requires="==3.*,>=3.6.0",
project_urls={"repository": "https://github.com/simleek/displayarray"},
author="SimLeek",
author_email="simulator.leek@gmail.com",
license="MIT",
entry_points={"console_scripts": ["displayarray = displayarray.__main__:main"]},
packages=[
"displayarray",
"displayarray.effects",
"displayarray.frame",
"displayarray.window",
],
package_dir={"": "."},
package_data={},
install_requires=[
"docopt==0.6.2",
"localpubsub==0.0.4",
"numpy==1.16.1",
"opencv-python==4.*,>=4.0.0",
"pyzmq==18.1.0",
],
extras_require={
"linux": ["PyV4L2Cam @ git+https://github.com/SimLeek/PyV4L2Cam"],
"dev": [
"black==18.*,>=18.3.0.a0",
"coverage==4.*,>=4.5.0",
"mock==3.*,>=3.0.0",
"mypy==0.*,>=0.740.0",
"pydocstyle==4.*,>=4.0.0",
"pytest==5.2.1",
"sphinx==2.*,>=2.2.0",
"tox==3.*,>=3.14.0",
"tox-gh-actions==0.*,>=0.3.0",
"typing==3.7.4.1",
],
},
)

View File

@ -42,7 +42,7 @@ def test_pub_cam_int():
cam_pub.assert_has_calls([mock.call(img)] * 4)
reg_cam.assert_called_once_with("0")
reg_cam.assert_called_once_with("0", cap)
cam_cmd_sub.assert_called_once_with("0")
cap.set.assert_has_calls(

View File

@ -57,7 +57,7 @@ def test_loop():
ud.loop()
mock_pubcam_thread.assert_called_once_with(0, (-1, -1), True, float("inf"))
mock_pubcam_thread.assert_called_once_with(0, (-1, -1), True, float("inf"), "")
mock_frame_sub.assert_called_once_with("0")
handler_cmd_sub.assert_called_once_with("0")
sub_cam.get.assert_has_calls([mock.call(blocking=True, timeout=1.0)] * 3)

View File

@ -495,8 +495,12 @@ def test_display():
fup.assert_has_calls(
[
mock.call(0, fps_limit=float("inf"), request_size=(50, 50)),
mock.call(1, fps_limit=float("inf"), request_size=(50, 50)),
mock.call(
0, force_backend="", fps_limit=float("inf"), request_size=(50, 50)
),
mock.call(
1, force_backend="", fps_limit=float("inf"), request_size=(50, 50)
),
]
)
assert fup_inst.start.call_count == 2
@ -541,10 +545,18 @@ def test_display_callbacks():
fup.assert_has_calls(
[
mock.call(
0, callbacks=[cb], fps_limit=float("inf"), request_size=(-1, -1)
0,
callbacks=[cb],
force_backend="",
fps_limit=float("inf"),
request_size=(-1, -1),
),
mock.call(
1, callbacks=[cb], fps_limit=float("inf"), request_size=(-1, -1)
1,
callbacks=[cb],
force_backend="",
fps_limit=float("inf"),
request_size=(-1, -1),
),
]
)
@ -557,8 +569,20 @@ def test_display_callbacks():
fup.assert_has_calls(
[
mock.call(0, callbacks=[cb, cb2], fps_limit=60, request_size=(-1, -1)),
mock.call(1, callbacks=[cb, cb2], fps_limit=60, request_size=(-1, -1)),
mock.call(
0,
callbacks=[cb, cb2],
force_backend="",
fps_limit=60,
request_size=(-1, -1),
),
mock.call(
1,
callbacks=[cb, cb2],
force_backend="",
fps_limit=60,
request_size=(-1, -1),
),
]
)
@ -579,16 +603,25 @@ def test_display_callbacks_dict():
fup.assert_has_calls(
[
mock.call(
0, callbacks=[cb1], fps_limit=float("inf"), request_size=(-1, -1)
),
mock.call(
1,
callbacks=[cb1, cb2],
0,
callbacks=[cb1],
force_backend="",
fps_limit=float("inf"),
request_size=(-1, -1),
),
mock.call(
2, callbacks=[cb3], fps_limit=float("inf"), request_size=(-1, -1)
1,
callbacks=[cb1, cb2],
force_backend="",
fps_limit=float("inf"),
request_size=(-1, -1),
),
mock.call(
2,
callbacks=[cb3],
force_backend="",
fps_limit=float("inf"),
request_size=(-1, -1),
),
]
)

32
tox.ini
View File

@ -10,39 +10,39 @@ python =
3.7: py37, mypy, test-docs, black, pydocstyle
[testenv]
whitelist_externals = poetry
whitelist_externals = coverage
description = run the tests with pytest under {basepython}
commands = poetry run coverage run --source=displayarray -m pytest tests
poetry run coverage report
poetry run coverage erase
commands = coverage run --source=displayarray -m pytest tests
coverage report
coverage erase
[testenv:docgen]
whitelist_externals = poetry
whitelist_externals = sphinx-build
description = generating documentation
commands = poetry run sphinx-build -b dirhtml docs/docsrc docs
commands = sphinx-build -b dirhtml docs/docsrc docs
[testenv:test-docs]
whitelist_externals = poetry
whitelist_externals = sphinx-build
cmd
description = generating documentation
commands = poetry run sphinx-build -b dirhtml docs/docsrc docs_test -n -T
#rm -rf docs_test
cmd /c RMDIR /Q/S docs_test
commands = sphinx-build -b dirhtml docs/docsrc docs_test -n -T
rm -rf docs_test
#cmd /c RMDIR /Q/S docs_test
[testenv:mypy]
whitelist_externals = poetry
whitelist_externals = mypy
description = enforce typing
commands = poetry run mypy displayarray
commands = mypy displayarray
[testenv:black]
whitelist_externals = poetry
whitelist_externals = black
description = enforce code style
commands = poetry run black displayarray --check
commands = black displayarray --check
[pydocstyle]
ignore = D105, D212, D203, D202
[testenv:pydocstyle]
whitelist_externals = poetry
whitelist_externals = pydocstyle
description = enforce documentation style
commands = poetry run pydocstyle displayarray
commands = pydocstyle displayarray