'''
Laws
====
*law of fresh*
if x is fresh, then unify(v, x) succeeds and associates x with v
*law of unify*
unify(v, w) is the same as unify(w, v)
*law of conde*
to get more values from conde, pretend that the successful conde line
has failed, refreshing all variables that got an association from that
line
Commandments
============
*second*
to transform a function whose value is not a Boolean into a function
whose value is a goal, add an extra argument to hold its value, replace
`cond` with `conde`, and unnest each question and answer.
>>> from muk.core import *
>>> from muk.ext import *
'''
from collections import namedtuple
from itertools import count
from contextlib import contextmanager
from inspect import signature
from functools import partial, reduce, wraps
from muk.sexp import *
from muk.utils import *
# STATES {{{
state = namedtuple('state', ['sub', 'next_index'])
def emptystate():
return state(sub={}, next_index=0)
@contextmanager
def states_stream(g, initial_state=emptystate()):
α = g(initial_state)
yield α
# }}}
# DIFFERENCE STRUCTURES {{{
class extend_list:
def __init__(self, prefix, var):
self.prefix = prefix
self.var = var
def as_cons(self, translate):
return translate(tuple(self.prefix + [self.var]))
def walk_star(self, W):
prefix = W(self.prefix) # foldr(lambda c, acc: [W(c)]+acc, self.prefix, [])
walked = W(self.var)
if isinstance(walked, extend_list):
return extend_list(prefix + walked.prefix, walked.var)
if isinstance(walked, list):
return prefix + walked
else:
return extend_list(prefix, walked)
def unification(self, other, sub, ext_s, U, E):
try:
UEL = other._unification_extend_list
except AttributeError:
try:
l = len(self.prefix)
if len(other) < l: raise E
other_prefix = other[:l]
other_suffix = other[l:]
except TypeError:
raise E
else:
sub_prefix = U(self.prefix, other_prefix, sub, ext_s)
return U(self.var, other_suffix, sub_prefix, ext_s)
else:
return UEL(self, sub, ext_s, U)
def _unification_extend_list(self, other, sub, ext_s, U):
l = len(self.prefix)
if len(other.prefix) < l:
return other._unification_extend_list(self, sub, ext_s, U)
elif l < len(other.prefix):
other_prefix = other.prefix[:l]
shorter_extend_list = extend_list(prefix=other.prefix[l:], var=other.var)
sub_prefix = U(self.prefix, other_prefix, sub, ext_s)
return U(self.var, shorter_extend_list, sub_prefix, ext_s)
else:
sub_prefix = U(other.prefix, self.prefix, sub, ext_s)
return U(other.var, self.var, sub_prefix, ext_s)
def _unification_difference_list(self, other, sub, ext_s, U):
return other.whole._unification_prefix_extend_list(self, sub, ext_s, U)
def _unification_prefix_extend_list(self, other, sub, ext_s, U):
return U(self.prefix, other, sub, ext_s)
def _unification_exclude(self, other, exclude, sub, ext_s, U):
if exclude == self.var:
return U(self.prefix, other, sub, ext_s)
raise NotImplemented
def reify_s(self, sub, R):
sub_prefix = foldr(lambda c, sub: R(c, sub), self.prefix, sub)
return R(self.var, sub_prefix)
def occur_check(self, u, O, E):
for c in self.prefix:
O(u, c)
return O(u, self.var)
def __eq__(self, other):
try: eq = other.__eq__extend_list
except AttributeError: return False
else: return eq(self)
def __eq__extend_list(self, other):
return self.prefix == other.prefix and self.var == other.var
def __repr__(self):
return '{} + {}'.format(repr(self.prefix), repr(self.var))
def __radd__(self, other):
if isinstance(other, list):
return extend_list(other+self.prefix, self.var)
raise NotImplemented
def __sub__(self, other):
if isinstance(other, var):
return difference_list(whole=self, suffix=other)
raise NotImplemented
class difference_list:
def __init__(self, whole, suffix):
self.whole = whole
self.suffix = suffix
def walk_star(self, W):
whole = W(self.whole)
suffix = W(self.suffix)
return [] if whole == suffix else difference_list(whole, suffix)
def unification(self, other, sub, ext_s, U, E):
try:
UDL = other._unification_difference_list
except AttributeError:
if isinstance(other, list):
try: UP = self.whole._unification_exclude
except AttributeError: raise E
else: return UP(other, self.suffix, sub, ext_s, U)
else:
raise E
else:
return UDL(self, sub, ext_s, U)
def _unification_difference_list(self, other, sub, ext_s, U):
sub_whole = U(other.whole, self.whole, sub, ext_s)
return U(other.suffix, self.suffix, sub_whole, ext_s)
def reify_s(self, sub, R):
sub_whole = R(self.whole, sub)
return R(self.suffix, sub_whole)
def occur_check(self, u, O, E):
raise NotImplemented
for c in self.prefix:
O(u, c)
return O(u, self.var)
def __eq__(self, other):
try: eq = other.__eq__difference_list
except AttributeError: return False
else: return eq(self)
def __eq__difference_list(self, other):
return self.whole == other.whole and self.suffix == other.suffix
def __repr__(self):
return '({}) - {}'.format(repr(self.whole), repr(self.suffix))
# }}}
# VARS {{{
class var:
_subscripts = {'0':'₀', '1':'₁','2':'₂','3':'₃','4':'₄','5':'₅','6':'₆','7':'₇','8':'₈','9':'₉'}
def __init__(self, index, name):
self.index = index
self.name = name
def __eq__(self, other):
try: eq = other.__eq__var
except AttributeError: return False
else: return eq(self)
def __eq__var(self, other):
return self.index == other.index and self.name == other.name
def __hash__(self):
t = self.index, self.name
return hash(t)
def __repr__(self):
return '{}{}'.format(self.name, ''.join(self._subscripts[c] for c in str(self.index)))
def __radd__(self, other):
if isinstance(other, list):
return extend_list(prefix=other, var=self)
raise NotImplemented
def __sub__(self, other):
if isinstance(other, (list, var)):
return difference_list(whole=self, suffix=other)
raise NotImplemented
def walk_star(self, W):
return self
def reify_s(self, sub, R):
return ext_s(self, rvar(len(sub)), sub)
def unification(self, other, sub, ext_s, U, E):
try: UV = other._unification_var
except AttributeError: return ext_s(self, other, sub)
else: return UV(self, sub, ext_s, U)
def _unification_var(self, other_var, sub, ext_s, U):
return sub if self == other_var else ext_s(other_var, self, sub)
def __getattr__(self, name):
if not name.startswith('_unification_'):
raise AttributeError
return lambda other, sub, ext_s, U: ext_s(self, other, sub)
def occur_check(self, u, O, E):
if self == u: raise E
class rvar(var):
def __init__(self, index, reifed_name='▢'):
super().__init__(index, name=reifed_name)
def reify_s(self, sub, R):
return sub
# }}}
# UNIFICATION {{{
class Tautology:
def __eq__(self, other):
return isinstance(other, Tautology)
def __repr__(self):
return type(self).__name__
class UnificationError(ValueError):
pass
[docs]def unification(u, v, sub, ext_s):
'''
Attempts to augment substitution ``sub`` with associations that makes ``u``
unify with ``v``.
'''
u, v = walk(u, sub), walk(v, sub)
try:
return u.unification(v, sub, ext_s, unification, UnificationError)
except (AttributeError, UnificationError): pass
try:
return v.unification(u, sub, ext_s, unification, UnificationError)
except (AttributeError, UnificationError): pass
if (type(u) is tuple and type(v) is tuple) or \
(type(u) is list and type(v) is list):
if len(u) != len(v):
raise UnificationError
else:
return reduce(lambda subr, pair: unification(*pair, subr, ext_s), zip(u, v), sub)
elif u == v:
return sub
raise UnificationError()
# }}}
# SUBSTITUTION {{{
def walk(u, sub):
try:
while True: u = sub[u]
except (TypeError, # to defend against unhashable objs
KeyError): # to defend from "ground" objs and stop iter when `u` is a *fresh* var
return u
def walk_star(v, sub):
v = walk(v, sub)
if hasattr(v, 'walk_star'): return v.walk_star(lambda u: walk_star(u, sub))
elif isinstance(v, (list, tuple)): return [walk_star(u, sub) for u in v]
else: return v
class OccurCheck(ValueError):
pass
def occur_check(ext_s):
@wraps(ext_s)
def E_s(u, v, sub, occur_check=False):
def O(u, v):
v = walk(v, sub)
if hasattr(v, 'occur_check'):
v.occur_check(u, O, OccurCheck)
elif type(v) is tuple or isinstance(v, list):
for vv in v: O(u, vv)
if occur_check: O(u, v)
return ext_s(u, v, sub)
return E_s
@occur_check
def ext_s(u, v, sub):
if False:
if not isinstance(u, var):
raise ValueError("Non `var` obj {} as key in substitution {}".format(u, sub))
if u == v:
raise ValueError('Illegal identity association between {} and {}, according to 9.12 of The Reasoned Schemer'.format(u, v))
if u in sub: # check to ensure consistency of previously unified values
if sub[u] != v: raise UnificationError
else: return sub
e = sub.copy()
e[u] = v
return e
def reify_s(v, sub):
v = walk(v, sub)
if hasattr(v, 'reify_s'): return v.reify_s(sub, reify_s)
elif isinstance(v, list): return reduce(lambda sub, u: reify_s(u, sub), v, sub)
else: return sub
def reify(v):
return walk_star(v, reify_s(v, sub={}))
# }}}
# GOAL CTORS {{{
class goal(object):
def __or__(self, g):
return _disj(self, g, interleaving=True)
def __xor__(self, g):
return _disj(self, g, interleaving=False)
def __matmul__(self, g):
return _conj(self, g, interleaving=True)
def __and__(self, g):
return _conj(self, g, interleaving=False)
def __invert__(self):
return complement(self)
class primitive(goal):
def __init__(self, rule):
self.rule = rule
def __call__(self, s : state):
yield from iter(self.rule(s))
succeed = primitive(rule=lambda s: [s]) # a goal that is satisfied by *any* substitution
fail = primitive(rule=lambda s: []) # a goal that is satisfied by *no* substitution
[docs]class _unify(goal):
'''
Attempts to perform :py:func:`unification <muk.core.unification>` to make
``u`` and ``v`` unifiable given a set of associations for logic variables.
'''
def __init__(self, u, v, ext_s):
self.u = u
self.v = v
self.ext_s = ext_s
def __call__(self, s : state):
try:
sub = unification(self.u, self.v, s.sub, self.ext_s)
except UnificationError:
yield from fail(s)
else:
yield from succeed(state(sub, s.next_index))
[docs]class fresh(goal):
'''
Introduce new logic variables according to the needs of receiver ``f``.
If ``f`` is a thunk then ``fresh`` is equivalent to the inversion of η-rule
as defined in *higher-order* logic::
def η_inverse(t):
def I(s : state):
g = t()
yield from g(s)
return I
having particular application in the definition of *recursive* relations.
'''
def __init__(self, f, arity=None):
self.f = f
if arity:
self.params = [(i, n) for i in range(arity) for n in [chr(ord('a') + i)]]
else:
f_sig = signature(f)
arity = len(f_sig.parameters)
self.params = [(i, v.name) for i, (k, v) in enumerate(f_sig.parameters.items())]
def __call__(self, s : state):
logic_vars = [var(s.next_index+i, n) for (i, n) in self.params]
setattr(self, 'logic_vars', logic_vars) # set the attr in any case, even if `logic_vars == []` because of η-inversion
g = self.f(*logic_vars)
α = g(state(s.sub, s.next_index + len(logic_vars)))
yield from α
[docs]class _disj(goal):
'''
A goal that is satisfiable if *either* goal ``g1`` *or* goal ``g2`` is satisfiable.
Formally, it produces the following states space:
.. math::
\left( \\begin{array}{c}s \stackrel{g_{1}}{\\mapsto} \\alpha \\\\ s \stackrel{g_{2}}{\\mapsto} \\beta \\end{array}\\right)
= \left( \\begin{array}{ccccc}
s_{00} & s_{01} & s_{02} & s_{03} & \\ldots \\\\
s_{10} & s_{11} & s_{12} & s_{13} & \\ldots \\\\
\\end{array}\\right)
enumerated using ``mplus`` according to ``interleaving`` arg.
'''
def __init__(self, g1, g2, *, interleaving):
self.g1 = g1
self.g2 = g2
self.interleaving = interleaving
def __call__(self, s : state):
α, β = self.g1(s), self.g2(s)
yield from mplus(iter([α, β]), self.interleaving)
[docs]class _conj(goal):
'''
A goal that is satisfiable if *both* goal ``g1`` *and* goal ``g2`` is satisfiable.
Formally, it produces the following states space:
.. math::
s \stackrel{g_{1}}{\\mapsto} \\alpha = \left( \\begin{array}{c}s_{0}\\\\s_{1}\\\\s_{2}\\\\ s_{3}\\\\ \\ldots\\end{array}\\right)
\stackrel{bind(\\alpha, g_{2})}{\\mapsto} \left( \\begin{array}{ccccc}
s_{00} & s_{01} & s_{02} & s_{03} & \\ldots \\\\
s_{10} & s_{11} & s_{12} & \\ldots & \\\\
s_{20} & s_{21} & \\ldots & & \\\\
s_{30} & \\ldots & & & \\\\
\\ldots & & & & \\\\
\\end{array}\\right)
'''
def __init__(self, g1, g2, *, interleaving):
self.g1 = g1
self.g2 = g2
self.interleaving = interleaving
def __call__(self, s : state):
α = self.g1(s)
yield from bind(α, self.g2, mplus=partial(mplus, interleaving=self.interleaving))
class complement(goal):
def __init__(self, g):
self.g = g
def __call__(self, s : state):
α = self.g(s)
try:
r = next(α) # r : state
except StopIteration:
yield from succeed(s)
else:
yield from fail(s)
# }}}
# STATE STREAMS {{{
[docs]def mplus(streams, interleaving):
'''
It enumerates the states space ``streams``, using different strategies
according to ``interleaving``.
In order to understand states enumeration can be helpful to use a matrix,
where we associate a row to each stream of states α belonging to
``streams``. Since ``streams`` is an ``iter`` obj over a *countably*,
possibly infinite, set of *states streams*, the matrix could have infinite
rows. In parallel, since each states stream α lying on a row is a ``iter``
obj over a *countably*, possibly infinite, set of *satisfying states*, the
matrix could have infinite columns; therefore, the matrix we are building
could be infinite in both dimensions. So, let ``streams`` be represented as follows:
.. math::
\left( \\begin{array}{ccccc}
s_{00} & s_{01} & s_{02} & s_{03} & \\ldots \\\\
s_{10} & s_{11} & s_{12} & \\ldots & \\\\
s_{20} & s_{21} & \\ldots & & \\\\
s_{30} & \\ldots & & & \\\\
\\ldots & & & & \\\\
\\end{array}\\right)
In order to enumerate this infinite matrix we have the following strategies:
*depth-first*
Enumerates a stream α committed to it until it is saturated or continue to yield its
``state`` objects forever; in other words, it enumerates row by row. For the sake of clarity,
assume the first :math:`k` streams are finite and the :math:`k`-th is the first to be infinite, hence
we are in the following context:
.. math::
\left( \\begin{array}{ccccc}
s_{00} & s_{01} & \\ldots & s_{0i_{0}} \\\\
s_{10} & s_{11} & \\ldots & s_{1i_{1}} \\\\
\ldots_{\\triangle} \\\\
s_{k-1, 0} & s_{k-1,1} & \\ldots & s_{k-1,i_{k-1}} \\\\
s_{k0} & s_{k1} & \\ldots & \ldots & \ldots \\\\
s_{k+1, 0} & \\ldots & & \\\\
\\ldots & & & \\\\
\\end{array}\\right)
so enumeration proceeds as follows: :math:`s_{00}, s_{01},\\ldots,
s_{0i_{0}}, s_{10}, s_{11}, \\ldots, s_{1i_{1}}, \\ldots_{\\triangle}, s_{k-1,0},
s_{k-1, 1},\\ldots, s_{k-1,i_{k-1}}, s_{k0}, s_{k1},\\ldots` yielding from stream :math:`\\alpha_{k}` forever,
*never* reaching :math:`s_{k+1, 0}`.
*breadth-first*
Enumerates interleaving *state* objects belonging to adjacent streams,
lying on the same column; in other words, it enumerates column by
column. If ``streams`` is an iterator over an infinite set of streams,
the it enumerates only the very first ``state`` objects, namely
:math:`s_{00}, s_{10}, s_{20}, s_{30},\ldots`.
The following is a straighforward implementation::
from itertools import chain
while True:
try:
α = next(streams)
except StopIteration:
break
else:
try:
s = next(α) # s : state
except StopIteration:
continue
else:
yield s
streams = chain(streams, [α])
*dovetail*
Enumerates interleaving `state` objects lying on the same *diagonal*,
resulting in a *fair scheduler* in the sense that *every* satisfying
``state`` object will be reached, eventually. For the sake of clarity,
enumeration proceeds as follows:
.. math::
s_{00}, s_{10}, s_{01}, s_{20}, s_{11}, s_{02}, s_{30}, s_{21},
s_{12}, s_{03}, \\ldots
providing a *complete* enumeration strategy.
:param iter stream: an iterator over a *countable* set of ``state``-streams
:param bool interleaving: enumeration strategy selector: *dovetail* if ``interleaving`` else *depth-first*
:return: an ``iter`` object over satisfying ``state`` objects
'''
if interleaving:
try: α = next(streams)
except StopIteration: return
else: S = [α]
while S:
for j in reversed(range(len(S))):
β = S[j]
try: s = next(β)
except StopIteration: del S[j]
else: yield s
try: α = next(streams)
except StopIteration: pass
else: S.append(α)
else:
for α in streams: yield from α
[docs]def bind(α, g, *, mplus):
'''
A stream combinator, it applies goal ``g`` to each ``state`` in stream ``α``.
Such mapping can be *linear* in the sense of vanilla ``map`` application:
.. math::
\\alpha = \left( \\begin{array}{c}s_{0}\\\\s_{1}\\\\s_{2}\\\\ s_{3}\\\\ \\ldots\\end{array}\\right)
\stackrel{map(g, \\alpha)}{\\mapsto} \left( \\begin{array}{ccccc}
s_{00} & s_{01} & s_{02} & s_{03} & \\ldots \\\\
s_{10} & s_{11} & s_{12} & \\ldots & \\\\
s_{20} & s_{21} & \\ldots & & \\\\
s_{30} & \\ldots & & & \\\\
\\ldots & & & & \\\\
\\end{array}\\right)
After the mapping obj is built, it is consumed as argument by ``mplus`` to
enumerate the states space.
Moreover, a recursive implementation can be written as found in *The
Reasoned Schemer*, but in some cases it raises ``RecursionError`` due to
Python limitation on stack usage::
try:
s = next(α) # s : state
except StopIteration:
yield from fail()
else:
β = g(s)
γ = bind(α, g, mplus)
yield from mplus(iter([β, γ]))
which produces the following states space:
.. math::
\\alpha = \left( \\begin{array}{c}s_{0}\\\\s_{1}\\\\s_{2}\\\\ s_{3}\\\\ \\ldots\\end{array}\\right)
\stackrel{bind(\\alpha, g)}{\\mapsto} \left( \\begin{array}{l}
s_{00} \, s_{01} \, s_{02} \, s_{03} \, s_{04} \,s_{05} \,\\ldots \\\\
\left( \\begin{array}{l}
s_{10} \, s_{11} \, s_{12} \, \\ldots \, \\\\
\left( \\begin{array}{l}
s_{20} \, s_{21} \, \\ldots \, \, \\\\
\left( \\begin{array}{l}
s_{30} \, \\ldots \, \, \, \\\\
\\ldots \, \, \, \, \\\\
\\end{array}\\right)
\\end{array}\\right)
\\end{array}\\right)
\\end{array}\\right)
assuming we want to enumerate using interleaving:
.. math::
s_{00}, s_{10}, s_{01}, s_{20}, s_{02}, s_{11}, s_{03}, s_{30}, s_{04}, s_{12}, s_{05}, s_{21}\ldots
which, although *complete*, is *unbalanced* in favor of first streams.
:param iter α: an iterator over a *countable* set of ``state`` objects
:param goal g: a relation to be satisfied
:param callable mplus: states space enumeration strategy
:return: an ``iter`` object over satisfying ``state`` objects
'''
yield from mplus(map(g, α))
# }}}
# INTERFACE {{{
[docs]def run(goal,
n=False,
var_selector=lambda *args: args[0],
post=lambda r: cons_to_list(r) if isinstance(r, cons) else r):
'''
Looks for a list of at most ``n`` associations ``[(u, v) for v in ...]``
such that when var ``u`` takes value ``v`` the relation ``goal`` is
satisfied, where ``u`` is the *main var* respect the whole ``run``
invocation; otherwise, it enumerates the relation if ``n`` is ``False``.
:param goal: the relation to be satisfied respect to the main var according to :py:obj:`var_selector`.
'''
with states_stream(goal) as α:
domain = range(n) if n else count()
subs = [a.sub for i, a in zip(domain, α)]
logic_vars = getattr(goal, 'logic_vars', None) # defaults to `None` instead of `[]` to distinguish attr set by `_fresh`
m_var = var_selector(*logic_vars) if logic_vars else Tautology() # any satisfying sub is a Tautology if there are no logic vars
def λ(sub):
w_var = walk_star(m_var, sub) # instantiate every content in the expr associated to `main_var` in `sub` to the most specific value
r_var = reify(w_var)
return post(r_var)
return list(map(λ, subs))
# }}}