Module spatial.automaton_planning
Expand source code
from copy import deepcopy
import networkx as nx
from spatial.logic import Spatial
from spatial.ltlf2dfa_nx import LTLf2nxParser
def number_to_base(n, b):
"""
translates a decimal number to a different base in a list of digits
Args:
n: the decimal number
b: the new base
Returns:
A list of digits in the new base
"""
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def sog_fits_to_guard(guard, sog, guard_ap, sog_ap):
"""
checks if a guard fits to a set of guards, including undecided bits denoted as X
Args:
guard: A single guard. Example 'X01'
sog: A set of guards Example {'001','101'}
guard_ap: atomic propositions of the guard
sog_ap: atomic propositions of the set of guards
Returns:
A subset of the set of guards matching to the single guard
"""
guards = deepcopy(sog)
# check if this guard fits to one of the guards
for i, g_value in enumerate(guard):
if g_value == 'X':
continue
if guard_ap[i] in sog_ap:
j = sog_ap.index(guard_ap[i])
wrong_guards = []
for guard in guards:
if guard[j] != 'X' and guard[j] != g_value:
# if a synth guard is not matching to the current config, mark for removal
wrong_guards.append(guard)
# remove marked guards
for wg in wrong_guards:
guards.remove(wg)
return guards
def flip_guard_bit(guard, bit, skip_ones=False):
# construct hypothetical test_guard
test_guard = list(guard)
if guard[bit] == '1':
if skip_ones:
test_guard[bit] = '1'
else:
test_guard[bit] = '0'
elif guard[bit] == '0':
test_guard[bit] = '1'
elif guard[bit] == 'X':
test_guard[bit] = 'X'
test_guard = ''.join(test_guard)
return test_guard
def replace_guard_bit(guard, bit, lit):
new_guard = list(guard)
new_guard[bit] = lit
new_guard = ''.join(new_guard)
return new_guard
def resolve_all_x(guard):
return_set = set()
for i, o in enumerate(guard):
if o == 'X':
return_set = return_set.union(resolve_all_x(replace_guard_bit(guard, i, '0')))
return_set = return_set.union(resolve_all_x(replace_guard_bit(guard, i, '1')))
if len(return_set) == 0:
return_set.add(guard)
return return_set
def compare_obs(test_obs, existing_obs):
if len(test_obs) != len(existing_obs):
return False
for i in range(len(test_obs)):
if test_obs[i] == 'X' and existing_obs[i] == 'X':
continue
if test_obs[i] != existing_obs[i]:
return False
return True
def reduce_set_of_guards(sog):
# first, resolve all generalizations
new_sog = set()
for g in sog:
new_sog = new_sog.union(resolve_all_x(g))
changed = True
# blow up the set of guards with all possible generalizations
while changed:
blown_up_sog = set(new_sog)
for guard in new_sog: # for each guard
for i, o in enumerate(guard): # for each bit in the guard
# construct hypothetical test_guard
test_guard = flip_guard_bit(guard, i)
# check if the test_guard and the guard are different (they are not if 'X' got "flipped")
if test_guard == guard:
continue
# if the hypothetical guard is not also in the new sog, we cannot reduce
test_guard_in_new_sog = False
for g in new_sog:
if compare_obs(test_guard, g):
test_guard_in_new_sog = True
if not test_guard_in_new_sog:
continue
# create a reduced guard
reduced_guard = replace_guard_bit(guard, i, 'X')
# add it to the reduced sog
blown_up_sog.add(reduced_guard)
if new_sog == blown_up_sog:
changed = False
new_sog = blown_up_sog
# sog is all blown up with all possible generalizations
unnecessary_guards = set()
for guard in new_sog:
for other_guard in new_sog:
# skip yourself
if guard == other_guard:
continue
# check if the guard subsumes the other_guard
subsumes = True
for j in range(len(guard)):
if guard[j] == 'X':
continue
if other_guard[j] == 'X':
subsumes = False
if not guard[j] == other_guard[j]:
subsumes = False
if subsumes:
unnecessary_guards.add(other_guard)
return new_sog.difference(unnecessary_guards)
class AutomatonPlanner:
"""
Enables online automaton-based planning of the temporal aspects.
"""
def __init__(self):
"""
Initializes the AutomatonPlanner
"""
self.ltlf_parser = LTLf2nxParser()
self.spatial_dict = {}
self.next_free_variable_id = 0
self.temporal_formula = None
self.dfa = None
self.current_state = None
def currently_accepting(self):
"""
Returns True if the current state is accepting
"""
return self.current_state in self.dfa.graph['acc']
def reset_state(self):
"""
Resets the current state to the initial state
"""
self.current_state = self.dfa.graph['init']
def simulate_trace(self, trace, ap):
"""
Simulates a run on the automaton, given some inputs and updates the state accordingly
Args:
trace: The input trace
ap: the input atomic propositions
"""
self.reset_state()
for symbol in trace:
self.dfa_step(symbol, ap)
return self.current_state in self.dfa.graph['acc']
def dfa_step(self, symbol, ap):
"""
Executes a single step of the automaton
Args:
symbol: The input symbol
ap: the input atomic propositions
"""
assert isinstance(symbol, str), 'Symbol is of wrong type, should be str! %s' % type(symbol)
for succ in self.dfa.successors(self.current_state):
# directly applying the first edge that fits works because the dfa is deterministic!
if sog_fits_to_guard(symbol, self.dfa.edges[self.current_state, succ]['guard'], ap, self.dfa.graph['ap']):
self.current_state = succ
break
def plan_step(self):
"""
Returns the desired transition and a selfloop to maintain current state
based on the shortest path to any accepting state.
Returns:
target_next_edge, self_loop_constraint, edge: If no path exists, target_next_edge is None.
If target_next_edge is a self-loop (holding accepting state), self_loop_constraint is None.
edge refers to the targeted edge in the DFA and can be used as a reference for pruning.
"""
if self.current_state in self.dfa.graph['acc']:
# if we are in an accepting state, hold it!
return self.dfa.edges[self.current_state, self.current_state]['guard'], None, None
# find shortest path to all accepting states
targets = {}
path_exists = False
for acc_node in self.dfa.graph['acc']:
try:
targets[acc_node] = nx.shortest_path(self.dfa, source=self.current_state, target=acc_node)
path_exists = True
except nx.exception.NetworkXNoPath:
targets[acc_node] = float('inf')
if not path_exists:
return None, None, None
# find the closest reachable accepting state
target = min(targets.items(), key=lambda x: len(x[1]))
target_path = target[1]
target_next_guards = self.dfa.edges[self.current_state, target_path[1]]['guard']
target_constraint_guards = []
for succ in self.dfa.successors(self.current_state):
if succ != target_path[1] and succ != self.current_state:
target_constraint_guards.extend(self.dfa.edges[self.current_state, succ]['guard'])
return reduce_set_of_guards(target_next_guards), reduce_set_of_guards(target_constraint_guards), (self.current_state, target_path[1])
def tree_to_dfa(self, tree):
"""
Converts a SpaTiaL tree to a DFA.
Args:
tree: input tree parsed by spatial lark parser
"""
self.temporal_formula = self.extract_temporal(tree)
self.ltlf_parser.parse_formula(self.temporal_formula)
self.dfa = self.ltlf_parser.to_nxgraph()
def get_next_variable_string(self):
"""
Creates a new, unique variable name.
Returns: the variable string
"""
id_base_26 = number_to_base(self.next_free_variable_id, 26)
letters = [chr(n + ord('a')) for n in id_base_26]
self.next_free_variable_id += 1
return ''.join(letters)
def extract_temporal(self, tree):
"""
Takes a parsed tree and extracts an LTLf formula, creating variables for each "spatial"-type subtree.
Keeps dictionaries for mappings between subtrees and variable names.
Args:
tree: input tree parsed by spatial lark parser
Returns:
LTLf formula represented as a string.
"""
f = ''
if tree.data == 'temporal':
f += self.extract_temporal(tree.children[0])
elif tree.data == 'always':
f += 'G(' + self.extract_temporal(tree.children[0]) + ')'
elif tree.data == 'eventually':
f += 'F(' + self.extract_temporal(tree.children[0]) + ')'
elif tree.data == 'until':
f += '(' + self.extract_temporal(tree.children[0]) + ') U (' + self.extract_temporal(tree.children[1]) + ')'
elif tree.data == 'and_':
f += '(' + self.extract_temporal(tree.children[0]) + ') & (' + self.extract_temporal(tree.children[1]) + ')'
elif tree.data == 'or_':
f += '(' + self.extract_temporal(tree.children[0]) + ') | (' + self.extract_temporal(tree.children[1]) + ')'
elif tree.data == 'not_':
f += '!(' + self.extract_temporal(tree.children[0]) + ')'
elif tree.data == 'implies_':
f += '(' + self.extract_temporal(tree.children[0]) + ') -> (' + self.extract_temporal(tree.children[1]) + ')'
elif tree.data == 'spatial':
element = hash(tree) # compute hash once
# map hash to variable name and tree
if element not in self.spatial_dict.keys():
variable_string = self.get_next_variable_string()
self.spatial_dict[element] = {
'variable': variable_string,
'tree': tree
}
f += self.spatial_dict[element]['variable']
else:
assert False, 'Operator %s not implemented for automaton-based planning!' % tree.data
return f
def get_variable_to_tree_dict(self):
"""
Returns:
A mapping between temporal variable name and associated spatial tree.
"""
return_dict = {}
for val in self.spatial_dict.values():
return_dict[val['variable']] = val['tree']
return return_dict
def get_dfa_ap(self):
"""
Returns:
The atomic propositions of the DFA.
"""
return self.dfa.graph['ap']
if __name__ == '__main__':
spatial = Spatial()
planner = AutomatonPlanner()
# parse the formula
ex = "( (G(!(blue touch red))) & (F(blue touch green)) )"
ex_tree = spatial.parse(ex) # build the usual tree
planner.tree_to_dfa(ex_tree) # transform the tree into an automaton
# print the corresponding LTLf formula
print(planner.temporal_formula)
# this dictionary contains a variable name to spatial tree mapping
print(planner.get_variable_to_tree_dict())
# you have to define in which order you pass variable assignments
trace_ap = ['a', 'b']
# example traces (sequences of observations)
ex_trace_acc = ['10', '10', '10', '11']
ex_trace_rej = ['10', '00', '10', '01']
# the automaton can evaluate such traces
print('Trace 1 is', planner.simulate_trace(ex_trace_acc, trace_ap))
print('Trace 2 is', planner.simulate_trace(ex_trace_rej, trace_ap))
# INTERACTIVE PLANNING EXAMPLE
# resets the automaton current state to the initial state
planner.reset_state()
# before you ask anything from the automaton, provide a initial observation
planner.dfa_step('10', trace_ap)
# planning loop
print('')
print('INTERACTIVE EXAMPLE')
print('')
print('enter observations in order', planner.get_dfa_ap(), 'or enter \'exit\' to exit')
while True:
target_set, constraint_set, _ = planner.plan_step()
print('Currently accepting:', planner.currently_accepting())
if target_set:
print('Next step: Reach', target_set, planner.get_dfa_ap(),
', never satisfy', constraint_set, planner.get_dfa_ap())
else:
print('No path exists!')
obs = input('Next Observation:')
if obs == 'exit':
break
planner.dfa_step(obs, planner.get_dfa_ap())
Functions
def compare_obs(test_obs, existing_obs)
-
Expand source code
def compare_obs(test_obs, existing_obs): if len(test_obs) != len(existing_obs): return False for i in range(len(test_obs)): if test_obs[i] == 'X' and existing_obs[i] == 'X': continue if test_obs[i] != existing_obs[i]: return False return True
def flip_guard_bit(guard, bit, skip_ones=False)
-
Expand source code
def flip_guard_bit(guard, bit, skip_ones=False): # construct hypothetical test_guard test_guard = list(guard) if guard[bit] == '1': if skip_ones: test_guard[bit] = '1' else: test_guard[bit] = '0' elif guard[bit] == '0': test_guard[bit] = '1' elif guard[bit] == 'X': test_guard[bit] = 'X' test_guard = ''.join(test_guard) return test_guard
def number_to_base(n, b)
-
translates a decimal number to a different base in a list of digits
Args
n
- the decimal number
b
- the new base
Returns
A list of digits in the new base
Expand source code
def number_to_base(n, b): """ translates a decimal number to a different base in a list of digits Args: n: the decimal number b: the new base Returns: A list of digits in the new base """ if n == 0: return [0] digits = [] while n: digits.append(int(n % b)) n //= b return digits[::-1]
def reduce_set_of_guards(sog)
-
Expand source code
def reduce_set_of_guards(sog): # first, resolve all generalizations new_sog = set() for g in sog: new_sog = new_sog.union(resolve_all_x(g)) changed = True # blow up the set of guards with all possible generalizations while changed: blown_up_sog = set(new_sog) for guard in new_sog: # for each guard for i, o in enumerate(guard): # for each bit in the guard # construct hypothetical test_guard test_guard = flip_guard_bit(guard, i) # check if the test_guard and the guard are different (they are not if 'X' got "flipped") if test_guard == guard: continue # if the hypothetical guard is not also in the new sog, we cannot reduce test_guard_in_new_sog = False for g in new_sog: if compare_obs(test_guard, g): test_guard_in_new_sog = True if not test_guard_in_new_sog: continue # create a reduced guard reduced_guard = replace_guard_bit(guard, i, 'X') # add it to the reduced sog blown_up_sog.add(reduced_guard) if new_sog == blown_up_sog: changed = False new_sog = blown_up_sog # sog is all blown up with all possible generalizations unnecessary_guards = set() for guard in new_sog: for other_guard in new_sog: # skip yourself if guard == other_guard: continue # check if the guard subsumes the other_guard subsumes = True for j in range(len(guard)): if guard[j] == 'X': continue if other_guard[j] == 'X': subsumes = False if not guard[j] == other_guard[j]: subsumes = False if subsumes: unnecessary_guards.add(other_guard) return new_sog.difference(unnecessary_guards)
def replace_guard_bit(guard, bit, lit)
-
Expand source code
def replace_guard_bit(guard, bit, lit): new_guard = list(guard) new_guard[bit] = lit new_guard = ''.join(new_guard) return new_guard
def resolve_all_x(guard)
-
Expand source code
def resolve_all_x(guard): return_set = set() for i, o in enumerate(guard): if o == 'X': return_set = return_set.union(resolve_all_x(replace_guard_bit(guard, i, '0'))) return_set = return_set.union(resolve_all_x(replace_guard_bit(guard, i, '1'))) if len(return_set) == 0: return_set.add(guard) return return_set
def sog_fits_to_guard(guard, sog, guard_ap, sog_ap)
-
checks if a guard fits to a set of guards, including undecided bits denoted as X
Args
guard
- A single guard. Example 'X01'
sog
- A set of guards Example {'001','101'}
guard_ap
- atomic propositions of the guard
sog_ap
- atomic propositions of the set of guards
Returns
A subset of the set of guards matching to the single guard
Expand source code
def sog_fits_to_guard(guard, sog, guard_ap, sog_ap): """ checks if a guard fits to a set of guards, including undecided bits denoted as X Args: guard: A single guard. Example 'X01' sog: A set of guards Example {'001','101'} guard_ap: atomic propositions of the guard sog_ap: atomic propositions of the set of guards Returns: A subset of the set of guards matching to the single guard """ guards = deepcopy(sog) # check if this guard fits to one of the guards for i, g_value in enumerate(guard): if g_value == 'X': continue if guard_ap[i] in sog_ap: j = sog_ap.index(guard_ap[i]) wrong_guards = [] for guard in guards: if guard[j] != 'X' and guard[j] != g_value: # if a synth guard is not matching to the current config, mark for removal wrong_guards.append(guard) # remove marked guards for wg in wrong_guards: guards.remove(wg) return guards
Classes
class AutomatonPlanner
-
Enables online automaton-based planning of the temporal aspects.
Initializes the AutomatonPlanner
Expand source code
class AutomatonPlanner: """ Enables online automaton-based planning of the temporal aspects. """ def __init__(self): """ Initializes the AutomatonPlanner """ self.ltlf_parser = LTLf2nxParser() self.spatial_dict = {} self.next_free_variable_id = 0 self.temporal_formula = None self.dfa = None self.current_state = None def currently_accepting(self): """ Returns True if the current state is accepting """ return self.current_state in self.dfa.graph['acc'] def reset_state(self): """ Resets the current state to the initial state """ self.current_state = self.dfa.graph['init'] def simulate_trace(self, trace, ap): """ Simulates a run on the automaton, given some inputs and updates the state accordingly Args: trace: The input trace ap: the input atomic propositions """ self.reset_state() for symbol in trace: self.dfa_step(symbol, ap) return self.current_state in self.dfa.graph['acc'] def dfa_step(self, symbol, ap): """ Executes a single step of the automaton Args: symbol: The input symbol ap: the input atomic propositions """ assert isinstance(symbol, str), 'Symbol is of wrong type, should be str! %s' % type(symbol) for succ in self.dfa.successors(self.current_state): # directly applying the first edge that fits works because the dfa is deterministic! if sog_fits_to_guard(symbol, self.dfa.edges[self.current_state, succ]['guard'], ap, self.dfa.graph['ap']): self.current_state = succ break def plan_step(self): """ Returns the desired transition and a selfloop to maintain current state based on the shortest path to any accepting state. Returns: target_next_edge, self_loop_constraint, edge: If no path exists, target_next_edge is None. If target_next_edge is a self-loop (holding accepting state), self_loop_constraint is None. edge refers to the targeted edge in the DFA and can be used as a reference for pruning. """ if self.current_state in self.dfa.graph['acc']: # if we are in an accepting state, hold it! return self.dfa.edges[self.current_state, self.current_state]['guard'], None, None # find shortest path to all accepting states targets = {} path_exists = False for acc_node in self.dfa.graph['acc']: try: targets[acc_node] = nx.shortest_path(self.dfa, source=self.current_state, target=acc_node) path_exists = True except nx.exception.NetworkXNoPath: targets[acc_node] = float('inf') if not path_exists: return None, None, None # find the closest reachable accepting state target = min(targets.items(), key=lambda x: len(x[1])) target_path = target[1] target_next_guards = self.dfa.edges[self.current_state, target_path[1]]['guard'] target_constraint_guards = [] for succ in self.dfa.successors(self.current_state): if succ != target_path[1] and succ != self.current_state: target_constraint_guards.extend(self.dfa.edges[self.current_state, succ]['guard']) return reduce_set_of_guards(target_next_guards), reduce_set_of_guards(target_constraint_guards), (self.current_state, target_path[1]) def tree_to_dfa(self, tree): """ Converts a SpaTiaL tree to a DFA. Args: tree: input tree parsed by spatial lark parser """ self.temporal_formula = self.extract_temporal(tree) self.ltlf_parser.parse_formula(self.temporal_formula) self.dfa = self.ltlf_parser.to_nxgraph() def get_next_variable_string(self): """ Creates a new, unique variable name. Returns: the variable string """ id_base_26 = number_to_base(self.next_free_variable_id, 26) letters = [chr(n + ord('a')) for n in id_base_26] self.next_free_variable_id += 1 return ''.join(letters) def extract_temporal(self, tree): """ Takes a parsed tree and extracts an LTLf formula, creating variables for each "spatial"-type subtree. Keeps dictionaries for mappings between subtrees and variable names. Args: tree: input tree parsed by spatial lark parser Returns: LTLf formula represented as a string. """ f = '' if tree.data == 'temporal': f += self.extract_temporal(tree.children[0]) elif tree.data == 'always': f += 'G(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'eventually': f += 'F(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'until': f += '(' + self.extract_temporal(tree.children[0]) + ') U (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'and_': f += '(' + self.extract_temporal(tree.children[0]) + ') & (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'or_': f += '(' + self.extract_temporal(tree.children[0]) + ') | (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'not_': f += '!(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'implies_': f += '(' + self.extract_temporal(tree.children[0]) + ') -> (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'spatial': element = hash(tree) # compute hash once # map hash to variable name and tree if element not in self.spatial_dict.keys(): variable_string = self.get_next_variable_string() self.spatial_dict[element] = { 'variable': variable_string, 'tree': tree } f += self.spatial_dict[element]['variable'] else: assert False, 'Operator %s not implemented for automaton-based planning!' % tree.data return f def get_variable_to_tree_dict(self): """ Returns: A mapping between temporal variable name and associated spatial tree. """ return_dict = {} for val in self.spatial_dict.values(): return_dict[val['variable']] = val['tree'] return return_dict def get_dfa_ap(self): """ Returns: The atomic propositions of the DFA. """ return self.dfa.graph['ap']
Methods
def currently_accepting(self)
-
Returns True if the current state is accepting
Expand source code
def currently_accepting(self): """ Returns True if the current state is accepting """ return self.current_state in self.dfa.graph['acc']
def dfa_step(self, symbol, ap)
-
Executes a single step of the automaton
Args
symbol
- The input symbol
ap
- the input atomic propositions
Expand source code
def dfa_step(self, symbol, ap): """ Executes a single step of the automaton Args: symbol: The input symbol ap: the input atomic propositions """ assert isinstance(symbol, str), 'Symbol is of wrong type, should be str! %s' % type(symbol) for succ in self.dfa.successors(self.current_state): # directly applying the first edge that fits works because the dfa is deterministic! if sog_fits_to_guard(symbol, self.dfa.edges[self.current_state, succ]['guard'], ap, self.dfa.graph['ap']): self.current_state = succ break
def extract_temporal(self, tree)
-
Takes a parsed tree and extracts an LTLf formula, creating variables for each "spatial"-type subtree. Keeps dictionaries for mappings between subtrees and variable names.
Args
tree
- input tree parsed by spatial lark parser
Returns
LTLf formula represented as a string.
Expand source code
def extract_temporal(self, tree): """ Takes a parsed tree and extracts an LTLf formula, creating variables for each "spatial"-type subtree. Keeps dictionaries for mappings between subtrees and variable names. Args: tree: input tree parsed by spatial lark parser Returns: LTLf formula represented as a string. """ f = '' if tree.data == 'temporal': f += self.extract_temporal(tree.children[0]) elif tree.data == 'always': f += 'G(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'eventually': f += 'F(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'until': f += '(' + self.extract_temporal(tree.children[0]) + ') U (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'and_': f += '(' + self.extract_temporal(tree.children[0]) + ') & (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'or_': f += '(' + self.extract_temporal(tree.children[0]) + ') | (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'not_': f += '!(' + self.extract_temporal(tree.children[0]) + ')' elif tree.data == 'implies_': f += '(' + self.extract_temporal(tree.children[0]) + ') -> (' + self.extract_temporal(tree.children[1]) + ')' elif tree.data == 'spatial': element = hash(tree) # compute hash once # map hash to variable name and tree if element not in self.spatial_dict.keys(): variable_string = self.get_next_variable_string() self.spatial_dict[element] = { 'variable': variable_string, 'tree': tree } f += self.spatial_dict[element]['variable'] else: assert False, 'Operator %s not implemented for automaton-based planning!' % tree.data return f
def get_dfa_ap(self)
-
Returns
The atomic propositions of the DFA.
Expand source code
def get_dfa_ap(self): """ Returns: The atomic propositions of the DFA. """ return self.dfa.graph['ap']
def get_next_variable_string(self)
-
Creates a new, unique variable name.
Returns: the variable string
Expand source code
def get_next_variable_string(self): """ Creates a new, unique variable name. Returns: the variable string """ id_base_26 = number_to_base(self.next_free_variable_id, 26) letters = [chr(n + ord('a')) for n in id_base_26] self.next_free_variable_id += 1 return ''.join(letters)
def get_variable_to_tree_dict(self)
-
Returns
A mapping between temporal variable name and associated spatial tree.
Expand source code
def get_variable_to_tree_dict(self): """ Returns: A mapping between temporal variable name and associated spatial tree. """ return_dict = {} for val in self.spatial_dict.values(): return_dict[val['variable']] = val['tree'] return return_dict
def plan_step(self)
-
Returns the desired transition and a selfloop to maintain current state based on the shortest path to any accepting state.
Returns
target_next_edge, self_loop_constraint, edge
- If no path exists, target_next_edge is None.
If target_next_edge is a self-loop (holding accepting state), self_loop_constraint is None. edge refers to the targeted edge in the DFA and can be used as a reference for pruning.
Expand source code
def plan_step(self): """ Returns the desired transition and a selfloop to maintain current state based on the shortest path to any accepting state. Returns: target_next_edge, self_loop_constraint, edge: If no path exists, target_next_edge is None. If target_next_edge is a self-loop (holding accepting state), self_loop_constraint is None. edge refers to the targeted edge in the DFA and can be used as a reference for pruning. """ if self.current_state in self.dfa.graph['acc']: # if we are in an accepting state, hold it! return self.dfa.edges[self.current_state, self.current_state]['guard'], None, None # find shortest path to all accepting states targets = {} path_exists = False for acc_node in self.dfa.graph['acc']: try: targets[acc_node] = nx.shortest_path(self.dfa, source=self.current_state, target=acc_node) path_exists = True except nx.exception.NetworkXNoPath: targets[acc_node] = float('inf') if not path_exists: return None, None, None # find the closest reachable accepting state target = min(targets.items(), key=lambda x: len(x[1])) target_path = target[1] target_next_guards = self.dfa.edges[self.current_state, target_path[1]]['guard'] target_constraint_guards = [] for succ in self.dfa.successors(self.current_state): if succ != target_path[1] and succ != self.current_state: target_constraint_guards.extend(self.dfa.edges[self.current_state, succ]['guard']) return reduce_set_of_guards(target_next_guards), reduce_set_of_guards(target_constraint_guards), (self.current_state, target_path[1])
def reset_state(self)
-
Resets the current state to the initial state
Expand source code
def reset_state(self): """ Resets the current state to the initial state """ self.current_state = self.dfa.graph['init']
def simulate_trace(self, trace, ap)
-
Simulates a run on the automaton, given some inputs and updates the state accordingly
Args
trace
- The input trace
ap
- the input atomic propositions
Expand source code
def simulate_trace(self, trace, ap): """ Simulates a run on the automaton, given some inputs and updates the state accordingly Args: trace: The input trace ap: the input atomic propositions """ self.reset_state() for symbol in trace: self.dfa_step(symbol, ap) return self.current_state in self.dfa.graph['acc']
def tree_to_dfa(self, tree)
-
Converts a SpaTiaL tree to a DFA.
Args
tree
- input tree parsed by spatial lark parser
Expand source code
def tree_to_dfa(self, tree): """ Converts a SpaTiaL tree to a DFA. Args: tree: input tree parsed by spatial lark parser """ self.temporal_formula = self.extract_temporal(tree) self.ltlf_parser.parse_formula(self.temporal_formula) self.dfa = self.ltlf_parser.to_nxgraph()