tuxgo/tuxgo/engine.py

486 lines
14 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-FileCopyrightText: 2023 Wojtek Porczyk <woju@hackerspace.pl>
"""
Engine might be either a visitor or a virtual machine
"""
import abc
import asyncio
import itertools
import types
from abc import abstractmethod
from typing import (
Optional,
)
from loguru import logger
from . import (
ast,
bluetooth,
)
class BaseEngine(metaclass=abc.ABCMeta):
def __init__(self, functions=types.MappingProxyType({})):
# pylint: disable=redefined-outer-name
self.functions = {**functions}
self.variables = {}
#
# The following methods need to be overloaded in the engine implementation:
#
@abstractmethod
def eval_Number(self, number: ast.Number):
raise NotImplementedError()
@abstractmethod
def eval_Variable(self, variable: ast.Variable):
raise NotImplementedError()
@abstractmethod
def eval_BinOp(self, binop: ast.BinOp):
raise NotImplementedError()
@abstractmethod
def eval_Condition(self, condition: ast.Condition):
raise NotImplementedError()
@abstractmethod
def execute_Repeat(self, repeat: ast.Repeat):
raise NotImplementedError()
@abstractmethod
def execute_RepeatWhile(self, repeat_while: ast.RepeatWhile):
raise NotImplementedError()
@abstractmethod
def execute_Step(self, step: ast.Step):
raise NotImplementedError()
@abstractmethod
def execute_TurnLeft(self, turn_left: ast.TurnLeft):
raise NotImplementedError()
@abstractmethod
def execute_TurnRight(self, turn_right: ast.TurnRight):
raise NotImplementedError()
@abstractmethod
def execute_Jump(self, jump: ast.Jump):
raise NotImplementedError()
@abstractmethod
def execute_PickUp(self, pick_up: ast.PickUp):
raise NotImplementedError()
@abstractmethod
def execute_Activate(self, activate: ast.Activate):
raise NotImplementedError()
@abstractmethod
def execute_Draw(self, draw: ast.Draw):
raise NotImplementedError()
@abstractmethod
def execute_Place(self, place: ast.Place):
raise NotImplementedError()
@abstractmethod
def execute_CallFunction(self, call_function: ast.CallFunction):
raise NotImplementedError()
@abstractmethod
def execute_If(self, if_: ast.If):
raise NotImplementedError()
@abstractmethod
def execute_Else(self, else_: ast.Else):
raise NotImplementedError()
@abstractmethod
def execute_Programme(self, programme: ast.Programme):
raise NotImplementedError()
# original ScottieGo forbids redefining variables, which are more like constants
class _ConstDict(dict):
def __setitem__(self, key, value):
if key in self:
raise TypeError(f'cannot redefine variable {key}')
super().__setitem__(key, value)
class ExecEngine(BaseEngine):
class Break(BaseException):
pass
class BreakFunction(BaseException):
pass
def __init__(self, *args, allow_variable_reassignment=True, **kwds):
super().__init__(*args, **kwds)
self.variables = {} if allow_variable_reassignment else _ConstDict()
self.functions = {}
self.main: Optional[ast.Programme] = None
def eval_Number(self, number):
return number.value
def eval_Variable(self, variable):
try:
return self.variables[variable.name]
except KeyError as exc:
raise NameError(
f'variable {variable.name} referenced before assignment '
f'at line {variable.line} block {variable.cols[0]}'
) from exc
def eval_BinOp(self, binop):
return binop.op(binop.left.eval(self), binop.right.eval(self))
def eval_Condition(self, condition):
return self.detect(condition.direction, condition.object)
def execute_block(self, block):
# TODO bracket
for stmt in block:
stmt.execute(self)
def execute_Programme(self, programme):
self.execute_block(programme.body)
def execute_Repeat(self, repeat):
if isinstance(repeat.counter, ast.Forever):
counter = itertools.count(start=1)
else:
counter = range(1, repeat.counter.eval(self) + 1)
for _i in counter:
try:
self.execute_block(repeat.body)
except self.Break:
break
def execute_RepeatWhile(self, repeat_while):
while repeat_while.condition.eval(self):
try:
self.execute_block(repeat_while.body)
except self.Break:
break
def execute_If(self, if_):
# TODO this will need to be fixed using iteration over linked list (not
# recursion) because of the markers
if if_.condition.eval(self):
self.execute_block(if_.body)
elif if_.orelse is not None:
if_.orelse.execute(self)
def execute_Else(self, else_):
self.execute_block(else_.body)
def execute_CallFunction(self, call_function):
try:
function = self.functions[call_function.name]
except KeyError as exc:
raise NameError(
f'function {call_function.name} undefined '
f'at line {call_function.line} block {call_function.cols[0]}'
) from exc
function.execute(self)
def execute_Assign(self, assign):
self.variables[assign.name] = assign.expression.eval(self)
def execute_Place(self, place):
return self.place(place.direction)
def execute_Step(self, step):
return self.step(step.counter.eval(self))
def execute_Activate(self, activate):
return self.activate()
def execute_Draw(self, draw):
return self.draw()
def execute_Jump(self, jump):
return self.jump()
def execute_PickUp(self, pick_up):
return self.pick_up()
def execute_TurnLeft(self, turn_left):
return self.turn_left()
def execute_TurnRight(self, turn_right):
return self.turn_right()
def execute_Break(self, break_):
raise self.Break()
def execute_BreakFunction(self, break_function):
raise self.BreakFunction()
#
# Abstract methods to be defined by game implementation
# (e.g., pygame or physical robot)
#
@abstractmethod
def step(self, counter: int):
raise NotImplementedError()
@abstractmethod
def turn_left(self):
raise NotImplementedError()
@abstractmethod
def turn_right(self):
raise NotImplementedError()
@abstractmethod
def jump(self):
raise NotImplementedError()
@abstractmethod
def pick_up(self):
raise NotImplementedError()
@abstractmethod
def place(self, direction: ast.Direction):
raise NotImplementedError()
@abstractmethod
def activate(self):
raise NotImplementedError()
@abstractmethod
def draw(self):
raise NotImplementedError()
@abstractmethod
def detect(self, direction: ast.Direction, obj: ast.Object):
raise NotImplementedError()
def noop(self):
pass
class AsyncEngine(ExecEngine):
# pylint: disable=invalid-overridden-method
async def eval_Number(self, number):
return number.value
async def eval_Variable(self, variable):
try:
return self.variables[variable.name]
except KeyError as exc:
raise NameError(
f'variable {variable.name} referenced before assignment '
f'at line {variable.line} block {variable.cols[0]}'
) from exc
async def eval_BinOp(self, binop):
return binop.op(
await binop.left.eval(self),
await binop.right.eval(self),
)
async def eval_Condition(self, condition):
logger.debug(f'{type(self).__name__}.eval_Condition()')
return await self.detect(condition.direction, condition.object)
async def execute_block(self, block):
logger.debug(f'{type(self).__name__}.execute_block()')
# TODO bracket
for stmt in block:
await stmt.execute(self)
async def execute_Programme(self, programme):
logger.debug(f'{type(self).__name__}.execute_Programme()')
await self.execute_block(programme.body)
async def execute_Repeat(self, repeat):
logger.debug(f'{type(self).__name__}.execute_Repeat()')
if isinstance(repeat.counter, ast.Forever):
counter = itertools.count(start=1)
else:
counter = range(1, await repeat.counter.eval(self) + 1)
for _i in counter:
try:
await self.execute_block(repeat.body)
except self.Break:
break
async def execute_RepeatWhile(self, repeat_while):
logger.debug(f'{type(self).__name__}.execute_RepeatWhile()')
while await repeat_while.condition.eval(self):
try:
await self.execute_block(repeat_while.body)
except self.Break:
break
async def execute_If(self, if_):
# TODO this will need to be fixed using iteration over linked list (not
# recursion) because of the markers
if await if_.condition.eval(self):
await self.execute_block(if_.body)
elif if_.orelse is not None:
await if_.orelse.execute(self)
async def execute_Else(self, else_):
await self.execute_block(else_.body)
async def execute_CallFunction(self, call_function):
try:
function = self.functions[call_function.name]
except KeyError as exc:
raise NameError(
f'function {call_function.name} undefined '
f'at line {call_function.line} block {call_function.cols[0]}'
) from exc
await function.execute(self)
async def execute_Assign(self, assign):
self.variables[assign.name] = await assign.expression.eval(self)
async def execute_Place(self, place):
return await self.place(place.direction)
async def execute_Step(self, step):
return await self.step(await step.counter.eval(self))
async def execute_Activate(self, activate):
return await self.activate()
async def execute_Draw(self, draw):
return await self.draw()
async def execute_Jump(self, jump):
return await self.jump()
async def execute_PickUp(self, pick_up):
return await self.pick_up()
async def execute_TurnLeft(self, turn_left):
return await self.turn_left()
async def execute_TurnRight(self, turn_right):
return await self.turn_right()
async def execute_Break(self, break_):
raise self.Break()
async def execute_BreakFunction(self, break_function):
raise self.BreakFunction()
#
# Abstract methods to be defined by game implementation
# (e.g., pygame or physical robot)
#
@abstractmethod
async def step(self, counter: int):
raise NotImplementedError()
@abstractmethod
async def turn_left(self):
raise NotImplementedError()
@abstractmethod
async def turn_right(self):
raise NotImplementedError()
@abstractmethod
async def jump(self):
raise NotImplementedError()
@abstractmethod
async def pick_up(self):
raise NotImplementedError()
@abstractmethod
async def place(self, direction: ast.Direction):
raise NotImplementedError()
@abstractmethod
async def activate(self):
raise NotImplementedError()
@abstractmethod
async def draw(self):
raise NotImplementedError()
@abstractmethod
async def detect(self, direction: ast.Direction, obj: ast.Object):
raise NotImplementedError()
def noop(self):
pass
class RemoteControlEngine(AsyncEngine):
def __init__(self, *args, bot, **kwds):
super().__init__(*args, **kwds)
self.bot = bot
async def step(self, counter: int):
logger.debug(f'{type(self).__name__}.step({counter=})')
r = range(abs(counter))
logger.debug(f' {r=}')
for _ in r:
command = (
bluetooth.Command.FORWARD
if counter > 0
else bluetooth.Command.BACKWARD
)
logger.debug(f' {command=}')
await asyncio.sleep(0.1)
await self.bot.rpc(command)
logger.debug(f'{type(self).__name__}.step() return')
async def turn_left(self):
await self.bot.rpc(bluetooth.Command.TURN_LEFT)
async def turn_right(self):
await self.bot.rpc(bluetooth.Command.TURN_RIGHT)
async def jump(self):
raise NotImplementedError()
async def pick_up(self):
raise NotImplementedError()
async def place(self, direction: ast.Direction):
raise NotImplementedError()
async def activate(self):
raise NotImplementedError()
async def draw(self):
raise NotImplementedError()
async def detect(self, direction: ast.Direction, obj: ast.Object):
logger.debug(f'{type(self).__name__}.detect({direction=}, {obj=})')
if direction is not ast.Direction.IN_FRONT:
raise NotImplementedError()
if obj not in (ast.Object.OBSTACLE, ast.Object.EMPTY):
raise NotImplementedError()
sonar_range = await self.bot.sonar()
logger.debug(f'{type(self).__name__}.detect {sonar_range=}')
result = (sonar_range < 10) ^ (obj is ast.Object.EMPTY)
logger.debug(f'{type(self).__name__}.detect -> {result}')
return result
# vim: tw=80 ts=4 sts=4 sw=4 et