potassco / clingo

🤔 A grounder and solver for logic programs.
https://potassco.org/clingo
MIT License
601 stars 79 forks source link

Extend safety by computing intervals from comparisons #366

Closed rkaminsk closed 2 years ago

rkaminsk commented 2 years ago

TODO: check ignoreIfFixed logic.

We canuse clingcon's bound propagation algorithm to compute the intervals from comparisons. The code below shows how intervals for a body containing 1<X<Y, X+Y<=100 can be computed:

from typing import Callable, Dict, List, Optional, Tuple
from dataclasses import dataclass

@dataclass
class Term:
    coefficient: int
    variable: str

    def __str__(self) -> str:
        if self.coefficient == 1:
            return self.variable
        if self.coefficient == -1:
            return f'-{self.variable}'
        return f'{self.coefficient}*{self.variable}'

@dataclass
class Inequality:
    terms: List[Term]
    bound: int

    def __str__(self) -> str:
        e = ' + '.join(str(t) for t in self.terms) if self.terms else '0'
        return f'{e} >= {self.bound}'

def floor_div(a: int, b: int) -> int:
    return a // b

def ceil_div(a: int, b: int) -> int:
    return (a - 1) // b + 1

def update_bound(lb: Dict[str, int], ub: Dict[str, int], slack: int, num_unbounded: int,
                 select: Callable[[int, int], int], div: Callable[[int, int], int], term: Term) -> bool:
    if num_unbounded == 0:
        slack += term.coefficient * ub[term.variable]
    elif num_unbounded > 1 or term.variable in ub:
        return False

    value = div(slack, term.coefficient)
    if term.variable in lb:
        new_val = select(value, lb[term.variable])
        changed = new_val != lb[term.variable]
    else:
        new_val = value
        changed = True
    lb[term.variable] = new_val

    return changed

def update_slack(lub: Dict[str, int], term: Term, slack: int, num_unbounded: int) -> Tuple[int, int]:
    if term.variable in lub:
        slack -= term.coefficient * lub[term.variable]
    else:
        num_unbounded += 1

    return slack, num_unbounded

def compute_bounds(iqs: List[Inequality]) -> Optional[Tuple[Dict[str, int], Dict[str, int]]]:
    lb: Dict[str, int] = dict()
    ub: Dict[str, int] = dict()

    changed = True
    while changed:
        changed = False
        for iq in iqs:
            # compute slack and number of unbounded terms
            slack, num_unbounded = iq.bound, 0
            for term in iq.terms:
                if term.coefficient > 0:
                    slack, num_unbounded = update_slack(ub, term, slack, num_unbounded)
                else:
                    slack, num_unbounded = update_slack(lb, term, slack, num_unbounded)

            # the inequalities cannot be satisfied
            if num_unbounded == 0 and slack > 0:
                return None

            # propagate if there is at most one unbounded term
            if num_unbounded <= 1:
                for term in iq.terms:
                    if term.coefficient > 0:
                        changed = update_bound(lb, ub, slack, num_unbounded, max, ceil_div, term) or changed
                    else:
                        changed = update_bound(ub, lb, slack, num_unbounded, min, floor_div, term) or changed

    return lb, ub

def test():
    # X>1, Y>X, X+Y <= 100
    i1 = Inequality([Term(1, "X")], 2)
    i2 = Inequality([Term(1, "Y"), Term(-1, "X")], 1)
    i3 = Inequality([Term(-1, "X"), Term(-1, "Y")], -100)
    print('rule body:')
    print('  1<X<Y, X+Y<=100')
    print('inequalities:')
    print(f'  {i1}')
    print(f'  {i2}')
    print(f'  {i3}')
    res = compute_bounds([i1, i2, i3])
    if res:
        print("ranges:")
        lb, ub = res
        for var in sorted(lb):
            if var in ub:
                print(f'  {var} = {lb[var]}..{ub[var]}')
    else:
        print('the inequalities cannot be satisfied')

test()

The test code outputs:

rule body:
  1<X<Y, X+Y<=100
inequalities:
  X >= 2
  Y + -X >= 1
  -X + -Y >= -100
ranges:
  X = 2..97
  Y = 3..98

The task to extract linear inequalities from rule bodies is straightforward.