Misc lint and style fixes

This commit is contained in:
woju 2024-11-28 14:48:27 +01:00
parent 0009a8962a
commit f897637d0f
Signed by: woju
GPG key ID: 81E859ECD7FB4F51
19 changed files with 624 additions and 514 deletions

View file

@ -1,9 +1,16 @@
[MESSAGES CONTROL]
# wrong-import-order:
# causes problems with tomllib, which is not considered stdlib?
disable=
c-extension-no-member,
trailing-comma-tuple,
consider-using-enumerate,
fixme,
too-few-public-methods,
too-many-instance-attributes,
too-many-public-methods,
trailing-comma-tuple,
wrong-import-order,
missing-docstring,
invalid-name,
@ -32,6 +39,9 @@ extension-pkg-allow-list=
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
# for backward compatibility.)
extension-pkg-whitelist=
pygame,
cv2,
cv2.aruco,
# Return non-zero exit code if any of these messages/categories are detected,
# even if score is above --fail-under value. Syntax same as enable. Messages
@ -152,6 +162,9 @@ contextmanager-decorators=contextlib.contextmanager
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
cv2\.aruco\.,
picamera2.metadata.Metadata,
picamera2.job.Job,
# Tells whether missing members accessed in mixin class should be ignored. A
# class is considered mixin if its name matches the mixin-class-rgx option.
@ -305,7 +318,7 @@ indent-after-paren=4
indent-string=' '
# Maximum number of characters on a single line.
max-line-length=100
max-line-length=80
# Maximum number of lines in a module.
max-module-lines=1000
@ -465,7 +478,7 @@ exclude-too-few-public-methods=
ignored-parents=
# Maximum number of arguments for function / method.
max-args=5
max-args=10
# Maximum number of attributes for a class (see R0902).
max-attributes=7
@ -542,10 +555,12 @@ preferred-modules=
check-protected-access-in-special-methods=no
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
defining-attr-methods=
__init__,
__new__,
setUp,
__post_init__,
__set_name__,
# List of member names, which should be excluded from the protected access
# warning.
@ -566,8 +581,9 @@ valid-metaclass-classmethod-first-arg=cls
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception
overgeneral-exceptions=
builtins.BaseException,
builtins.Exception,
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -42,4 +42,8 @@ testpaths = [
'tests',
]
[tool.black]
line-length = 80
skip-string-normalization = true
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -12,18 +12,23 @@ from tuxgo import (
detector,
)
@pytest.fixture
def load_frame():
def load_frame(filename):
return cv2.imread(os.fspath(pathlib.Path(__file__).parent / filename))
return load_frame
@pytest.fixture
def detect_markers_in_frame(load_frame):
def detect_markers_in_frame(frame):
return detector.Detector(blur=5).detect_markers(load_frame(frame))
return detect_markers_in_frame
def test_trivial(detect_markers_in_frame):
analyser = detector.Analyser(detect_markers_in_frame('test_trivial.jpg'))
assert list(analyser.get_programme()) == [
@ -31,6 +36,7 @@ def test_trivial(detect_markers_in_frame):
(blocks.Token.END,),
]
rotate_programme = [
(blocks.Token.BEGIN,),
(blocks.Token.STEP, blocks.Token.DIGIT_6),
@ -42,10 +48,13 @@ rotate_programme = [
(blocks.Token.END,),
]
@pytest.mark.parametrize('rot', [0, 45, 90])
def test_rotate(detect_markers_in_frame, rot):
analyser = detector.Analyser(detect_markers_in_frame(
f'test_rotate_{rot:02d}.jpg'))
analyser = detector.Analyser(
detect_markers_in_frame(f'test_rotate_{rot:02d}.jpg')
)
assert list(analyser.get_programme()) == rotate_programme
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -8,6 +8,7 @@ from tuxgo import (
parser,
)
class Engine(engine.ExecEngine):
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
@ -49,18 +50,20 @@ class Engine(engine.ExecEngine):
return obj != ast.Object.EMPTY
raise NotImplementedError()
def test_step():
programme = parser.parse([
(blocks.Token.BEGIN,),
(blocks.Token.STEP, blocks.Token.DIGIT_6),
(blocks.Token.STEP, blocks.Token.DIGIT_9),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.steps == 15
def test_turns():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -68,12 +71,13 @@ def test_turns():
(blocks.Token.TURN_LEFT,),
(blocks.Token.TURN_RIGHT,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.turns == 1
def test_if():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -83,12 +87,13 @@ def test_if():
(blocks.Token.TURN_RIGHT,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.turns == -1
def test_repeat():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -96,12 +101,13 @@ def test_repeat():
(blocks.Token.TURN_LEFT,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.turns == 42
def test_repeat_forever():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -112,12 +118,13 @@ def test_repeat_forever():
(blocks.Token.END_BLOCK,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.steps == 10
def test_repeat_while():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -125,12 +132,13 @@ def test_repeat_while():
(blocks.Token.STEP, blocks.Token.DIGIT_1),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.steps == 10
def test_add():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -138,12 +146,13 @@ def test_add():
(blocks.Token.TURN_LEFT,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.turns == 6
def test_add2():
programme = parser.parse([
(blocks.Token.BEGIN,),
@ -151,10 +160,11 @@ def test_add2():
(blocks.Token.TURN_LEFT,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
engine = Engine()
programme.execute(engine)
assert engine.turns == 11
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -1,13 +1,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-FileCopyrightText: 2023 Wojtek Porczyk <woju@hackerspace.pl>
import pprint
from tuxgo import (
blocks,
parser,
)
def test_parser():
parser.parse([
(blocks.Token.BEGIN,),
@ -24,7 +23,7 @@ def test_parser():
(blocks.Token.END_BLOCK,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_if():
@ -34,7 +33,8 @@ def test_if():
(blocks.Token.STEP, blocks.Token.DIGIT_1),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_if_else():
parser.parse([
@ -45,7 +45,8 @@ def test_if_else():
(blocks.Token.STEP, blocks.Token.DIGIT_2),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_if_elif():
parser.parse([
@ -56,7 +57,8 @@ def test_if_elif():
(blocks.Token.STEP, blocks.Token.DIGIT_2),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_if_elif_else():
parser.parse([
@ -69,10 +71,11 @@ def test_if_elif_else():
(blocks.Token.STEP, blocks.Token.DIGIT_3),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_if_elif_elif():
pprint.pprint(parser.parse([
parser.parse([
(blocks.Token.BEGIN,),
(blocks.Token.IF, blocks.Token.HERE, blocks.Token.EMPTY),
(blocks.Token.STEP, blocks.Token.DIGIT_1),
@ -82,10 +85,11 @@ def test_if_elif_elif():
(blocks.Token.STEP, blocks.Token.DIGIT_3),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
]))
]) # fmt: skip
def test_if_elif_elif_else():
pprint.pprint(parser.parse([
parser.parse([
(blocks.Token.BEGIN,),
(blocks.Token.IF, blocks.Token.HERE, blocks.Token.EMPTY),
(blocks.Token.STEP, blocks.Token.DIGIT_1),
@ -97,20 +101,22 @@ def test_if_elif_elif_else():
(blocks.Token.STEP, blocks.Token.DIGIT_4),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
]))
]) # fmt: skip
def test_empty_programme():
parser.parse([
(blocks.Token.BEGIN,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_empty_function():
parser.parse([
(blocks.Token.DEFINE_FUNCTION, blocks.Token.A),
(blocks.Token.END_FUNCTION,),
])
]) # fmt: skip
def test_empty_if():
parser.parse([
@ -118,7 +124,8 @@ def test_empty_if():
(blocks.Token.IF, blocks.Token.HERE, blocks.Token.EMPTY),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_empty_elif():
parser.parse([
@ -128,7 +135,8 @@ def test_empty_elif():
(blocks.Token.ELSE_IF, blocks.Token.HERE, blocks.Token.EMPTY),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_empty_else():
parser.parse([
@ -138,7 +146,8 @@ def test_empty_else():
(blocks.Token.ELSE,),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_empty_repeat():
parser.parse([
@ -146,7 +155,8 @@ def test_empty_repeat():
(blocks.Token.REPEAT, blocks.Token.DIGIT_1),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
def test_empty_repeat_while():
parser.parse([
@ -154,6 +164,7 @@ def test_empty_repeat_while():
(blocks.Token.REPEAT_WHILE, blocks.Token.HERE, blocks.Token.EMPTY),
(blocks.Token.END_BLOCK,),
(blocks.Token.END,),
])
]) # fmt: skip
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -17,11 +17,16 @@ from . import (
bluetooth,
)
@click.group(cls=click_default_group.DefaultGroup, default='game',
default_if_no_args=True)
@click.group(
cls=click_default_group.DefaultGroup,
default='game',
default_if_no_args=True,
)
def cli():
pass
@cli.command('game')
@click.option('--mac', required=True)
def game(mac):
@ -35,12 +40,14 @@ def game(mac):
@cli.command('debug-image')
@click.option('--blur', type=click.IntRange(min=1), default=1)
@click.argument('inpath', type=click.Path(file_okay=True, dir_okay=False))
@click.argument('outpath', type=click.Path(file_okay=True, dir_okay=False))
def debug_image(inpath, outpath):
def debug_image(blur, inpath, outpath):
detector = _detector.Detector()
frame = cv2.imread(inpath)
#frame = cv2.blur(frame, (5, 5))
if blur > 1:
frame = cv2.blur(frame, (blur, blur))
detections = list(detector.detect_markers(frame))
analyser = _detector.Analyser(detections)
try:
@ -49,34 +56,6 @@ def debug_image(inpath, outpath):
finally:
cv2.imwrite(outpath, frame)
@cli.command('debug-video')
def debug_video():
vc = cv2.VideoCapture(0)
detector = _detector.Detector()
try:
while True:
ret, frame = vc.read()
if not ret:
click.echo('frame dropped', err=True)
continue
flipped = cv2.flip(frame, 1)
for detection in detector.detect_markers(frame):
detection.debug_draw_flipped(flipped)
cv2.imshow('tuxgo-debug', flipped)
key = cv2.waitKey(1)
if key & 0xff == ord('q'):
break
elif key & 0xff == ord('f'):
cv2.imwrite('screenshot.png', flipped)
finally:
vc.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
cli()

View file

@ -10,22 +10,26 @@ from dataclasses import dataclass
from typing import (
Callable,
List,
Optional,
Tuple,
Union,
)
class Direction(enum.Enum):
# fmt: off
HERE = 'TUTAJ'
IN_FRONT = 'Z PRZODU'
ON_THE_LEFT = 'Z LEWEJ'
ON_THE_RIGHT = 'Z PRAWEJ'
BEHIND = 'Z TYŁU'
# fmt: on
def __repr__(self):
return f'{type(self).__name__}.{self.name}'
class Object(enum.Enum):
# fmt: off
OBSTACLE = 'PRZESZKODA'
ELEVATED = 'WZNIESIENIE'
MARK_1 = 'KOPERTA: KWADRAT'
@ -33,23 +37,28 @@ class Object(enum.Enum):
COLLECTABLE = 'PRZEDMIOT DO ZEBRANIA'
ACTION = 'POLE AKCJI'
EMPTY = 'PUSTE POLE'
# fmt: on
def __repr__(self):
return f'{type(self).__name__}.{self.name}'
@dataclass(frozen=True)
class Parsed(metaclass=abc.ABCMeta):
line: int = dataclasses.field(repr=False)
cols: Tuple[int, int] = dataclasses.field(repr=False)
class Expression(Parsed):
@abstractmethod
def eval(self, engine) -> int:
raise NotImplementedError()
class Forever(Parsed):
pass
@dataclass(frozen=True)
class Number(Expression):
value: int
@ -57,6 +66,7 @@ class Number(Expression):
def eval(self, engine):
return engine.eval_Number(self)
@dataclass(frozen=True)
class Variable(Expression):
name: str
@ -64,6 +74,7 @@ class Variable(Expression):
def eval(self, engine):
return engine.eval_Variable(self)
@dataclass(frozen=True)
class BinOp(Expression):
op: Callable[[int, int], int]
@ -73,6 +84,7 @@ class BinOp(Expression):
def eval(self, engine):
return engine.eval_BinOp(self)
@dataclass(frozen=True)
class Condition(Parsed):
direction: Direction
@ -81,10 +93,12 @@ class Condition(Parsed):
def eval(self, engine):
return engine.eval_Condition(self)
@dataclass(frozen=True)
class Statement(Parsed):
#: Range of lines. For simple statement this is (self.line, self.line).
lines: Tuple[int, int] = dataclasses.field(init=False, repr=False)
def __post_init__(self):
# in case a child class overrode this attribute, we don't overwrite
try:
@ -92,36 +106,43 @@ class Statement(Parsed):
except AttributeError:
# we're dataclass(frozen=True), so we need to resort to tricks
object.__setattr__(self, 'lines', (self.line, self.line))
@abstractmethod
def execute(self, engine: 'BaseEngine'):
raise NotImplementedError()
class TurnLeft(Statement):
def execute(self, engine):
return engine.execute_TurnLeft(self)
class TurnRight(Statement):
def execute(self, engine):
return engine.execute_TurnRight(self)
class Jump(Statement):
def execute(self, engine):
return engine.execute_Jump(self)
class PickUp(Statement):
def execute(self, engine):
return engine.execute_PickUp(self)
class Activate(Statement):
def execute(self, engine):
return engine.execute_Activate(self)
@dataclass(frozen=True)
class Draw(Statement):
def execute(self, engine):
return engine.execute_Draw(self)
@dataclass(frozen=True)
class Step(Statement):
counter: Expression
@ -129,6 +150,7 @@ class Step(Statement):
def execute(self, engine):
return engine.execute_Step(self)
@dataclass(frozen=True)
class Place(Statement):
direction: Direction
@ -136,6 +158,7 @@ class Place(Statement):
def execute(self, engine):
return engine.execute_Place(self)
@dataclass(frozen=True)
class Assign(Statement):
name: str
@ -144,6 +167,7 @@ class Assign(Statement):
def execute(self, engine):
return engine.execute_Assign(self)
@dataclass(frozen=True)
class CallFunction(Statement):
name: str
@ -151,18 +175,22 @@ class CallFunction(Statement):
def execute(self, engine):
return engine.execute_CallFunction(self)
@dataclass(frozen=True)
class Break(Statement):
def execute(self, engine):
return engine.execute_Break(self)
@dataclass(frozen=True)
class BreakFunction(Statement):
def execute(self, engine):
return engine.execute_BreakFunction(self)
@dataclass(frozen=True)
class Block(Statement):
# pylint: disable=abstract-method
lines: Tuple[int, int] = dataclasses.field(repr=False)
body: List[Statement]
body_lines: Tuple[int, int] = dataclasses.field(init=False, repr=False)
@ -171,46 +199,63 @@ class Block(Statement):
try:
self.body_lines
except AttributeError:
object.__setattr__(self, 'body_lines', (
min(stmt.lines[0] for stmt in self.body),
max(stmt.lines[1] for stmt in self.body),
) if self.body else (None, None))
object.__setattr__(
self,
'body_lines',
(
min(stmt.lines[0] for stmt in self.body),
max(stmt.lines[1] for stmt in self.body),
)
if self.body
else (None, None),
)
@dataclass(frozen=True)
class Repeat(Block):
counter: Union[Expression, Forever]
def execute(self, engine):
return engine.execute_Repeat(self)
return engine.execute_Repeat(self)
@dataclass(frozen=True)
class ConditionStatement(Block):
# pylint: disable=abstract-method
condition: Condition
@dataclass(frozen=True)
class RepeatWhile(ConditionStatement):
def execute(self, engine):
return engine.execute_RepeatWhile(self)
class Else(Block):
def execute(self, engine):
return engine.execute_Else(self)
@dataclass(frozen=True)
class If(ConditionStatement):
orelse: Union['If', Else]
def execute(self, engine):
return engine.execute_If(self)
@dataclass(frozen=True)
class Programme(Block):
def execute(self, engine):
return engine.execute_Programme(self)
@dataclass(frozen=True)
class Function(Block):
name: str
def execute(self, engine):
return engine.execute_Function(self)
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -11,6 +11,7 @@ import pygame.surfarray
from . import assets
class Badge:
def __init__(self, qrcode, image, image_menu=None):
self.qrcode = qrcode
@ -19,7 +20,9 @@ class Badge:
@classmethod
def load_from_assets(cls):
data = tomllib.load(open(assets / 'badges/badges.toml', 'rb'))
with open(assets / 'badges/badges.toml', 'rb') as file:
data = tomllib.load(file)
url_template = data['url_template']
for badge in data['badge']:
@ -29,7 +32,8 @@ class Badge:
url = url_template.format(token=badge.pop('token'))
qrcode = pygame.surfarray.make_surface(
cv2.QRCodeEncoder.create().encode(url))
cv2.QRCodeEncoder.create().encode(url)
)
image_path = assets / 'badges' / badge.pop('image')
image = pygame.image.load(image_path)
@ -40,11 +44,9 @@ class Badge:
image_menu = None
else:
image_menu = pygame.image.load(image_menu_path)
assert not badge
yield cls(qrcode, image, image_menu)
if __name__ == '__main__':
main()
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -44,7 +44,9 @@ class TokenDataMixIn:
text: str
type: Optional[TokenType] = dataclasses.field(default=None)
class Token(TokenDataMixIn, enum.Enum):
# fmt: off
BEGIN = 'START', TokenType.BEGIN
END = 'KONIEC', TokenType.END
@ -114,17 +116,23 @@ class Token(TokenDataMixIn, enum.Enum):
DIGIT_9 = '9', TokenType.DIGIT
NEWLINE = None, TokenType.NEWLINE
# fmt: on
def __repr__(self):
return f'{type(self).__name__}.{self.name}'
class Block(typing.NamedTuple):
aruco_id: int
id: int
token: int
def __str__(self):
return f'{self.id:03d} {self.token}' if self.id is not None else f'--- {self.token}'
return (
f'{self.id:03d} {self.token}'
if self.id is not None
else f'--- {self.token}'
)
class BlockLoader:
@ -148,4 +156,5 @@ class BlockLoader:
def get_block_by_id(self, key):
return self._by_id[key]
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -5,7 +5,6 @@
import asyncio
import enum
import sys
import threading
import bleak
@ -19,12 +18,15 @@ from . import (
NORDIC_UART_RX = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
NORDIC_UART_TX = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
class Command(bytes, enum.Enum):
# fmt: off
TURN_LEFT = b'H'
BACKWARD = b'J'
FORWARD = b'K'
TURN_RIGHT = b'L'
SONAR = b'S'
# fmt: on
class BluetoothBot:
@ -47,21 +49,23 @@ class BluetoothBot:
def disconnect(self):
assert self.loop is not None
asyncio.run_coroutine_threadsafe(
self.queue.put((None, None)), self.loop)
self.queue.put((None, None)), self.loop
)
self.thread.join()
def execute_programme(self, programme):
logger.debug(f'{type(self).__name__}.execute_programme()')
assert self.loop is not None
assert self.current_task is None
self.current_task = asyncio.run_coroutine_threadsafe(self._execute(programme), self.loop)
self.current_task = asyncio.run_coroutine_threadsafe(
self._execute(programme), self.loop
)
def stop_programme(self):
assert self.loop is not None
assert self.current_task is not None
self.current_task.cancel()
async def _execute(self, programme):
logger.debug(f'{type(self).__name__}._execute()')
try:
@ -69,7 +73,6 @@ class BluetoothBot:
finally:
self.current_task = None
async def rpc(self, command):
logger.debug(f'{type(self).__name__}.rpc({command=})')
future = self.loop.create_future()
@ -78,16 +81,19 @@ class BluetoothBot:
async def turn_left(self):
await self.rpc(Command.TURN_LEFT)
async def backward(self):
await self.rpc(Command.BACKWARD)
async def forward(self):
await self.rpc(Command.FORWARD)
async def turn_right(self):
await self.rpc(Command.TURN_RIGHT)
async def sonar(self):
return (await self.rpc(Command.SONAR))[0]
def rpc_threadsafe(self, command, *, timeout=None):
assert self.loop is not None
future = asyncio.run_coroutine_threadsafe(self.rpc(command), self.loop)
@ -95,39 +101,49 @@ class BluetoothBot:
def turn_left_threadsafe(self):
self.rpc_threadsafe(Command.TURN_LEFT)
def backward_threadsafe(self):
self.rpc_threadsafe(Command.BACKWARD)
def forward_threadsafe(self):
self.rpc_threadsafe(Command.FORWARD)
def turn_right_threadsafe(self):
self.rpc_threadsafe(Command.TURN_RIGHT)
def sonar_threadsafe(self):
return self.rpc_threadsafe(Command.SONAR)[0]
def rpc_nowait(self, command):
asyncio.run_coroutine_threadsafe(self.rpc(command), self.loop)
def turn_left_nowait(self):
self.rpc_nowait(Command.TURN_LEFT)
def backward_nowait(self):
self.rpc_nowait(Command.BACKWARD)
def forward_nowait(self):
self.rpc_nowait(Command.FORWARD)
def turn_right_nowait(self):
self.rpc_nowait(Command.TURN_RIGHT)
def sonar_nowait(self):
self.rpc_nowait(Command.SONAR)
async def _main(self):
self.loop = asyncio.get_running_loop()
self.last_range = None
try:
async with bleak.BleakClient(self.mac) as client:
nordic_uart_tx = client.services.get_characteristic(NORDIC_UART_TX)
nordic_uart_rx = client.services.get_characteristic(NORDIC_UART_RX)
nordic_uart_tx = client.services.get_characteristic(
NORDIC_UART_TX
)
nordic_uart_rx = client.services.get_characteristic(
NORDIC_UART_RX
)
self.connected = True
self.queue = asyncio.Queue(1)
@ -141,7 +157,9 @@ class BluetoothBot:
logger.debug(f'handle_tx(..., {data=})')
future.set_result(data)
if command is Command.SONAR:
self.last_range = data[0] if data[0] > 0 else float('inf')
self.last_range = (
data[0] if data[0] > 0 else float('inf')
)
await client.start_notify(nordic_uart_tx, handle_tx)
@ -161,3 +179,5 @@ class BluetoothBot:
self.connected = False
self.loop = None
# vim: tw=80 ts=4 sts=4 sw=4 et

View file

@ -1,23 +1,17 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-FileCopyrightText: 2023 Wojtek Porczyk <woju@hackerspace.pl>
import sys
import time
import typing
import unicodedata
import cv2
import cv2.aruco
import cv2.aruco # pylint: disable=import-error
import matplotlib.path
import matplotlib.transforms
import numpy as np
from loguru import logger
from . import blocks
from . import (
ast,
blocks,
)
class Affine2DWithAxisRotation(matplotlib.transforms.Affine2D):
def rotate_to_axis(self, axis_unit_vector):
@ -25,7 +19,7 @@ class Affine2DWithAxisRotation(matplotlib.transforms.Affine2D):
rotate_mtx = np.array([
[-vy, vx, 0],
[-vx, -vy, 0],
[ 0, 0, 1]], float)
[ 0, 0, 1]], float) # fmt: skip
self._mtx = rotate_mtx @ self._mtx
self.invalidate()
return self
@ -35,17 +29,23 @@ class Affine2DWithAxisRotation(matplotlib.transforms.Affine2D):
rotate_mtx = np.array([
[-vy, -vx, 0],
[ vx, -vy, 0],
[ 0, 0, 1]], float)
[ 0, 0, 1]], float) # fmt: skip
self._mtx = rotate_mtx @ self._mtx
self.invalidate()
return self
def draw_label(img, text, org, fontFace, fontScale, color_bg, color_fg, thickness):
def draw_label(
img, text, org, fontFace, fontScale, color_bg, color_fg, thickness
):
if img is None:
return None
text = text.replace('Ł', 'L').replace('ł', 'l')
text = ''.join(c for c in unicodedata.normalize('NFKD', text)
if unicodedata.category(c)[0] != 'M')
text = ''.join(
c
for c in unicodedata.normalize('NFKD', text)
if unicodedata.category(c)[0] != 'M'
)
textsize, baseline = cv2.getTextSize(text, fontFace, fontScale, thickness)
cv2.rectangle(
img,
@ -57,26 +57,34 @@ def draw_label(img, text, org, fontFace, fontScale, color_bg, color_fg, thicknes
cv2.putText(img, text, org, fontFace, fontScale, color_fg, thickness)
return img
def draw_path(img, path, color, thickness):
if img is None:
return None
cv2.polylines(
img, np.array(path.to_polygons()).astype(int), True, color, thickness)
img, np.array(path.to_polygons()).astype(int), True, color, thickness
)
return img
class Detection(typing.NamedTuple):
aruco_id: int
corners: np.ndarray
block: blocks.Block
def _debug_draw(self, frame, corners, color_bg, color_fg):
def debug_draw(self, frame, color_bg=(0, 0, 0), color_fg=(255, 255, 255)):
if frame is None:
return
cv2.polylines(frame, corners.reshape((1,4,2)).astype(int), True,
color_bg, 2)
cv2.polylines(
frame,
self.corners.reshape((1, 4, 2)).astype(int),
True,
color_bg,
2,
)
top_left, *_ = corners
top_left, *_ = self.corners
cv2.rectangle(
frame,
top_left - [5, 5],
@ -95,36 +103,24 @@ class Detection(typing.NamedTuple):
2,
)
def debug_draw(self, frame, color_bg=(0, 0, 0), color_fg=(255, 255, 255)):
return self._debug_draw(frame, self.corners, color_bg, color_fg)
def debug_draw_flipped(self, frame, color_bg=(0, 0, 0), color_fg=(255, 255, 255)):
_, width, _ = frame.shape
return self._debug_draw(frame, np.array([
(width - x, y) for x, y in self.corners
]))
def get_next_detect_areas(
self, base_transform, axis_v, axis_h, *, debug_frame=None,
):
def get_next_detect_areas(self, base_transform, axis_v, axis_h):
top_left, _, _, bottom_left = self.corners
marker_height = np.linalg.norm(top_left - bottom_left)
corners_path = matplotlib.path.Path(self.corners)
transform = (
matplotlib.transforms.Affine2D()
.translate(*-top_left)
matplotlib.transforms.Affine2D().translate(*-top_left)
+ base_transform
+ matplotlib.transforms.Affine2D()
.translate(*top_left)
.translate(*axis_v * marker_height * -0.6)
.translate(*top_left)
.translate(*axis_v * marker_height * -0.6)
)
transform_v = (transform + matplotlib.transforms.Affine2D()
.translate(*axis_v * marker_height * 2.2)
.translate(*axis_h * marker_height * -1))
transform_v = transform + matplotlib.transforms.Affine2D().translate(
*axis_v * marker_height * 2.2
).translate(*axis_h * marker_height * -1)
transform_h = (transform + matplotlib.transforms.Affine2D()
.translate(*axis_h * marker_height * 4.5)
transform_h = transform + matplotlib.transforms.Affine2D().translate(
*axis_h * marker_height * 4.5
)
next_area_v = transform_v.transform_path(corners_path)
@ -137,13 +133,11 @@ class Detector:
def __init__(self, blur=1):
self.blocks = blocks.BlockLoader()
self.dictionary = cv2.aruco.getPredefinedDictionary(
cv2.aruco.DICT_ARUCO_ORIGINAL)
cv2.aruco.DICT_ARUCO_ORIGINAL
)
self.blur = blur
# self.parameters = cv2.aruco.DetectorParameters_create()
# self.parameters.detectInvertedMarker = True
def detect_markers(self, frame):
start_time = time.perf_counter()
frame = 255 - frame
if self.blur > 1:
frame = cv2.blur(frame, (self.blur, self.blur))
@ -151,11 +145,8 @@ class Detector:
corners, ids, _rejected = cv2.aruco.detectMarkers(
image=frame,
dictionary=self.dictionary,
# parameters=self.parameters,
)
# logger.debug(f'{rejected=}')
if ids is not None:
for c, aruco_id in zip(corners, ids.flatten()):
try:
@ -164,12 +155,11 @@ class Detector:
block = None
yield Detection(aruco_id, c.reshape((4, 2)).astype(int), block)
perf_ms = (time.perf_counter() - start_time) * 1000
# logger.debug(f'detect_markers() took {perf_ms:.3f} ms')
class BoardError(Exception):
pass
class Analyser:
def __init__(self, detections):
self.detections = list(detections)
@ -179,12 +169,11 @@ class Analyser:
end = None
is_func = None
for i in range(len(self.detections)-1, -1, -1):
for i in range(len(self.detections) - 1, -1, -1):
block = self.detections[i].block
if block is None:
continue
token = block.token
# logger.debug(f'{token=} {start=} {end=}')
if token.type in (blocks.TokenType.BEGIN, blocks.TokenType.END):
if token.type is blocks.TokenType.BEGIN:
if start is not None:
@ -194,8 +183,10 @@ class Analyser:
if end is not None:
raise BoardError('double end tile')
end = self.detections[i]
current_is_func = token in (blocks.Token.DEFINE_FUNCTION,
blocks.Token.END_FUNCTION)
current_is_func = token in (
blocks.Token.DEFINE_FUNCTION,
blocks.Token.END_FUNCTION,
)
if is_func is not None and current_is_func != is_func:
raise BoardError('mismatched start and end')
is_func = current_is_func
@ -224,7 +215,7 @@ class Analyser:
"""
candidate = None
found_multiple = False
for i in range(len(self.detections)-1, -1, -1):
for i in range(len(self.detections) - 1, -1, -1):
i_top_left, *_ = self.detections[i].corners
if area.contains_point(i_top_left):
if found_multiple or candidate is not None:
@ -245,21 +236,27 @@ class Analyser:
return candidate
def get_line(
self, detection, area_h, base_transform, axis_v, axis_h, *,
debug_frame=None
self,
detection,
area_h,
base_transform,
axis_v,
axis_h,
*,
debug_frame=None,
):
while True:
yield detection.block.token
detection = self.find_detection_in_area(area_h,
debug_frame=debug_frame)
detection = self.find_detection_in_area(
area_h, debug_frame=debug_frame
)
if detection is None:
break
_, area_h = detection.get_next_detect_areas(
base_transform, axis_v, axis_h, debug_frame=debug_frame)
base_transform, axis_v, axis_h
)
def get_programme(self, *, debug_frame=None):
start_time = time.perf_counter()
# 1. find start and end tiles
start, end = self.find_start_and_end()
@ -268,8 +265,9 @@ class Analyser:
end_top_left, *_ = end.corners
if debug_frame is not None:
# cv2.line(debug_frame, start_top_left, (0, 0), (255, 255, 0), 2)
cv2.line(debug_frame, start_top_left, end_top_left, (255, 255, 0), 2)
cv2.line(
debug_frame, start_top_left, end_top_left, (255, 255, 0), 2
)
axis = end_top_left - start_top_left
axis_v = axis / np.linalg.norm(axis)
@ -278,7 +276,8 @@ class Analyser:
# 3. calcucate transforms for rotating and scaling detection area
# (will be applied to each marker outline around its top_left)
# (will be used for both horizontal and vertical areas)
base_transform = (Affine2DWithAxisRotation()
base_transform = (
Affine2DWithAxisRotation()
.rotate_to_axis(axis_v)
# 4 == difference between narrowest and widest tile + 1
.scale(4, 1.2)
@ -290,9 +289,18 @@ class Analyser:
current = start
while True:
detect_area_v, detect_area_h = current.get_next_detect_areas(
base_transform, axis_v, axis_h, debug_frame=debug_frame)
yield tuple(self.get_line(current, detect_area_h,
base_transform, axis_v, axis_h, debug_frame=debug_frame))
base_transform, axis_v, axis_h
)
yield tuple(
self.get_line(
current,
detect_area_h,
base_transform,
axis_v,
axis_h,
debug_frame=debug_frame,
)
)
if detect_area_v.contains_point(end_top_left):
yield (end.block.token,)
@ -300,8 +308,9 @@ class Analyser:
end.debug_draw(debug_frame)
break
current = self.find_detection_in_area(detect_area_v,
debug_frame=debug_frame)
current = self.find_detection_in_area(
detect_area_v, debug_frame=debug_frame
)
if current is None:
raise BoardError('no candidate in detection area')
@ -309,10 +318,9 @@ class Analyser:
finally:
if debug_frame is not None:
for detection in self.detections:
detection.debug_draw(debug_frame, color_bg=(0, 255, 255),
color_fg=(0, 0, 0))
detection.debug_draw(
debug_frame, color_bg=(0, 255, 255), color_fg=(0, 0, 0)
)
perf_ms = (time.perf_counter() - start_time) * 1000
# logger.debug(f'get_programme() took {perf_ms:.3f} ms')
# vim: tw=80 ts=4 sts=4 sw=4 et