Architecture planning demo.

No external dependencies beyond Python are required. To run the demo:

python src/arch/demo.py

Demo breakdown

In this file, you should first see:

from planner import *
from realtime import *
from domains.simplenav import *

from robots.demo import MyRobotInterface

These imports pull in the planning and execution systems, followed by the planning domain. The robot interface then supplies the actions and belief-update functions that tie planning and execution together.

The domain itself contains information about states in the world and how they interact. These are standard first-order logic states, and can be evaluated on their own or as increasingly complex combinations of predicates:

assert (at_A).eval_given(StateSet(at_A))
assert (at_A).eval_given(StateSet([at_A]))
assert (at_B).eval_given(StateSet(at_B))
assert (at_A | at_B).eval_given(StateSet(at_B))
assert (at_A | at_B).eval_given(StateSet([at_A, at_B]))
assert (at_A & at_B).eval_given(StateSet([at_A, at_B]))
assert (at_A & at_B).eval_given([at_A, at_B]) # type:ignore
assert not ((at_B).eval_given(StateSet(at_A)))
assert not ((at_A & at_B).eval_given(StateSet(at_B)))

We then set up the various databases relevant to the problem.

actiondb = ActionDatabase()
actiondb.register_class(MyRobotInterface)

beliefdb = BeliefChecker()
beliefdb.register_class(MyRobotInterface)

assessment_db = AssessmentDatabase()
assessment_db.add_assessor(TimeAssessment)
assessment_db.add_assessor(ResourceAssessment, battery)

The action database contains the actions the robot is able to make use of. The belief database contains functions which can be called to update the robot's understanding of the world. The assessment database runs in real time to track expected continuous operation: for example, how much battery draw we expect to occur over time. If these values fall outside of expectation, we can identify that a failure scenario is occurring and address it.
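As a rough illustration of the idea (this helper is not the library's API; its name and tolerance are assumptions), the check a resource assessor performs might conceptually look like:

def battery_draw_ok(expected_draw, observed_draw, tolerance=2.0):
    # Hypothetical: flag a failure scenario when the observed battery
    # draw differs from the expected draw by more than the tolerance.
    return abs(observed_draw - expected_draw) <= tolerance

assert battery_draw_ok(expected_draw=5.0, observed_draw=6.1)
assert not battery_draw_ok(expected_draw=5.0, observed_draw=9.0)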

We then hand all of these over to a behavior manager. When provided with a goal, it will execute behaviors which work towards solving it.

bm = BehaviorManager(
    action_db=actiondb,
    belief_db=beliefdb,
    assessment_db=assessment_db,
    verbose=False
)

bm.set_goal([package_at_A])

This configuration allows us to find and run the viable plan:

plan = bm.get_plan((MyRobotInterface, ["A", False]))
if plan is not None:
    for p in plan:
        print(p)
    bm.run_plan(plan)

Domains

The domain is described in Python, which is convenient, reduces the number of new things to learn, and allows us to easily build on top of it. The simplenav demo contains:

new_domain("simpleNav")

at_A = State()
at_B = State()
at_C = State()
at_D = State()
at_E = State()
holding_package = State()
package_at_A = State()
door_open = State()
never = State()

battery = Resource(min=0, max=100)

goto_A = Transition(
    requires(at_B | at_C),
    adds(at_A),
    removes(at_B, at_C),
    battery.lose(5)
)

[... remainder of file omitted for brevity ...]

Note the creation of states, which are used in the planning description process. Note also the "battery" resource: it is not part of the logical state the planner reasons over, but we declare here that a plan is only considered valid if the battery level stays within the given range throughout. We could also define "time" as a resource in the same way.
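For example, a time budget could be declared alongside the battery. A minimal sketch, where the name and bounds are assumed for illustration and modeled on the battery declaration above:

time = Resource(min=0, max=600)  # hypothetical: e.g. seconds available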

We then define a 'transition', which describes how a hypothetical robot behavior will interact with the environment. In the case of the goto_A transition, we see that it requires the robot to be at B or C, adds at_A while removing at_B and at_C, and drains the battery by 5.

Note that the transition is not described in terms of actual robot behavior. This is intentional: by stripping away the implementation and only considering the rules of the environment, this domain can be reused across multiple different machines.
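As a sketch of how these pieces compose, a hypothetical drop-off transition might combine predicates in its requirements, modeled on goto_A (this transition is illustrative and not necessarily part of the demo domain):

drop_package = Transition(
    requires(at_A & holding_package),  # composed predicate: at A while carrying
    adds(package_at_A),
    removes(holding_package),
    battery.lose(2)
)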

Plan execution cycle

The execution of a plan is pretty straightforward: there's a queue of upcoming behaviors, and we execute them in order. From realtime/behavior_manager.py:

    def run_plan(self, plan):
        self.behavior_queue = plan.steps
        while len(self.behavior_queue) != 0:
            next_step = self.behavior_queue.pop(0)
            self.do_step(next_step)

This queue is dynamic. From the same file, do_step proceeds as follows:

  1. We start by informing the self-assessment system that we intend to call a function, and then call it:
    def do_step(self, step):
        assert type(step) is Transition
        all_signs_say_ok = True

        if self.assessment_db:
            self.assessment_db.note_calling(step)

        self.action_db.call_action(step)
  2. Once the call returns, we note that it completed, and check that the operation of that action was as expected:

        if self.assessment_db:
            self.assessment_db.note_finishing(step)
            all_signs_say_ok = self.assessment_db.check_assessment_after_action(step)
  3. We also update the belief database, based on which things are relevant to this action and state (to ensure that this update is performant):
        if self.belief_db:
            self.belief_db.update_belief_after_action(step)
            all_signs_say_ok = (
                all_signs_say_ok and self.belief_db.check_belief_after_action(step)
            )
  4. Should that process fail, we enter a simple recovery heuristic: modify the behavior queue to solve the problem from the new state, or fail out:
        if not all_signs_say_ok:
            self.recovery()
            self.recovery_counter += 1
            if self.recovery_counter >= 3:
                print("irrecoverable failure... clearing goal queue")
                self.behavior_queue.clear()
        else:
            self.recovery_counter = 0  # reset!
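For intuition, a minimal sketch of what recovery() could do is to re-plan from the current belief state and replace the queue. The attribute names below are assumptions, and the real method may differ:

    def recovery(self):
        # Hypothetical sketch: ask the planner for a fresh plan from the
        # current state and splice it into the behavior queue.
        new_plan = self.get_plan(self.last_plan_config)  # assumed attribute
        if new_plan is not None:
            self.behavior_queue = list(new_plan.steps)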

Robot interfaces

Creating new interfaces is designed to be pretty straightforward. Take a look at robots/demo.py, and you should see:


from planner import *
from realtime import *

from domains.simplenav import *


class MyRobotInterface:
    [ ... omitted for brevity ... ]

    @RegisterActionCallback(goto_A)
    def do_goto_A(self):
        if "init" not in dir(self):
            raise Exception("Failed to initialize class!")
        self.current_loc = "A"
        print("going to A")

    [ ... omitted for brevity ... ]

This is a pretty simple mock class, but it illustrates the concept of creating new actions for the architecture. The class imports the relevant domain, and then marks that it provides a function that implements a known action. When provided to the action database, this annotation will allow the action and its implementation to be tied together.
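Adding another action follows the same pattern. Assuming the domain defines a goto_B transition analogous to goto_A, a sketch of its callback might look like:

    @RegisterActionCallback(goto_B)
    def do_goto_B(self):
        # Hypothetical counterpart to do_goto_A above.
        if "init" not in dir(self):
            raise Exception("Failed to initialize class!")
        self.current_loc = "B"
        print("going to B")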

Later on in that file, we see:

    @RegisterBeliefCallback(at_A)
    def do_check_A(self) -> bool:
        if "init" not in dir(self):
            raise Exception("Failed to initialize class!")
        return self.current_loc == "A"

We now use the 'RegisterBeliefCallback' annotation to register a new belief callback. When the belief database needs to update its belief about this state, it will call this function. These updates are scheduled intelligently: rather than being run from an arbitrary queue, they are called just before they are needed (and deprioritized when they are not).
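The same pattern extends to other states. As a sketch, a belief callback for holding_package might query a gripper flag; the attribute here is an assumption, and the demo class may track this differently:

    @RegisterBeliefCallback(holding_package)
    def do_check_holding(self) -> bool:
        # Hypothetical: read an assumed gripper flag on the class.
        return getattr(self, "gripper_closed", False)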

Other existing features

  • When actions are taken, the expected observations are compared to the actual ones. If a mismatch is identified, implying a failure to achieve a goal, the system re-plans and attempts a new solution.
  • Because the backend is described via Python functions, produced plans are just a series of Python function calls. We can modify these as needed, and even write plans as Python scripts by hand pretty easily (see the sketch after this list).
  • Some basic safety proving: a plan can only be made in a way that enforces certain requirements for each action.
  • There are hooks to treat each of these modules (the planner, the databases) as plugins, so we can swap them in and out as new techniques or approaches are desired.
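As a sketch of that hand-written route, and assuming the omitted parts of MyRobotInterface handle its own initialization, a plan can be as plain as:

# A hand-written "plan": ordinary Python calls against the same
# interface the planner would otherwise drive.
robot = MyRobotInterface()
robot.do_goto_A()
assert robot.do_check_A()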

Other planned features

  • Formal verification: mathematical guarantees that we are keeping within a certain set of parameters while executing behavior.
  • Surprisal and creativity mechanisms: there are hooks for these if we ever think it will be useful to pull that functionality in, but they have not been implemented yet.
  • Multi-agent systems: if we can pass beliefs about other robots to the belief database, robots can plan around each other in a way that scales and is programmatic.

Changelog

  • 1.2: Create C++ bindings, bugfixes.
  • 1.1: Use the "Unified Planning" engine, improvements to API, bugfixes.
  • 1.0: Prototype version shown to work on basic problems and shared internally.