Source code for pyvcad_attribute_resolver.solver
from collections import deque
[docs]
class ResolverError(Exception):
    """Common base class for errors raised by the attribute resolver.

    Catch this type to handle any resolver-specific failure in one place.
    """
[docs]
class NoPathError(ResolverError):
    """Signal that no chain of conversions leads to the target attribute.

    Raised when the search starting from the available attributes finishes
    without reaching the requested target.
    """
[docs]
class AmbiguousPathError(ResolverError):
    """Signal that several equally valid conversion paths were found.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, message, paths=None):
        """Store the error message and the competing paths.

        Arguments:
            message: Human-readable description of the ambiguity.
            paths: Optional collection of the tied paths. A missing or
                empty value is stored as a new empty list.
        """
        super().__init__(message)
        # The tied candidate paths, exposed so callers can inspect them
        # and decide how to disambiguate.
        self.paths = paths if paths else []
def solve(graph, available_attributes, target_attribute,
          allowed_conversions=None, tags=None, max_depth=10):
    """Find a conversion path from the available attributes to the target.

    Uses BFS to find shortest-length paths through the conversion graph.
    When multiple shortest paths exist, the one with the highest total
    priority is selected. If priorities are tied the call raises
    ``AmbiguousPathError`` so the user can disambiguate.

    Two paths that apply exactly the same set of conversions in a different
    order are treated as one path: they produce the same attributes and the
    same total priority, so only the first ordering discovered is kept and
    a mere reordering is never reported as an ambiguity.

    Arguments:
        graph: ``ConversionGraph`` instance. Only ``graph.neighbors(name)``
            is used; it must return an iterable of conversion entries.
        available_attributes: Attribute names present on the design.
        target_attribute: The desired output attribute.
        allowed_conversions: Optional list of ``ConversionEntry`` names
            to restrict the search. ``None`` or an empty list means no
            restriction.
        tags: Optional list of tags; edges must match *at least one*.
            ``None`` or an empty list means no tag filtering.
        max_depth: Maximum conversion chain length (cycle guard).

    Returns:
        A list of ``ConversionEntry`` forming the conversion path.
        An empty list if the target is already available.

    Raises:
        NoPathError: No conversion path exists.
        AmbiguousPathError: Multiple shortest paths with equal priority.
    """
    if target_attribute in available_attributes:
        return []

    allowed_set = set(allowed_conversions) if allowed_conversions else None
    tags_set = set(tags) if tags else None

    # BFS state: each queue element is (available_attribute_set, path_so_far).
    # Conversions only add new attributes, so there are no cycles as long as
    # we refuse to apply edges whose target is already present in the state.
    initial_state = frozenset(available_attributes)
    queue = deque([(initial_state, [])])
    all_paths = []       # distinct shortest paths that reach the target
    shortest_len = None  # length of the first path that reached the target

    # Sets of conversion names already expanded. The attribute state depends
    # only on *which* conversions were applied, not on their order, so a
    # second ordering of the same set can never lead anywhere new. Skipping
    # it avoids both factorial queue growth and spurious "ambiguous" results
    # for conversions that need several independently-produced inputs.
    seen_combinations = set()

    while queue:
        current_state, path = queue.popleft()

        # Depth guard
        if len(path) >= max_depth:
            continue

        # If we already found a shortest path, extending a path of the same
        # or greater length can only yield a longer one (BFS is level-order).
        if shortest_len is not None and len(path) >= shortest_len:
            continue

        # Collect every edge leaving any attribute we currently hold,
        # keyed by name so an edge reachable from several inputs appears once.
        candidate_edges = {}
        for current_attribute in current_state:
            for entry in graph.neighbors(current_attribute):
                candidate_edges[entry.name] = entry

        path_names = frozenset(e.name for e in path)

        for entry in candidate_edges.values():
            if allowed_set is not None and entry.name not in allowed_set:
                continue
            if tags_set is not None and not tags_set.intersection(entry.tags):
                continue
            # Every input of the conversion must already be available.
            if not current_state.issuperset(entry.required_inputs):
                continue
            # Re-deriving an attribute we already have is pointless and
            # is what would otherwise allow cycles.
            if entry.target in current_state:
                continue

            combination = path_names | {entry.name}
            if combination in seen_combinations:
                continue
            seen_combinations.add(combination)

            new_path = path + [entry]

            if entry.target == target_attribute:
                if shortest_len is None:
                    shortest_len = len(new_path)
                if len(new_path) == shortest_len:
                    all_paths.append(new_path)
                continue

            queue.append((current_state | {entry.target}, new_path))

    if not all_paths:
        raise NoPathError(
            f"No conversion path found to '{target_attribute}' from "
            f"available attributes {list(available_attributes)}"
        )

    if len(all_paths) == 1:
        return all_paths[0]

    # Disambiguate by total priority (higher is better)
    scored = [(sum(e.priority for e in p), p) for p in all_paths]
    scored.sort(key=lambda x: x[0], reverse=True)
    best_score = scored[0][0]
    if best_score > scored[1][0]:
        return scored[0][1]

    # Still tied — report ambiguity
    tied_paths = [p for s, p in scored if s == best_score]
    descriptions = [" -> ".join(e.name for e in p) for p in tied_paths]
    raise AmbiguousPathError(
        f"Ambiguous conversion to '{target_attribute}': "
        f"{len(tied_paths)} paths with equal priority: "
        + "; ".join(descriptions),
        paths=tied_paths,
    )