# Source code for resmt.datatypes

"""
datatypes.py
==============

This module contains the core classes and methods for defining Z3 representations of datatypes for which \
 rewriting is possible in ``ReSMT`` (with typed directed graphs currently the main datatype implemented).



"""

#####################################################################################

# %% setup of logging tool for providing various level of feedback to the user:
# The configuration below is applied as a side effect of importing this module.
import logging

# NOTE: in order to alter the behavior of the logging tool *after* having once
# executed the code below, it is necessary to restart Python and load the module
# again! (-> possibly find a nicer method to allow "on-the-fly" changes...)
# Records are written to the file 'datatypes.log' (relative to the current working
# directory), each prefixed with a time stamp of the form HH:MM:SS.mmm.
logging.basicConfig(format='%(asctime)s.%(msecs)03d  %(message)s',
                    filename='datatypes.log',
                    # filemode='w',
                    # datefmt='%d/%m/%y %H:%M:%S',
                    datefmt='%H:%M:%S',
                    level=logging.CRITICAL)  # set to logging.INFO for more verbose mode
# uncomment the following if an output to the console is also desired:
# logging.getLogger().addHandler(logging.StreamHandler())

# With the level set to CRITICAL above, this INFO record (like every other
# logging.info call in this module) is filtered out; it only shows up once the
# level is lowered to logging.INFO.
logging.info('---------->datatypes.py module loaded<----------')

#####################################################################################

# %% Initialization:
# support for intelligent type-aliases
from typing import Any, Dict, List, Set, Tuple, NamedTuple, TypeVar, Type, Optional

import os
import json
from pprint import pformat
from IPython.display import SVG, display

# set the interactive Python shell to recognize the current working directory:
import sys

sys.path.append(os.getcwd())

from collections import OrderedDict, Counter
import timeit
import numpy
import networkx as nx
import z3
import mpmath

from jinja2 import FileSystemLoader
from latex.jinja2 import make_env
from latex import build_pdf

import subprocess
import signal

from network2tikz import plot
import seaborn as sns

#### some nice available styles:
# 'seaborn-whitegrid', 'seaborn-pastel', 'seaborn'
####################################

# for plotting runtimes etc.:
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import style

# matplotlib 3.6 renamed its bundled seaborn style sheets to 'seaborn-v0_8-<name>'
# and later releases dropped the old spellings, so a hard-coded 'seaborn-white'
# makes the import of this module fail there. Pick whichever name the installed
# matplotlib version actually provides:
if 'seaborn-v0_8-white' in style.available:
    style.use('seaborn-v0_8-white')
else:
    style.use('seaborn-white')
from mpl_toolkits.mplot3d import Axes3D

# allow for TeX formulas in labels etc.:
# source: https://stackoverflow.com/questions/46259617/how-does-one-use-latex-amsmath-with-matplotlib
matplotlib.rc('text', usetex=True)
matplotlib.rc('font', **{'family': "sans-serif"})

# method compatible with matplotlib >= 3.3 (the preamble must be passed as one
# newline-separated string rather than as a list of strings):
latexPreambleParams = [r'\usepackage{amsmath,amssymb}',
                       r'\usepackage[english]{babel}',
                       r'\usepackage[utf8]{inputenc}',
                       r'\usepackage{sansmathfonts}',
                       r'\usepackage[scaled=0.95]{helvet}',
                       r'\renewcommand{\rmdefault}{\sfdefault}',
                       r'\usepackage[T1]{fontenc}']

params = {'text.latex.preamble': '\n'.join(map(str, latexPreambleParams))}

plt.rcParams.update(params)
##########################################################################################################

# %%


class Z3TypedSet:
    """Encode sets of elements with different types.

    A ``Z3TypedSet`` object is a data structure that serves the dual purpose of
    implementing an encoding of a set within an instance of a Z3 solver, and at the
    same time providing access to a number of attributes storing information
    important for the algorithms defined in this package.

    .. note:: Since in some of the algorithms (e.g. in the definition of functions
        between typed sets) it is necessary to assert that only constants of the same
        "global" sort are mappable to each other, we store the name of the set and the
        names of the sorts of the elements provided as input at initialization in two
        different fields. Then for the set at hand, the Z3 sorts are named
        ``setName+'|'+elSortName``, to ensure that the sort universe is closed for each
        set. In declarations of functions with domain some typed set `A` and codomain
        some typed set `B`, one may then emulate a "global" type mapping constraint via
        asserting that only constants of type ``setAname+'|'+elSortName`` may be mapped
        to constants of type ``setBname+'|'+elSortName`` (with additional issues related
        to nil-carriers etc. discussed in the documentation of the typed function class).

    .. note:: Names of the form ``'nil|' + sortName`` are reserved for the nil-element
        of the sort ``sortName``; ordinary element names must not start with ``'nil|'``.

    Attributes:
        solv: the Z3 solver instance provided at initialization
        name: the name of the set (the ``setName`` argument of the initializer)
        sorts: the set of "global" names of the sorts of the elements
        els: a dictionary of (non-nil) element names as keys and "global" element sort
            names as values
        subsets: a dictionary with "global" sort names as keys and Z3 enumeration sorts
            (one for each subset, named ``setName+'|'+sortName``) as values
        subsetEls: a dictionary with "global" sort names as keys and the Z3 constants of
            the enumeration sort for the corresponding sort as values (nil-element first)
        subsetNamesToEls: a dictionary with "global" sort names as keys, and with
            dictionaries of element name/Z3 constant entries as values (in order to
            permit accessing Z3 constants by element names)
        nilEls: a dictionary with sort names as keys and Z3 constants as values
            (representing nil-elements)
    """

    def __init__(self, solv: z3.Solver, setName: str, els: Dict[str, str]) -> None:
        """Instantiate the Z3 set data from a dictionary of element names and types.

        Examples:
            >>> S = z3.Solver()
            >>> setA = Z3TypedSet(S, 'setA', {'a':'elSort1', 'b':'elSort1', 'c':'elSort2'})

            An example of an empty set:

            >>> emptySet = Z3TypedSet(S, 'setB', {})

        Args:
            solv: the handle to a Z3 solver instance (automatically passes the current context)
            setName: the name of the set (used as prefix of the Z3 sort names)
            els: dictionary of element names and types (given as strings); an entry with
                the key ``'nil'`` only contributes its sort, not an element

        Returns:
            Nothing; the ``Z3TypedSet`` object is set up in place. As a side effect, one
            Z3 enumeration sort per element sort is declared in the context of ``solv``,
            including a nil-element for each of the element sorts.

        .. note:: If an empty ``els`` dictionary is specified, the method instantiates a Z3
            constant nonetheless for a nil-element whose sort is named ``'_DEFAULT'``, since
            functions must be total in Z3, which is why nil-elements are needed even for
            empty sets.

        .. admonition:: Z3 Python API programming details

            In order to reference the context of a solver instance ``solv``, it suffices to
            declare the sorts within the context ``solv.ctx`` of ``solv``. All other
            directives (constant or function declarations, assertions etc.) will
            automatically inherit their context from the sorts.

        .. admonition:: Python 3 programming gems

            In some of the constructs here and elsewhere of the code, we use a language
            feature of Python 3 (available in Python version >= 3.5) that permits to merge
            two dictionaries *without* having to update one into the other (cf. this
            `StackExchange article <https://stackoverflow.com/a/26853961/703382>`_).
            Given dictionaries ``x`` and ``y``, the merge of the two is obtained as
            ``{**x, **y}``.
        """
        self.solv = solv
        self.name = setName

        # depending on whether or not the input data encodes an empty set,
        # store the elements data into fields:
        if (not els) or (els == {'nil': '_DEFAULT'}):
            logging.info('Empty set definition detected.')
            self.els = {}
            self.sorts = {'_DEFAULT'}
        else:
            # drop the nil-elements from the list of elements (these are treated
            # separately below), but keep their sorts
            logging.info('Non-empty set definition detected.')
            self.sorts = set(els.values())
            self.els = {elName: srtName for elName, srtName in els.items() if elName != 'nil'}

        logging.info('\n\t element and sort name dictionary:\n\t %s' % self.els)

        # for each sort, instantiate a Z3 enumeration sort and constants for the elements:
        self.subsets = {}
        self.subsetEls = {}
        self.subsetNamesToEls = {}
        self.nilEls = {}

        for srt in self.sorts:
            # the nil-element always comes first in the enumeration
            elNames = ['nil|' + srt] + [elN for elN, srtName in self.els.items() if srtName == srt]
            logging.info('\n\t element names for sort\t%s:\n\t %s' % (srt, elNames))

            self._declareSubset(srt, elNames)

            logging.info('\n\t Z3 sort for all elements of this sort:\n\t %s' % (self.subsets[srt]))
            logging.info('\n\t element names for this sort:\n\t %s' % (self.subsetEls[srt]))
            logging.info('\n\t dictionary of element names to Z3 constants:\n\t %s'
                         % (self.subsetNamesToEls[srt]))

    def _declareSubset(self, srt: str, elNames: List[str]) -> None:
        """Declare the enumeration sort for ``srt`` and register its constants in all lookup tables.

        Args:
            srt: the "global" sort name
            elNames: the names of all elements of that sort; must contain ``'nil|' + srt``
        """
        enumSort, enumConsts = z3.EnumSort(self.name + '|' + srt, tuple(elNames), self.solv.ctx)
        self.subsets[srt] = enumSort
        self.subsetEls[srt] = enumConsts
        # for convenience: a dictionary for accessing Z3 constants by name
        self.subsetNamesToEls[srt] = dict(zip(elNames, enumConsts))
        # store the nil-element of the sort into the nilEls dictionary:
        self.nilEls[srt] = self.subsetNamesToEls[srt]['nil|' + srt]

    def getEl(self, elName: str) -> z3.ExprRef:
        """Get the Z3 constant associated to an element name.

        Args:
            elName: the name of an element, or ``'nil|' + sortName`` for a nil-element

        Returns:
            The Z3 constant representing the element.

        Raises:
            KeyError: if no element (or sort) of that name is known to the set.
        """
        return self.getElandSrtName(elName)[0]

    def getElandSrtName(self, elName: str) -> Tuple[z3.ExprRef, str]:
        """Get both the Z3 constant and the name of the element sort associated to an element name.

        Args:
            elName: the name of an element, or ``'nil|' + sortName`` for a nil-element

        Returns:
            A tuple of the Z3 constant representing the element and its "global" sort name.

        Raises:
            KeyError: if no element (or sort) of that name is known to the set.
        """
        # Only the reserved 'nil|<sort>' form denotes a nil-element; testing just the
        # first three characters would misroute ordinary names such as 'nilpotent'.
        if elName.startswith('nil|'):
            elSrt = elName[len('nil|'):]
        else:
            elSrt = self.els[elName]

        return self.subsetNamesToEls[elSrt][elName], elSrt

    def patchNilElements(self, setB: 'Z3TypedSet') -> None:
        """Patch nil-elements.

        Given a second ``Z3TypedSet`` instance ``setB``, extract the sorts present in
        ``setB`` but not in the original set, and then add extra nil-elements to the
        original set for each of these extra sorts.

        Args:
            setB: a ``Z3TypedSet`` instance whose additional sorts should be "patched"
                into the ``self`` instance (and suitable nil-elements added, one for
                each additional sort)

        Example:
            >>> S = z3.Solver()
            >>> setA = Z3TypedSet(S, 'setA', {'a':'elSort1', 'b':'elSort1', 'c':'elSort2'})
            >>> emptySet = Z3TypedSet(S, 'setB', {})
            >>> emptySet.patchNilElements(setA)
        """
        extraSorts = setB.sorts - self.sorts
        self.sorts = self.sorts.union(extraSorts)

        for srt in extraSorts:
            # a patched-in sort consists of its nil-element only
            self._declareSubset(srt, ['nil|' + srt])
class Z3TypedSetTemplate(Z3TypedSet):
    """A templating class for sets of elements with different types.

    Mainly for purposes of computing pushouts, this class emulates closely the structure
    of the ``Z3TypedSet`` class, yet forgoes the *range closure* part of the
    initialization routine. More precisely, while the initialization method of the
    ``Z3TypedSet`` class relies upon the ``z3.EnumSort`` functionality of the ``Z3`` API,
    ``Z3TypedSetTemplate`` instances are initialized via 1) issuing a ``z3.DeclareSort``
    call for each sort of elements, and 2) only explicitly instantiating the nil-elements
    as ``Z3`` constants for each sort. In consequence, instances of the
    ``Z3TypedSetTemplate`` are useful in situations such as pushout constructions where
    the concrete non-nil element content of a given respective typed set is only
    determined at runtime of the algorithm.

    Attributes:
        solv: the Z3 solver instance provided at initialization
        name: the name of the set (the ``setName`` argument of the initializer)
        sorts: the set of "global" names of the element sorts
        els: always an empty dictionary (kept for compatibility with ``Z3TypedSet``)
        subsets: a dictionary with "global" sort names as keys and the Z3 sorts declared
            via ``z3.DeclareSort`` (named ``setName+'|'+sortName``) as values
        subsetEls: a dictionary with "global" sort names as keys and the nil-element
            constant of the corresponding sort as values (a single constant per sort,
            **not** a list as in ``Z3TypedSet``)
        subsetNamesToEls: a dictionary with "global" sort names as keys, and with
            one-entry dictionaries ``{'nil|'+sortName: nilConstant}`` as values
        nilEls: a dictionary with sort names as keys and Z3 constants as values
            (representing nil-elements)
    """

    def __init__(self, solv: z3.Solver, setName: str, elSorts: Set[str]) -> None:
        """Instantiate the Z3 typed set template data from a set of element types.

        Examples:
            >>> S = z3.Solver()
            >>> setA = Z3TypedSetTemplate(S, 'setA', {'elSort1', 'elSort2'})

        Args:
            solv: the handle to a Z3 solver instance (automatically passes the current context)
            setName: the name of the set (used as prefix of the Z3 sort names)
            elSorts: a set of element types (given as strings)

        Returns:
            Nothing; the ``Z3TypedSetTemplate`` object is set up in place, with the
            side-effect of introducing the necessary definitions of ``Z3`` sorts for the
            different element sorts and ``Z3`` constants for the nil-elements of each of
            the element sorts.
        """
        self.solv = solv
        self.name = setName
        # Copy the input: the instance must neither change when the caller later
        # mutates the set that was passed in, nor depend on it being a real `set`
        # (the inherited patchNilElements uses set difference/union on self.sorts).
        self.sorts = set(elSorts)

        # the following definition of an *empty* self.els dictionary is introduced for
        # compatibility of the fields of this class with those of the Z3TypedSet class:
        self.els = {}

        logging.info('\n\t Template for typed set instantiated for the following sorts:\n\t %s' % self.sorts)

        # for each element sort, instantiate a Z3 sort as well as a nil-element
        # constant of that sort:
        self.subsets = {}
        self.subsetEls = {}
        self.subsetNamesToEls = {}
        self.nilEls = {}

        ctx = solv.ctx

        for srt in self.sorts:
            nilName = 'nil|' + srt
            z3Srt = z3.DeclareSort(setName + '|' + srt, ctx)
            z3NilConst = z3.Const(nilName, z3Srt)

            self.subsets[srt] = z3Srt
            self.subsetEls[srt] = z3NilConst
            # for convenience: a dictionary for accessing Z3 constants by name
            self.subsetNamesToEls[srt] = {nilName: z3NilConst}
            # store the nil-element of the sort into the nilEls dictionary:
            self.nilEls[srt] = z3NilConst

            logging.info('\n\t element names for sort\t%s:\n\t %s' % (srt, [nilName]))
            logging.info('\n\t Z3 sort for all elements of this sort:\n\t %s' % (self.subsets[srt]))
            logging.info('\n\t element names for this sort:\n\t %s' % (self.subsetEls[srt]))
            logging.info('\n\t dictionary of element names to Z3 constants:\n\t %s'
                         % (self.subsetNamesToEls[srt]))
class Z3TypedFunction:
    """A function of Z3TypedSet objects as domain and codomain.

    .. note:: A priori, we do *not* make any assumptions on the nature of the functions
        other than that they must be total (albeit the latter out of strict necessity).
        Moreover, in order to provide an intuitive API, nil-elements are automatically
        mapped to each other, in the sense that for each domain/codomain sort pair
        occurring in ``fData``, the mapping of the nil-element of the domain sort to the
        nil-element of the codomain sort is asserted. It is left to the user however to
        take care in implementing additional properties or mapping consistency, since
        e.g. injectivity is only tested/asserted as a property that holds for each
        sort-mapping subfunction.

    Attributes:
        solv: the Z3 solver instance
        domSet: the typed set encoding the domain
        codomSet: the typed set encoding the codomain
        fData: the element name mapping (``{'nil|_DEFAULT': 'nil|_DEFAULT'}`` for a
            function of empty sets)
        srtMaps: the set of ``(domainSort, codomainSort)`` pairs mapped by the function
        sfNames: a dictionary from sort pairs to the names of the Z3 subfunctions
        subFunctions: a dictionary from sort pairs to the Z3 function declarations
        asts: the list of all assertions that were added to the solver
    """

    def __init__(self, solv: z3.Solver, fName: str, domSet: Z3TypedSet, codomSet: Z3TypedSet,
                 fData: Dict[str, str], isInjective: bool = False,
                 isSurjective: bool = False) -> None:
        """Initialization of a Z3TypedFunction object.

        Example:
            >>> S = z3.Solver()
            >>> setD = Z3TypedSet(S, 'domSet', {'a1':'sortX', 'a2':'sortX', 'a3': 'sortY'})
            >>> setC = Z3TypedSet(S, 'codomSet', {'b1':'sortX', 'b2':'sortX', 'b3': 'sortY', 'b4':'sortZ'})
            >>> f = Z3TypedFunction(S,'f', setD, setC, {'a1':'b2', 'a2':'b1','a3':'b4'}, isInjective = True)

            This renders both the respective ``Z3`` object instances as well as the
            requisite assertions in order to faithfully encode a function of typed sets.
            As a special case, consider below as another example a function of empty sets:

            >>> emptySet = Z3TypedSet(S, 'emptySet', {})
            >>> g = Z3TypedFunction(S,'g', emptySet, emptySet, {}, isInjective = True)

        Args:
            solv: a Z3 solver instance
            fName: function name (as string)
            domSet: a ``Z3TypedSet`` instance encoding the domain
            codomSet: a ``Z3TypedSet`` instance encoding the codomain
            fData: a dictionary with entries key/value pairs of names of domain/codomain
                elements, encoding the function itself (names of the form
                ``'nil|' + sortName`` refer to nil-elements)
            isInjective: (optional) boolean parameter declaring whether or not the
                function is injective (default: ``False``)
            isSurjective: (optional) boolean parameter declaring whether or not the
                function is surjective (default: ``False``)

        Raises:
            ValueError: if ``fData`` references non-nil elements missing from ``domSet``
                or ``codomSet``, if a domain sort is mapped to more than one codomain
                sort, or if ``isSurjective`` is requested although some codomain sort
                is not the target of any domain sort.
        """
        # Only the reserved 'nil|<sort>' form denotes a nil-element; testing just the
        # first three characters would misclassify ordinary names such as 'nilpotent'.
        nilPrefix = 'nil|'

        # verify that all non-nil elements explicitly referenced in fData are also
        # present in domSet/codomSet:
        dEls = {key for key in fData if not key.startswith(nilPrefix)}
        cEls = {val for val in fData.values() if not val.startswith(nilPrefix)}
        if not (dEls.issubset(domSet.els) and cEls.issubset(codomSet.els)):
            raise ValueError('There are missing elements either in domSet or in codomSet!')

        # a special case is represented when a function of empty sets is specified; in
        # this case, we render internally a special fData value mapping nil-elements:
        isEmptySetMap = (domSet.sorts == {'_DEFAULT'}) and (codomSet.sorts == {'_DEFAULT'}) \
            and not fData
        self.fData = {'nil|_DEFAULT': 'nil|_DEFAULT'} if isEmptySetMap else fData

        # split fData into its non-nil and its nil-only part:
        fDataNonNil = {key: val for key, val in fData.items() if not key.startswith(nilPrefix)}
        logging.info('\nNon-nil element mapping:\n\t%s' % fDataNonNil)
        fDataNilOnly = {key: val for key, val in fData.items() if key.startswith(nilPrefix)}
        logging.info('\nNil element mapping:\n\t%s' % fDataNilOnly)

        self.solv = solv
        self.domSet = domSet
        self.codomSet = codomSet

        # log an info message if the function encoded in fData is not total:
        if fDataNonNil.keys() != domSet.els.keys():
            logging.info('Non-total function specification detected.')

        # determine which domain/codomain sort pairs are mapped by the function
        # (for nil-element names, the part after 'nil|' is the name of the sort):
        if isEmptySetMap:
            # special case of a mapping of empty sets!
            self.srtMaps = {('_DEFAULT', '_DEFAULT')}
        else:
            nonNilSrtMaps = {(domSet.els[d], codomSet.els[c]) for d, c in fDataNonNil.items()}
            nilOnlySrtMaps = {(d[len(nilPrefix):], c[len(nilPrefix):])
                              for d, c in fDataNilOnly.items()}
            self.srtMaps = nonNilSrtMaps.union(nilOnlySrtMaps)

            logging.info('\nSort mappings:\n\t%s' % self.srtMaps)

            # verify that fData contains only one codomain sort per domain sort: a
            # dictionary keeps a single value per key, so if some domain sort occurred
            # with two codomain sorts, the dictionary has fewer pairs than srtMaps
            auxSrts = dict(self.srtMaps)
            if set(auxSrts.items()) != self.srtMaps:
                raise ValueError('Sort mismatch error: In the present version of the algorithm, only one ' +
                                 'codomain sort may be mapped per domain sort.')

        logging.info('\n\t sort maps dictionary:\n\t %s' % self.srtMaps)

        # instantiate a Z3 Function object for each sort pairing:
        self.sfNames = {}  # names of the subfunctions
        self.subFunctions = {}  # Z3 Function instances
        self.asts = []  # list of assertions for function properties

        for dSrt, cSrt in self.srtMaps:
            sfName = '%s|%s|%s' % (fName, dSrt, cSrt)
            self.sfNames[(dSrt, cSrt)] = sfName
            logging.info('\n--| Subfunction name:\t %s\n' % sfName)

            subFunction = z3.Function(sfName, domSet.subsets[dSrt], codomSet.subsets[cSrt])
            self.subFunctions[(dSrt, cSrt)] = subFunction
            logging.info('\n--| Updated subfunctions dictionary:\n\t%s\n' % self.subFunctions)

            # as a minimal requirement, nil-elements must be mapped:
            nilD = domSet.nilEls[dSrt]
            nilC = codomSet.nilEls[cSrt]
            self.asts += [subFunction(nilD) == nilC]

            if isInjective:
                # injectivity of the subfunction (which, together with nil -> nil, also
                # entails that no other element is mapped to the nil-element):
                y1 = z3.Const('y1|' + dSrt, domSet.subsets[dSrt])
                y2 = z3.Const('y2|' + dSrt, domSet.subsets[dSrt])
                self.asts += [z3.ForAll([y1, y2],
                                        z3.Implies(subFunction(y1) == subFunction(y2), y1 == y2))]
            else:
                # if the function is not injective, we must assert explicitly that no
                # other element can be mapped to the nil-element:
                x = z3.Const('x|' + dSrt, domSet.subsets[dSrt])
                self.asts += [z3.ForAll(x, z3.Implies(subFunction(x) == nilC, x == nilD))]

        logging.info('\n--| injectivity assertions included:\n\t%s\n' % self.asts)

        # assertions for surjectivity in case that isSurjective == True:
        if isSurjective:
            logging.info('\n--| entering the assmebly of surjectivity assertions...\n')
            for codSrt in codomSet.sorts:
                # "One codomain sort per domain sort" does not exclude that several
                # domain sorts (or none at all) are mapped to the same codomain sort, so
                # collect all of them; sorting keeps the assertion order reproducible.
                preimageSrts = sorted(dS for dS, cS in self.srtMaps if cS == codSrt)
                if not preimageSrts:
                    raise ValueError('Surjectivity was requested, but no domain sort is mapped ' +
                                     'to the codomain sort \'%s\'.' % codSrt)

                y = z3.Const('y|' + codSrt, codomSet.subsets[codSrt])
                sAsts = []
                for domSrt in preimageSrts:
                    z = z3.Const('z|' + domSrt, domSet.subsets[domSrt])
                    sAsts += [z3.Exists(z, self.subFunctions[(domSrt, codSrt)](z) == y)]

                # every element of the codomain sort has a preimage under at least one
                # of the subfunctions targeting that sort:
                self.asts += [z3.ForAll(y, z3.Or(sAsts))]

            logging.info('\n--| surjectivity assertions included:\n\t%s\n' % self.asts)

        # "populate" the function (i.e. assert the concrete mappings encoded in fData):
        logging.info('\n--| asserting the explicitly specified function mappings...\n')
        self.asts += [self.f(d) == codomSet.getEl(c) for d, c in fDataNonNil.items()]
        logging.info('\n--| function assertions included:\n\t%s\n' % self.asts)

        # pass all assertions to the solver instance:
        solv.add(self.asts)

    def f(self, elName: str) -> z3.ExprRef:
        """Retrieve the Z3 value of a function application by domain element name.

        Args:
            elName: the name of a domain element, or ``'nil|' + sortName`` for the
                nil-element of a domain sort

        Returns:
            For a nil-element, the nil-element constant of the codomain sort its sort is
            mapped to; otherwise the Z3 expression of the subfunction applied to the
            constant of the element.

        Raises:
            KeyError: if the element is unknown to the domain, or if its sort is not
                mapped by the function.
        """
        dEl, dSrt = self.domSet.getElandSrtName(elName)
        # srtMaps holds exactly one codomain sort per domain sort (verified at
        # initialization), so it can be read as a dictionary:
        cSrt = dict(self.srtMaps)[dSrt]

        if elName.startswith('nil|'):
            return self.codomSet.nilEls[cSrt]

        return self.subFunctions[(dSrt, cSrt)](dEl)
[docs]class Z3TypedDirectedGraph: """Representation of typed directed graphs in Z3. Following the convention "one vertex-type source/tarhet signature per edge type", \ typrd directed multigtraphs may be specified via providing the following data: 1. Names of vertices and edges (int he form of two dictionaries of strings, \ with the entry format ``'elName':'elSort'``). 2. Source-target incidence dictionaries (of the entry format \ ``'edgeName':('sourceVname', 'targetVname')``). Examples: >>> S = z3.Solver() >>> vDictA = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortY'} >>> eDictA = {'e1':'sortW', 'e2': 'sortT'} >>> stDictA = {'e1':('v1','v2'), 'e2':('v3','v1')} >>> graphA = Z3TypedDirectedGraph(S, 'gA', vDictA, eDictA, stDictA) This will render a representation of a typed directed graph with three vertices, \ two edges and incidences as specified in the ``stDictz`` dictionary. Note that since \ in this example the two edges have different types, each of them is permitted to \ have a different "type-signature" in terms of their source- and target-vertex types/ >>> gB = Z3TypedDirectedGraph(S, 'gEmpty', {}, {}, {}) This will render a representation of an empty graph. Special default types are \ utilized in order to give the nil-elements an invariant type, and the nil-element \ of the vertex set is both the source and target "vertex" of the nil-edge. Attributes: solv: a ``Z3`` solver instance name: the name of the TDG instance stDict: the source/target vertex incidence dictionary vertices: a ``Z3TypedSet`` instances representing the vertex set edges: a ``Z3TypedSet`` instance representing the edge set src: a ``Z3TypedFunction`` representing the source function trgt: a ``Z3TypedFunction`` representing the targetß function stSortsDct: a dictionary with edge-sorts as keys and tuples of source/target \ vertex sorts as values """
[docs] def __init__(self, solv: z3.Solver, gName: str, vDict: Dict[str, str], eDict: Dict[str, str], stDict: Dict[str, Tuple[str, str]]) -> None: """Initialization of a Z3TypedDirectedGraph object. Args: solv: Z3 solver instance gName: the name of the graph vDict: a dictionary of vertex name/sort name pairs eDict: a dictionary of edge name/sort name pairs stDict: a dictionary with entries of the form ``e: (s, t)``, where ``e`` is \ the name of an edge, while ``s`` and ``t`` are the names of the source/target vertices \ of the edge ``e`` """ # input data consistency checks: # we do not allow "endpoint-less" edges, i.e. if vDict = {}, but eDict != {}, this is an input error: if not bool(vDict) and bool(eDict): raise ValueError('Empty vertex set despite of non-empty edge set detected!') self.solv = solv self.name = gName self.stDict = stDict # special case: non-empty vertex set, but empty edge set if bool(vDict) and not bool(eDict): vDict = {**vDict, **{'nil': '_DEFAULT'}} eDict = {'nil': '_DEFAULT'} stDict = {'nil|_DEFAULT': ('nil|_DEFAULT', 'nil|_DEFAULT')} # set up Z3TypedSet instances for sets of vertices and edges: self.vertices = Z3TypedSet(solv, gName + '|V', vDict) self.edges = Z3TypedSet(solv, gName + '|E', eDict) # set up Z3TypedFunction instances for source and target maps: self.src = Z3TypedFunction(solv, gName + '|src', self.edges, self.vertices, {e: vs[0] for e, vs in stDict.items()}, isInjective=False) self.trgt = Z3TypedFunction(solv, gName + '|trgt', self.edges, self.vertices, {e: vs[1] for e, vs in stDict.items()}, isInjective=False) # for later convenience: dictionary of edge-to-source/target vertex sort "signatures": # special case: non-empty vertex set, but empty edge set if bool(vDict) and eDict == {'nil': '_DEFAULT'}: self.stSortsDct = {'_DEFAULT': ('_DEFAULT', '_DEFAULT')} else: self.stSortsDct = {self.edges.els[e]: (self.vertices.els[vs[0]], self.vertices.els[vs[1]]) for e, vs in stDict.items()}
[docs] def patchNilElements(self, gB: 'Z3TypedDirectedGraph') -> None: """Patch nil-elements for vertex and edge sorts not in original graph. As an additional effect to merely adding the nil-elements to the vertex and edge sets, \ the method also updates the source and target functions (s.th. the new nil-edges have \ source and target as nil-elements of the sort "signature" of edges as in ``gB``). Args: gB: a ``Z3TypedDirectedGraph`` instance from which to determine the additional sorts Example: We first set up a ``Z3`` solver instance and two TDGs: >>> S = z3.Solver() >>> vDictA = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortY'} >>> eDictA = {'e1':'sortW', 'e2': 'sortT'} >>> stDictA = {'e1':('v1','v2'), 'e2':('v3','v1')} >>> graphA = Z3TypedDirectedGraph(S, 'gA', vDictA, eDictA, stDictA) >>> vDictB = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortZ'} >>> eDictB = {'e1':'sortW', 'e2': 'sortT', 'e3': 'sortU'} >>> stDictB = {'e1':('v1','v3'), 'e2':('v2','v3'), 'e3':('v1','v3')} >>> graphB = Z3TypedDirectedGraph(S, 'gB', vDictB, eDictB, stDictB) Then the mutual nil-element patching is performed as follows: >>> graphA.patchNilElements(graphB) >>> graphB.patchNilElements(graphA) Indeed, one may verify that now e.g. 
``graphA`` has all the sorts it had \ before plus the new sorts of ``graphB``: >>> graphA.vertices.sorts >>> graphA.edges.sorts """ # explicitly determine the additional edge sorts: extraEdgeSorts = gB.edges.sorts - self.edges.sorts logging.info('\n Extra edge-sorts detected:\n\t %s' % extraEdgeSorts) # "patch" the source and target functions of self to point the extra nil-edges to nil-vertices # of the correct sorts: evBsrts = gB.stSortsDct logging.info('\n StSortDct of gB:\n\t %s' % evBsrts) origStDict = self.stDict nilPatchStDict = {('nil|' + eSrt): ('nil|' + evBsrts[eSrt][0], 'nil|' + evBsrts[eSrt][1]) for eSrt in extraEdgeSorts} self.stDict = {**origStDict, **nilPatchStDict} logging.info('\n Patched stDict of gA:\n\t %s' % self.stDict) # "patch" the vertex and edge sets: self.vertices.patchNilElements(gB.vertices) self.edges.patchNilElements(gB.edges) logging.info('\nVertex and edge sets patched with nil-elements!\n') logging.info('\nVertex sorts:\n\t %s\n Edge sorts:\n\t %s\n' % (self.vertices.sorts, self.edges.sorts)) # for later convenience: dictionary of edge-to-source/target vertex sort "signatures": origSTsrtDict = {self.edges.els[e]: (self.vertices.els[vs[0]], self.vertices.els[vs[1]]) for e, vs in origStDict.items()} nilPatchSTsrtDict = {eSrt: (evBsrts[eSrt][0], evBsrts[eSrt][1]) for eSrt in extraEdgeSorts} self.stSortsDct = {**origSTsrtDict, **nilPatchSTsrtDict} logging.info('\n stSortsDct after patching:\n\t %s' % self.stSortsDct) logging.info('\n Vertex els and nil-els after patching:\n\t %s\n\t %s' % (self.vertices.els, self.vertices.nilEls)) logging.info('\n Edges els and nil-els after patching:\n\t %s\n\t %s' % (self.edges.els, self.edges.nilEls)) self.src = Z3TypedFunction(self.solv, self.name + '|src', self.edges, self.vertices, {e:vs[0] for e, vs in self.stDict.items()}, isInjective=False) self.trgt = Z3TypedFunction(self.solv, self.name + '|trgt', self.edges, self.vertices, {e: vs[1] for e, vs in self.stDict.items()}, isInjective=False)
[docs] def isSortCompatibleWith(self, gB: 'Z3TypedDirectedGraph') -> bool: """Compare two ``Z3TypedDirectedGraph`` instances for sort compatibility. Two TDGs are defined to be sort-compatible if and only if they have precisely the same vertex- \ and edge-sorts, and if in addition their edge-to-source/target sort "signatures" coincide. Args: gB: a second ``Z3TypedDirectedGraph`` instance to compare with Returns: A boolean value (with ``True`` indicating sort-compatibility). """ if (self.vertices.sorts == gB.vertices.sorts) and (self.stSortsDct == gB.stSortsDct): return True else: return False
[docs] def renderNonEmbeddingAssertion(self, gN: 'Z3TypedDirectedGraph') -> List[z3.BoolRef]: """Render assertions of a TDG not containing a forbidden pattern as a subgraph. Args: gN: a ``Z3TypedDirectedGraph`` instance encoding the forbidden pattern Returns: A list of Z3 assertions expressing the non-embedding constraint. """ if (not set(self.vertices.sorts).issuperset(gN.vertices.sorts))\ or (not set(self.edges.sorts).issuperset(gN.edges.sorts)) \ or (set({eSrt for eSrt in gN.edges.sorts if self.stSortsDct[eSrt] != gN.stSortsDct[eSrt]})): # if the sorts of the forbidden pattern gN are not included in the sorts of # the graph `self` for which the condition is formulated, gN can never embed # into the graph, which is why in this case an empty assertion list is returned: return [] else: logging.info('\n --> Entering assembly of non-embedding assertions...\n') constantsList = [] # the list of all Z3 constants necessary to formulate the assertions astsNE = [] # this will hold the non-embedding constraint assertions vConsts = {} # auxiliary dictionary for vName: z3Const dictionary eConsts = {} # auxiliary dictionary for eName: z3Const dictionary idStr = '|' + gN.name # common naming suffix for each constant # vertex-part of non-embedding assertions: for vSrt in gN.vertices.sorts: vNames = [vName + idStr for vName in gN.vertices.subsetNamesToEls[vSrt].keys() if vName[:3] != 'nil'] # only extract non-nil vertex names! 
if bool(vNames): # the nil-element of the sort vSrt: vNil = self.vertices.nilEls[vSrt] # the Z3 sort reference for vertex sort `vSrt` within the host graph `self`: z3Vsort = self.vertices.subsets[vSrt] vs = z3.Consts(" ".join(vNames), z3Vsort) constantsList += vs astsNE += [z3.Distinct(vs + [vNil])] # auxiliary dictionary needed to formulate the source and target consistency assertions: vConsts = {**vConsts, **dict(zip(vNames, vs))} logging.info('\n\tNon-nil vertices of sort \'%s\' detected:\n\t\t%s\n' % (vSrt, dict(zip(vNames, vs)))) # edge-part of non-embedding assertions: for eSrt in gN.edges.sorts: eNames = [eName + idStr for eName in gN.edges.subsetNamesToEls[eSrt].keys() if eName[:3] != 'nil'] # only extract non-nil edge names! if bool(eNames): # the nil-element of the sort eSrt: eNil = self.edges.nilEls[eSrt] # the Z3 sort reference for edge sort `eSrt` within the host graph `self`: z3Esort = self.edges.subsets[eSrt] es = z3.Consts(" ".join(eNames), z3Esort) constantsList += es astsNE += [z3.Distinct(es + [eNil])] # auxiliary dictionary needed to formulate the source and target consistency assertions: eConsts = {**eConsts, **dict(zip(eNames, es))} logging.info('\n\tNon-nil edges of sort \'%s\' detected:\n\t\t%s\n' % (eSrt, dict(zip(eNames, es)))) # source and target consistency requirements: sSrt, tSrt = gN.stSortsDct[eSrt] # the source- and target-sorts of edges of type `srt` for eId in eNames: eName = eId[:-len(idStr)] # name of the dge without the idStr vSrcId = gN.stDict[eName][0] + idStr vTrgtId = gN.stDict[eName][1] + idStr astsNE += [ self.src.subFunctions[(eSrt, sSrt)](eConsts[eId]) == vConsts[vSrcId], self.trgt.subFunctions[(eSrt, tSrt)](eConsts[eId]) == vConsts[vTrgtId] ] # assemble and return the full non-embedding assertion set: astsOutput = [z3.Not(z3.Exists(constantsList, z3.And(astsNE)))] logging.info('\n<--- Assertions generated:\n\t\t%s\n' % astsOutput) return astsOutput
def writeTDGdataToJSONfile(self, fName: str) -> None:
    """Store the defining data of this ``Z3TypedDirectedGraph`` in a JSON file.

    The file ``fName + '.json'`` holds one entry per graph name. If the file
    already exists its contents are loaded first, the entry for ``self.name``
    is added or overwritten, and the whole dictionary is written back. The
    entry contains the graph name, the vertex and edge dictionaries and the
    source/target dictionary (the ``Z3`` solver instance is not stored).

    Args:
        fName: the base name (possibly including a path) of the JSON file,
            without the ``.json`` suffix
    """
    jsonPath = fName + '.json'

    # start from the data already on disk, if any:
    storedGraphs = {}
    if os.path.exists(jsonPath):
        with open(jsonPath, 'r') as inFile:
            storedGraphs = json.load(inFile)

    # add or replace the entry for this graph:
    storedGraphs[self.name] = dict(
        gName=self.name,
        vDict=self.vertices.els,
        eDict=self.edges.els,
        stDict=self.stDict,
    )

    with open(jsonPath, 'w') as outFile:
        json.dump(storedGraphs, outFile, sort_keys=True, indent=2)
def display(self, graphLayout: str = 'neato') -> SVG:
    """Display a rendering of the TDG in the current IPython or Jupyter lab session.

    Args:
        graphLayout: (optional) the name of a graph layouting method
            (default: ``'neato'``; alternatives are those provided by the
            ``GraphViz`` package, e.g. ``dot``, ``twopi``, ``fdp``, ``sfdp``
            and ``circo``)

    Returns:
        The ``display`` attribute of the ``nxTypedDirectedGraph`` helper built
        for this graph (annotated as an ``IPython`` SVG display instance).
    """
    # imported locally, as in the rest of this class, rather than at module level
    import resmt.visualizations as visualizations

    rendering = visualizations.nxTypedDirectedGraph(
        self,
        drawGraph=True,
        writeGraph=False,
        gLayout=graphLayout,
    )
    return rendering.display
def renderVisualization(self, fName: str, graphLayout: str = 'neato') -> SVG:
    """Display a rendering of the TDG and write it to disk.

    Args:
        fName: a filename (possibly with a path), where the suffix controls
            the output format
        graphLayout: (optional) the name of a graph layouting method
            (default: ``'neato'``; alternatives are those provided by the
            ``GraphViz`` package, e.g. ``dot``, ``twopi``, ``fdp``, ``sfdp``
            and ``circo``)

    .. note:: Any format accessible for the ``pydot`` package is permitted
        (including e.g. PDF, SVG, JPG, ...).

    Returns:
        The ``display`` attribute of the ``nxTypedDirectedGraph`` helper built
        for this graph (annotated as an ``IPython`` SVG display instance).
    """
    # imported locally, as in the rest of this class, rather than at module level
    import resmt.visualizations as visualizations

    rendering = visualizations.nxTypedDirectedGraph(
        self,
        drawGraph=True,
        writeGraph=True,
        fileName=fName,
        gLayout=graphLayout,
    )
    return rendering.display
def renderTDGdataFromJSONfile(fName: str) -> Any:
    """Read the TDG data stored in a JSON file.

    This is the inverse of the ``writeTDGdataToJSONfile`` method (and will
    likely only be useful for JSON files generated with said method).

    .. note:: JSON has no tuple type, so storing TDG data converts the
        *tuples* in the ``stDict`` fields into *lists*. This function
        converts them back into tuples after loading.

    Args:
        fName: the base name of the JSON input file; the suffix ``.json`` is
            appended by this function

    Returns:
        A dictionary with one entry per stored graph. Each entry is itself a
        dictionary whose ``stDict`` field maps names to tuples; for files
        written by ``writeTDGdataToJSONfile`` the entries have the keys
        ``gName``, ``vDict``, ``eDict`` and ``stDict``.
    """
    with open(fName + '.json', 'r') as jsonFile:
        storedGraphs = json.load(jsonFile)

    # re-package the endpoint lists of every graph entry as tuples:
    for graphData in storedGraphs.values():
        graphData['stDict'] = {eName: tuple(endpoints)
                               for eName, endpoints in graphData['stDict'].items()}

    return storedGraphs
def renderTDGfromJSONfile(fName: str, gName: str, S: z3.Solver) -> Z3TypedDirectedGraph:
    """Render a ``Z3TypedDirectedGraph`` instance from data stored in a JSON file.

    Args:
        fName: the base name of the JSON file containing the TDG data (as
            expected by ``renderTDGdataFromJSONfile``)
        gName: the key under which the data for the TDG is stored in the file
        S: a ``Z3`` solver instance within which the TDG should be instantiated

    Returns:
        A ``Z3TypedDirectedGraph`` instance built from the stored ``gName``,
        ``vDict``, ``eDict`` and ``stDict`` fields of the selected entry.
    """
    graphData = renderTDGdataFromJSONfile(fName)[gName]
    # read all four fields first, so that a missing key is reported before
    # anything is instantiated in the solver:
    storedName = graphData['gName']
    vDict = graphData['vDict']
    eDict = graphData['eDict']
    stDict = graphData['stDict']
    return Z3TypedDirectedGraph(S, storedName, vDict, eDict, stDict)
class Z3TDGtemplate(Z3TypedDirectedGraph):
    """Representation of templates of typed directed graphs in Z3.

    Much as in the case of templates for typed sets, this class permits to
    encode templates of typed directed graphs for algorithms such as the
    pushout construction, where the concrete elements of the graph and its
    source and target functions will only be fully instantiated at runtime of
    the algorithm.

    Technically, this is achieved by using ``Z3TypedSetTemplate`` instances in
    lieu of ``Z3TypedSet`` instances for the vertex and edge sets. The input
    data necessary for TDG templates is comprised of

    1. sets of vertex- and edge-sorts, and
    2. a dictionary mapping each edge-sort to its source/target vertex sorts.

    Examples:
        >>> S = z3.Solver()
        >>> vSrts = {'sortX', 'sortY'}
        >>> eSrts = {'sortW', 'sortT'}
        >>> stSrtsDict = {'sortW': ('sortX', 'sortY'), 'sortT': ('sortY', 'sortY')}
        >>> graphA = Z3TDGtemplate(S, 'gA', vSrts, eSrts, stSrtsDict)

    Attributes:
        solv: a ``Z3`` solver instance
        name: the name of the TDG instance
        stDict: the source/target incidence dictionary; for a template it only
            contains the nil-edge to nil-endpoint entries
        vertices: a ``Z3TypedSetTemplate`` instance representing the vertex set
        edges: a ``Z3TypedSetTemplate`` instance representing the edge set
        src: a ``Z3TypedFunction`` representing the source function
        trgt: a ``Z3TypedFunction`` representing the target function
        stSortsDct: a dictionary with edge-sorts as keys and tuples of
            source/target vertex sorts as values
    """

    def __init__(self,
                 solv: z3.Solver,
                 gName: str,
                 vSorts: Set[str],
                 eSorts: Set[str],
                 stSortsDict: Dict[str, Tuple[str, str]]) -> None:
        """Initialization of a ``Z3TDGtemplate`` object.

        The initializer of the base class is not invoked; all fields are set
        up here directly.

        Args:
            solv: Z3 solver instance
            gName: the name of the graph
            vSorts: a set of vertex sort names
            eSorts: a set of edge sort names
            stSortsDict: a dictionary with entries of the form
                ``eSrt: (sSrt, tSrt)``, where ``eSrt`` is the name of an edge
                sort, while ``sSrt`` and ``tSrt`` are the names of the
                source/target vertex sorts for edges of sort ``eSrt``
        """
        self.solv = solv
        self.name = gName
        self.stSortsDct = stSortsDict

        verticesButNoEdges = bool(vSorts) and not bool(eSorts)
        if verticesButNoEdges:
            # special case: a placeholder sort '_DEFAULT' is added to both the
            # vertex and the edge sorts, together with a single nil-edge entry
            vSorts = vSorts.union({'_DEFAULT'})
            eSorts = {'_DEFAULT'}
            self.stDict = {'nil|_DEFAULT': ('nil|_DEFAULT', 'nil|_DEFAULT')}
        else:
            # one nil-edge per edge sort, attached to the nil-vertices of its
            # source and target sorts
            self.stDict = {'nil|' + eSrt: ('nil|' + endSorts[0], 'nil|' + endSorts[1])
                           for eSrt, endSorts in stSortsDict.items()}

        # typed-set templates for the vertices and edges:
        self.vertices = Z3TypedSetTemplate(solv, gName + '|V', vSorts)
        self.edges = Z3TypedSetTemplate(solv, gName + '|E', eSorts)

        # typed functions for the source and target maps:
        sourceOf = {e: ends[0] for e, ends in self.stDict.items()}
        self.src = Z3TypedFunction(solv, gName + '|src', self.edges, self.vertices,
                                   sourceOf, isInjective=False)
        targetOf = {e: ends[1] for e, ends in self.stDict.items()}
        self.trgt = Z3TypedFunction(solv, gName + '|trgt', self.edges, self.vertices,
                                    targetOf, isInjective=False)
class Z3TDGmorphism:
    """(Template of) a TDG homomorphism.

    Note that if besides nil-element mappings no mapping data is provided
    during the initialization of an instance of this class, the instance in
    effect encodes a *template* of a TDG morphism.
    """

    def __init__(self,
                 mName: str,
                 domG: Z3TypedDirectedGraph,
                 codomG: Z3TypedDirectedGraph,
                 vMap: Dict[str, str],
                 eMap: Dict[str, str],
                 isInjective: bool = False,
                 isSurjective: bool = False) -> None:
        """Initialization of a TDG morphism (template).

        .. note:: Especially for the purpose of instantiating *templates* of
            TDG morphisms, it is possible to use ``Z3TDGtemplate`` instances
            for ``domG`` and/or ``codomG`` (which type-checks because
            ``Z3TDGtemplate`` is a subclass of ``Z3TypedDirectedGraph``).

        Args:
            mName: a string encoding the name of the morphism
            domG: a ``Z3TypedDirectedGraph`` instance encoding the domain graph
            codomG: a ``Z3TypedDirectedGraph`` instance encoding the codomain graph
            vMap: a dictionary encoding the vertex-mapping
            eMap: a dictionary encoding the edge-mapping
            isInjective: (optional) whether or not the morphism is injective
            isSurjective: (optional) whether or not the morphism is surjective

        Raises:
            ValueError: if a non-nil edge in ``eMap`` is not an edge of
                ``domG``, if the images of its endpoints differ from the
                endpoints of its image edge, if a non-nil mapping changes the
                sort of an element, or if the solver contexts of ``domG`` and
                ``codomG`` differ.
            KeyError: if an endpoint of a mapped non-nil edge is missing from
                ``vMap``, or a mapped element is missing from the codomain
                dictionaries.

        Examples:
            **Case 1:** a concrete morphism between two fully specified TDGs:

            >>> S = z3.Solver()
            >>> gA = Z3TypedDirectedGraph(S, 'gA', {'a1':'X', 'a2':'Y'}, {'e1':'Z'}, {'e1':('a1','a2')})
            >>> gB = Z3TypedDirectedGraph(S, 'gB', {'b1':'X', 'b2':'Y', 'b3':'Y'}, {'f1':'Z'},
            ...                           {'f1':('b1','b3')})
            >>> mAB = Z3TDGmorphism('mAB', gA, gB, {'a1':'b1', 'a2':'b3'}, {'e1':'f1'})

            **Case 2:** an injective morphism template between two TDG
            templates (note that ``isInjective=True`` has to be passed
            explicitly):

            >>> S = z3.Solver()
            >>> tgA = Z3TDGtemplate(S, 'tgA', {'X', 'Y'}, {'Z'}, {'Z':('X','Y')})
            >>> tgB = Z3TDGtemplate(S, 'tgB', {'X', 'Y'}, {'Z'}, {'Z':('X','Y')})
            >>> mABtempl = Z3TDGmorphism('mAB', tgA, tgB, {'nil|X':'nil|X', 'nil|Y':'nil|Y'},
            ...                          {'nil|Z':'nil|Z'}, isInjective=True)

            **Case 3:** an injective morphism template between two fully
            specified TDGs, for using ``Z3`` to search for subgraph
            embeddings; to this end, the template is specified with
            nil-element mappings only:

            >>> S = z3.Solver()
            >>> gA = Z3TypedDirectedGraph(S, 'gA', {'a1':'X', 'a2':'Y'}, {'e1':'Z'}, {'e1':('a1','a2')})
            >>> gB = Z3TypedDirectedGraph(S, 'gB', {'b1':'X', 'b2':'Y', 'b3':'Y'}, {'f1':'Z', 'f2':'Z'},
            ...                           {'f1':('b1','b2'), 'f2':('b1','b3')})
            >>> mAB = Z3TDGmorphism('mAB', gA, gB, {'nil|X':'nil|X', 'nil|Y':'nil|Y'}, {'nil|Z':'nil|Z'},
            ...                     isInjective=True)
            >>> chk = S.check()
            >>> if chk == z3.sat:
            ...     print(S.model())
        """
        # --- input data consistency checks ---
        # every explicitly mapped (non-nil) edge must have its endpoints mapped
        # onto the endpoints of its image edge:
        for domEdge, codEdge in eMap.items():
            if domEdge[:3] == 'nil':
                continue
            if domEdge not in domG.stDict.keys():
                raise ValueError('Edge-map with incomplete source/target vertex map detexted!')
            domSrcV, domTrgtV = domG.stDict[domEdge]
            mappedEndpoints = (vMap[domSrcV], vMap[domTrgtV])
            if mappedEndpoints != codomG.stDict[codEdge]:
                raise ValueError('Inconsistency in edge- and source-/target-mapping detected!')

        # the morphism must be sort-preserving on all non-nil mappings:
        nonNilVpairs = {vD: vC for vD, vC in vMap.items() if vD[:3] != 'nil' and vC[:3] != 'nil'}
        nonNilEpairs = {eD: eC for eD, eC in eMap.items() if eD[:3] != 'nil' and eC[:3] != 'nil'}
        vSortClashes = [vD for vD, vC in nonNilVpairs.items()
                        if domG.vertices.els[vD] != codomG.vertices.els[vC]]
        if vSortClashes:
            raise ValueError('Sort-preservation constraint violated!')
        eSortClashes = [eD for eD, eC in nonNilEpairs.items()
                        if domG.edges.els[eD] != codomG.edges.els[eC]]
        if eSortClashes:
            raise ValueError('Sort-preservation constraint violated!')

        # the solver instances over which domG and codomG are instantiated must match:
        if domG.solv.ctx != codomG.solv.ctx:
            raise ValueError('Context mismatch of solver instances of domG and codomG detected!')

        # --- fields ---
        self.name = mName
        self.domG = domG
        self.codomG = codomG
        self.solv = domG.solv

        # typed functions for the vertex and edge components of the morphism:
        self.vMorph = Z3TypedFunction(self.solv, mName + '|V', domG.vertices, codomG.vertices,
                                      vMap, isInjective, isSurjective)
        self.eMorph = Z3TypedFunction(self.solv, mName + '|E', domG.edges, codomG.edges,
                                      eMap, isInjective, isSurjective)
        self.asts = self.vMorph.asts + self.eMorph.asts

        # --- homomorphism property ---
        # formulated per edge sort with a quantified edge variable, so that it
        # also applies when the instance is merely a template:
        for eSrt, (srcSrt, trgtSrt) in domG.stSortsDct.items():
            edgeFn = self.eMorph.subFunctions[(eSrt, eSrt)]
            srcVertexFn = self.vMorph.subFunctions[(srcSrt, srcSrt)]
            trgtVertexFn = self.vMorph.subFunctions[(trgtSrt, trgtSrt)]

            domSrc = domG.src.subFunctions[(eSrt, srcSrt)]
            domTrgt = domG.trgt.subFunctions[(eSrt, trgtSrt)]
            codSrc = codomG.src.subFunctions[(eSrt, srcSrt)]
            codTrgt = codomG.trgt.subFunctions[(eSrt, trgtSrt)]

            edgeVar = z3.Const('eD|' + eSrt, domG.edges.subsets[eSrt])
            commutes = z3.And(
                srcVertexFn(domSrc(edgeVar)) == codSrc(edgeFn(edgeVar)),
                trgtVertexFn(domTrgt(edgeVar)) == codTrgt(edgeFn(edgeVar)))
            self.asts.append(z3.ForAll(edgeVar, commutes))

        # add all assertions to the solver instance:
        self.solv.add(self.asts)
class Z3TypedSetPredicate:
    """An assembly of boolean predicates (one per sort-wise subset)."""

    def __init__(self, pName: str, domSet: Z3TypedSet) -> None:
        """Instantiation of a boolean predicate (template) on a typed set.

        For each sort of ``domSet`` a Z3 function ``<pName>|<sort>`` into the
        Booleans is declared, and the nil-element of the sort is asserted to
        carry the value ``True``. The assertions are added to the solver of
        ``domSet``.

        Args:
            pName: a string containing the name of the predicate
            domSet: a ``Z3TypedSet`` instance encoding the domain set of the predicate

        Example:
            >>> S = z3.Solver()
            >>> setD = Z3TypedSet(S, 'domSet', {'a1': 'sortX', 'a2': 'sortX', 'a3': 'sortY'})
            >>> predD = Z3TypedSetPredicate('predD', setD)
        """
        self.name = pName
        self.domSet = domSet
        self.solv = domSet.solv
        self.subPs = {}
        self.asts = []

        boolSort = z3.BoolSort(self.solv.ctx)
        for srt in domSet.sorts:
            subPredicate = z3.Function(pName + '|' + srt, domSet.subsets[srt], boolSort)
            self.subPs[srt] = subPredicate
            # the nil-element of every sort must have predicate value "True":
            self.asts.append(subPredicate(domSet.nilEls[srt]) == True)

        self.solv.add(self.asts)

    def f(self, elName: str) -> z3.BoolRef:
        """Refer to the value of the predicate by element name."""
        element, sortName = self.domSet.getElandSrtName(elName)
        subPredicate = self.subPs[sortName]
        return subPredicate(element)
class Z3TDGpredicate:
    """A (template for a) predicate for ``Z3TypedDirectedGraphs``.

    For each vertex and edge sort, the predicate contains a boolean
    sub-predicate, and moreover assertions are included to ensure that an edge
    with predicate ``True`` also has its endpoints carrying the predicate
    ``True``.
    """

    def __init__(self, pName: str, domG: Z3TypedDirectedGraph) -> None:
        """Instantiation of a template of a boolean predicate over a typed directed graph.

        Args:
            pName: the name of the predicate
            domG: a typed directed graph over which the predicate is to be constructed

        Example:
            >>> S = z3.Solver()
            >>> vDictA = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortY'}
            >>> eDictA = {'e1':'sortW', 'e2': 'sortT'}
            >>> stDictA = {'e1':('v1','v2'), 'e2':('v3','v1')}
            >>> graphA = Z3TypedDirectedGraph(S, 'gA', vDictA, eDictA, stDictA)
            >>> predGraphA = Z3TDGpredicate('predGraphA', graphA)
        """
        self.name = pName
        self.domG = domG

        # typed predicates for the vertices and for the edges:
        self.vPred = Z3TypedSetPredicate(pName + '|V', domG.vertices)
        self.ePred = Z3TypedSetPredicate(pName + '|E', domG.edges)
        self.asts = self.vPred.asts + self.ePred.asts

        # an edge can only carry the predicate "True" if both of its endpoints do:
        for eName, endpoints in domG.stDict.items():
            edgeSelected = self.ePred.f(eName) == True
            endpointsSelected = z3.And(self.vPred.f(endpoints[0]) == True,
                                       self.vPred.f(endpoints[1]) == True)
            self.asts.append(z3.Implies(edgeSelected, endpointsSelected))

        domG.solv.add(self.asts)
class Z3TypedSetAutomorphismTemplate:
    """A template for an automorphism of a typed set."""

    def __init__(self, aName: str, domSet: Z3TypedSet) -> None:
        """Instantiates assertions that permit to find an automorphism of the given set.

        For each sort a Z3 function ``<aName>|<sort>`` from the sort-wise
        subset to itself is declared. It is asserted to fix the nil-element
        and to be injective and surjective. The assertions are added to the
        solver of ``domSet``.

        Args:
            aName: the name of the resulting ``Z3TypedSetAutomorphismTemplate`` instance
            domSet: the ``Z3TypedSet`` of which to render the automorphism assertions

        Example:
            >>> S = z3.Solver()
            >>> setD = Z3TypedSet(S, 'domSet', {'a1': 'sortX', 'a2': 'sortX', 'a3': 'sortY'})
            >>> autD = Z3TypedSetAutomorphismTemplate('autD', setD)
        """
        self.name = aName
        self.domSet = domSet
        self.solv = domSet.solv

        self.sfNames = {}
        self.subFunctions = {}
        self.asts = []

        for srt in domSet.sorts:
            functionName = aName + '|' + srt
            carrier = domSet.subsets[srt]
            subFunction = z3.Function(functionName, carrier, carrier)
            self.sfNames[srt] = functionName
            self.subFunctions[srt] = subFunction

            # the nil-element is mapped to itself:
            nilEl = domSet.nilEls[srt]
            self.asts.append(subFunction(nilEl) == nilEl)

            # injectivity and surjectivity:
            x = z3.Const('x|' + srt, carrier)
            y = z3.Const('y|' + srt, carrier)
            isInjective = z3.ForAll([x, y], z3.Implies(subFunction(x) == subFunction(y), x == y))
            isSurjective = z3.ForAll(y, z3.Exists(x, subFunction(x) == y))
            self.asts.extend([isInjective, isSurjective])

        # finally, add all assertions to the solver instance:
        self.solv.add(self.asts)

    def f(self, elName: str) -> z3.ExprRef:
        """Retrieve the Z3 value of an automorphism application by domain element name."""
        element, sortName = self.domSet.getElandSrtName(elName)
        subFunction = self.subFunctions[sortName]
        return subFunction(element)
class Z3TDGautomorphismTemplate:
    """Automorphism template for ``Z3TypedDirectedGraph`` instances."""

    def __init__(self, aName: str, domG: Z3TypedDirectedGraph) -> None:
        """Instantiate the assertions that permit to find an automorphism of ``domG``.

        Typed-set automorphism templates are set up for the vertices and for
        the edges, and for every entry of ``domG.stDict`` it is asserted that
        the images of the endpoints are the source and target of the image of
        the edge. All assertions are added to the solver of ``domG``.

        Args:
            aName: the name of the automorphism template
            domG: the ``Z3TypedDirectedGraph`` whose automorphisms are encoded
        """
        self.name = aName
        self.domG = domG
        self.solv = domG.solv

        # vertex- and edge-automorphism templates and their assertions:
        self.vMorph = Z3TypedSetAutomorphismTemplate(aName + '|V', domG.vertices)
        self.eMorph = Z3TypedSetAutomorphismTemplate(aName + '|E', domG.edges)
        self.asts = self.vMorph.asts + self.eMorph.asts

        # source and target consistency (the TDG homomorphism property):
        for eName, endpoints in domG.stDict.items():
            eSrt = domG.edges.els[eName]
            srcSrt, trgtSrt = domG.stSortsDct[eSrt]
            srcFn = domG.src.subFunctions[(eSrt, srcSrt)]
            trgtFn = domG.trgt.subFunctions[(eSrt, trgtSrt)]
            self.asts.append(z3.And(
                self.vMorph.f(endpoints[0]) == srcFn(self.eMorph.f(eName)),
                self.vMorph.f(endpoints[1]) == trgtFn(self.eMorph.f(eName))))

        # add all assertions to the solver instance:
        self.solv.add(self.asts)
def genForbiddenTDGrelations(fG: Z3TypedDirectedGraph) -> \
        Tuple[List[Dict[str, str]], List[Tuple[float, str]], str]:
    """Generate the forbidden relations of a forbidden TDG pattern.

    The pattern is re-instantiated in a fresh, local Z3 solver. Two graph
    predicates ``indA`` and ``indB`` are declared over it and constrained such
    that each selects a proper sub-structure of the pattern and that together
    they cover all of its vertices and edges. The solver is then queried
    repeatedly; each model is compared (via an automorphism template of the
    pattern) against the previously stored decompositions with the same sort
    signature, kept if it is new, and blocked before the next query.

    Args:
        fG: a "forbidden TDG pattern"

    Returns:
        A tuple ``(frListAlt, cTimes, statsStr)``:

        * ``frListAlt``: a list of predicate dictionaries, one per stored
          decomposition of ``fG`` into a span whose legs are both proper
          sub-structures of ``fG``. Keys are vertex names suffixed with
          ``'|V'`` and edge names suffixed with ``'|E'``; values are ``'='``
          (element in both legs), ``'<'`` (only in the first leg) or ``'>'``
          (otherwise). Only one representative per isomorphism class induced
          by automorphisms of ``fG`` is included.
        * ``cTimes``: a list of ``(seconds, label)`` tuples, one per solver
          call, with label ``'fr'`` (satisfiable search run), ``'unsat'``
          (final search run) or ``'iso'`` (isomorphism check).
        * ``statsStr``: the object returned by ``solv.statistics()`` for the
          local solver.
    """
    # read out the defining data of the "forbidden TDG pattern" fG:
    gName = fG.name
    vDict = fG.vertices.els
    eDict = fG.edges.els
    stDict = fG.stDict

    # instantiate a local Z3 solver instance:
    solv = z3.Solver()
    ctx = solv.ctx  # NOTE(review): not used below
    asts = []

    # render a Z3TDG instance from the provided fG data:
    N = Z3TypedDirectedGraph(solv, gName, vDict, eDict, stDict)
    logging.info('\n--|\tForbidden TDG pattern set up in local Z3 solver instance.\n')

    # set up predicate functions for the left/right "legs" of the forbidden relations
    indA = Z3TDGpredicate('indA', N)
    indB = Z3TDGpredicate('indB', N)
    logging.info('\n--|\tTemplates for the two TDG predicates instantiated.\n')

    # collect all assertions (concerning the nil-element predicate values etc.) into the "global" asts list:
    asts += indA.asts + indB.asts
    logging.info('\n--|\tAssertions rendered thus far:\n')
    for ast in asts:
        logging.info(' |\t\t %s\n' % ast)

    # assert that both subgraphs in the span are proper subgraphs of N:
    # (each leg must miss at least one vertex or edge of some sort)
    astAproperSG = []
    astBproperSG = []
    for srt in N.vertices.sorts:
        x = z3.Const('x|V|' + srt, N.vertices.subsets[srt])
        astAproperSG += [z3.Exists(x, indA.vPred.subPs[srt](x) != True)]
        astBproperSG += [z3.Exists(x, indB.vPred.subPs[srt](x) != True)]
        # assert that the union of all vertices of each sort equals all vertices of N:
        asts += [z3.ForAll(x, z3.Or(indA.vPred.subPs[srt](x) == True, indB.vPred.subPs[srt](x) == True))]

    # analogous construction for the edges:
    for srt in N.edges.sorts:
        x = z3.Const('x|E|' + srt, N.edges.subsets[srt])
        astAproperSG += [z3.Exists(x, indA.ePred.subPs[srt](x) != True)]
        astBproperSG += [z3.Exists(x, indB.ePred.subPs[srt](x) != True)]
        # assert that the union of all edges of each sort equals all edges of N:
        asts += [z3.ForAll(x, z3.Or(indA.ePred.subPs[srt](x) == True, indB.ePred.subPs[srt](x) == True))]

    # assemble all parts and update it into the "global" assertion list:
    asts += [z3.Or(astAproperSG), z3.Or(astBproperSG)]
    logging.info('\n--|\tAssertions including the proper subgraph assertions:\n')
    for ast in asts:
        logging.info(' |\t\t %s\n' % ast)

    # add all assertions to the solver instance:
    solv.add(asts)

    # start of the forbidden relation search algorithm:
    frList = []  # eventually, this list will contain the forbidden relations!
    cTimes = []  # list of computation times (for performance evaluation)

    # initial solver run:
    start = timeit.default_timer()
    chk = solv.check()
    t = timeit.default_timer() - start
    logging.info('\n\n**|\tFirst solver run returned\t %s\n' % chk)
    if chk == z3.sat:
        cTimes += [(t, 'fr')]
    else:
        cTimes += [(t, 'unsat')]

    # each pass handles one model; `chk` is refreshed by the search run at the end of the pass
    while chk == z3.sat:
        M = solv.model()
        # render dictionaries for easy access to model data:
        # NOTE(review): M.eval is called without model completion; this presumably relies on
        # the model fixing every predicate value -- confirm
        indAdctV = {v + '|V': M.eval(indA.vPred.f(v)) for v in N.vertices.els.keys()}
        indAdctE = {e + '|E': M.eval(indA.ePred.f(e)) for e in N.edges.els.keys()}
        indAdct = {**indAdctV, **indAdctE}

        indBdctV = {v + '|V': M.eval(indB.vPred.f(v)) for v in N.vertices.els.keys()}
        indBdctE = {e + '|E': M.eval(indB.ePred.f(e)) for e in N.edges.els.keys()}
        indBdct = {**indBdctV, **indBdctE}

        # determine the "sort signature" of the TDGs encoded by indA and indB, i.e.
        # the counts of the vertices and edges of each sort (for optimizing the isomorphism checks):

        # Type signature of indA (counting only elements that carry the predicate "True"):
        sigA = Counter(
            [(vSrt + '|V') for vName, vSrt in N.vertices.els.items() if indAdctV[vName + '|V'] == True]
            + [(eSrt + '|E') for eName, eSrt in N.edges.els.items() if indAdctE[eName + '|E'] == True]
        )
        # type signature of indB, computed in the same way:
        sigB = Counter(
            [(vSrt + '|V') for vName, vSrt in N.vertices.els.items() if indBdctV[vName + '|V'] == True]
            + [(eSrt + '|E') for eName, eSrt in N.edges.els.items() if indBdctE[eName + '|E'] == True]
        )
        sigAB = (sigA, sigB)

        logging.info('\n--|\tSort-signature of indA:\n \t%s\n--|\tSort-signature of indB:\n \t%s\n'
                     % (sigAB[0], sigAB[1]))

        # determine whether or not an isomorphic span decomposition of N has already been found before:
        isNewFr = True

        # only stored decompositions with identical sort signatures can be isomorphic to the current one:
        frIsoCandidateListAB = [frCD for frCD in frList if frCD[2][0] == sigAB[0] and frCD[2][1] == sigAB[1]]
        logging.info('\n--|\tfrCandidateListAB:\n |\t %s' % frIsoCandidateListAB)

        for (indC, indD, sigCD) in frIsoCandidateListAB:
            # the automorphism assertions live in their own solver scope and are discarded by pop() below
            solv.push()
            # set up a TDG automorphism template over N (for quotienting by automorphisms later on):
            Phi = Z3TDGautomorphismTemplate('Phi', N)

            # the current predicates, pulled back along Phi, must reproduce the stored candidate:
            astsIso = z3.And(
                [z3.And(
                    M.eval(indA.vPred.subPs[srt](Phi.vMorph.f(v))) == indC[v+'|V'],
                    M.eval(indB.vPred.subPs[srt](Phi.vMorph.f(v))) == indD[v+'|V']
                ) for v, srt in N.vertices.els.items()]
                + [z3.And(
                    M.eval(indA.ePred.subPs[srt](Phi.eMorph.f(e))) == indC[e + '|E'],
                    M.eval(indB.ePred.subPs[srt](Phi.eMorph.f(e))) == indD[e + '|E']
                ) for e, srt in N.edges.els.items()]
            )

            solv.add(astsIso)
            start = timeit.default_timer()
            chk = solv.check()
            t = timeit.default_timer() - start
            cTimes += [(t, 'iso')]
            solv.pop()

            if chk == z3.sat:
                isNewFr = False
                logging.info('\n***The current decomposition is isomorphic to a previous one and thus discarded.\n')
                break
            else:
                msgStr = ('\n***The solver returned the status\t%s\n' % chk)
                msgStr += '\n Assertions used:\n%s\n' % astsIso
                msgStr += (' The relation described by the dictionaries...\n\tindA = %s\n\tindB = %s\n'
                           + ' ...is not isomorphic to the one '
                           + 'described by the dictionaries\n\tindC = %s\n\tindD = %s\n') % (indAdct, indBdct,
                                                                                            indC, indD)
                logging.info(msgStr)

        # if the forbidden relation is indeed new, add it to the list of forbidden relations:
        if isNewFr:
            frList += [(indAdct, indBdct, sigAB)]

        # then "block" the present solution to search for other solutions
        # (this happens for discarded decompositions as well, so that the search always advances)
        astsBlock = []
        for srt in N.vertices.sorts:
            z = z3.Const('z|V|' + srt, N.vertices.subsets[srt])
            astsBlock += [z3.Exists(z,
                                    z3.Or(indA.vPred.subPs[srt](z) != M.eval(indA.vPred.subPs[srt](z)),
                                          indB.vPred.subPs[srt](z) != M.eval(indB.vPred.subPs[srt](z))
                                          )
                                    )
                          ]
        for srt in N.edges.sorts:
            z = z3.Const('z|E|' + srt, N.edges.subsets[srt])
            astsBlock += [z3.Exists(z,
                                    z3.Or(indA.ePred.subPs[srt](z) != M.eval(indA.ePred.subPs[srt](z)),
                                          indB.ePred.subPs[srt](z) != M.eval(indB.ePred.subPs[srt](z))
                                          )
                                    )
                          ]
        solv.add(z3.Or(astsBlock))

        # next solver run:
        start = timeit.default_timer()
        chk = solv.check()
        t = timeit.default_timer() - start
        if chk == z3.sat:
            cTimes += [(t, 'fr')]
        else:
            cTimes += [(t, 'unsat')]

    # after the solver finally returns 'unsat', "compile" the complete frList into '</=/>' predicate format:
    frListAlt = []
    for indA, indB, sigAB in frList:
        frListAlt += [{k: ('=' if (indA[k] and indB[k]) else '<' if (indA[k]) else '>') for k in indA.keys()}]
        # note: by construction, indA.keys() == indB.keys()

    # extract some info on Z3 solver statistics:
    statsStr = solv.statistics()
    logging.info('\n||||\t Z3 solver statistics summary:\t||||\n%s' % pformat(statsStr))

    # clear out the local solver instance to save memory etc.:
    solv.reset()

    # render a log message on forbidden relations rendered:
    logging.info('\n||||\t Decompositions produced:\t||||\n%s' % pformat(frListAlt))

    return frListAlt, cTimes, statsStr
class Z3TDGfrContainer:
    """An auxiliary class to render forbidden TDG relation data more accessible.

    Attributes:
        solv: the ``Z3`` solver instance to which the assertions rendered by
            ``genFRasts`` are added
        N: the forbidden TDG pattern
        frData: the list of forbidden relations (predicate dictionaries with
            values ``'<'``, ``'='`` and ``'>'``); empty until provided or generated
        cTimes: the computation times recorded by ``genTDGfrData`` (empty before)
        statsStr: the solver statistics recorded by ``genTDGfrData`` (``None`` before)
        asts: the assertions rendered by the most recent ``genFRasts`` call
    """

    def __init__(self, solv: 'z3.Solver', N: 'Z3TypedDirectedGraph',
                 frData: Optional[List[Dict[str, str]]] = None) -> None:
        """The container initialization.

        .. note:: Since the search for forbidden TDG relations is
            computationally intensive, the forbidden relation data may either
            be specified directly (e.g. from stored data), or be omitted. In
            the latter case, one may then in a second step trigger the method
            ``genTDGfrData`` to generate the forbidden relation data for the
            specified forbidden TDG pattern ``N``.

        Args:
            solv: the ``Z3`` solver instance in which the host graphs of the
                overlap search are instantiated
            N: the forbidden TDG pattern
            frData: (optional) previously generated forbidden relation data
        """
        # NOTE: the solver stored in self.solv is NOT the one used temporarily when
        # generating the forbidden relations via self.genTDGfrData() (which is an
        # independent instance), but the one within which the host graphs for an
        # overlap search are instantiated; this separates the *static* part of the
        # computation (forbidden relation generation) from the *dynamic* part
        # (the overlap search).
        self.solv = solv
        self.N = N
        # an empty list (rather than a dict) keeps the default consistent with the declared type
        self.frData = [] if frData is None else frData
        # initialised here so that the attributes exist before the generating methods are called
        self.asts = []
        self.cTimes = []
        self.statsStr = None

    def genTDGfrData(self) -> None:
        """Trigger the forbidden relation search for the forbidden TDG pattern ``self.N``."""
        self.frData, self.cTimes, self.statsStr = genForbiddenTDGrelations(self.N)

    def genFRasts(self, P: 'Z3TDGspanPredicateTemplate') -> None:
        """Render the non-DPE assertions generated from the forbidden TDG relations.

        For every forbidden relation one assertion ``Not(Exists(...))`` is
        rendered over the graphs ``P.gA`` and ``P.gB``, added to ``self.solv``
        and stored in ``self.asts``. ``self.asts`` is reset at the start of
        each call and stays empty if ``N`` is not sort-compatible.

        Args:
            P: the span predicate template between the two host graphs; its
                fields ``gA``, ``gB``, ``sV`` and ``sE`` are used

        Raises:
            ValueError: if no forbidden relation data is available, or if the
                solver contexts of ``gA``, ``gB`` and ``N`` do not match.
        """
        gA, gB = P.gA, P.gB

        # some data consistency checks:
        if not bool(self.frData):  # if frData is empty
            raise ValueError('Please generate the forbidden relations data before executing this method!')
        if (gA.solv.ctx != gB.solv.ctx) or (self.N.solv.ctx != gA.solv.ctx):
            raise ValueError('The contexts of teh TDGs gA, gB and N do not match!')

        # reset before the early return below, so that no stale assertions of a
        # previous call remain visible:
        self.asts = []

        # if N is not sort-compatible with gA and/or gB, the forbidden relations for N
        # do not contribute any non-trivial assertions to the span search problem:
        if not (gA.isSortCompatibleWith(gB) and gA.isSortCompatibleWith(self.N)):
            logging.info('The forbidden relations for N do not contribute non-trivial assertions here!')
            return

        # per element kind (key suffix): the typed set of N, of gA and of gB,
        # and the span predicate relating the latter two
        parts = {
            '|V': (self.N.vertices, gA.vertices, gB.vertices, P.sV),
            '|E': (self.N.edges, gA.edges, gB.edges, P.sE),
        }

        for fr in self.frData:
            # "fresh constants" per element kind: element name -> constant ...
            constsA = {'|V': {}, '|E': {}}
            constsB = {'|V': {}, '|E': {}}
            # ... and sort name -> list of constants (for the distinctness assertions)
            bySortA = {'|V': {}, '|E': {}}
            bySortB = {'|V': {}, '|E': {}}

            # step 1: declare the constants and render the span assertions
            astsSpans = []
            for x, p in fr.items():
                tag = x[-2:]
                if tag not in parts:
                    continue
                name = x[:-2]
                setN, setA, setB, spanPred = parts[tag]

                if p != '>':  # the element occurs in the gA-leg
                    srt = setN.els[name]
                    constA = z3.Const(name + tag + '|' + srt, setA.subsets[srt])
                    constsA[tag][name] = constA
                    bySortA[tag].setdefault(srt, []).append(constA)
                    if p == '<':  # ... and is not matched to any element of gB
                        bEl = z3.Const('b' + tag + '|' + srt, setB.subsets[srt])
                        astsSpans.append(z3.ForAll(bEl, spanPred.subPs[srt](constA, bEl) == False))

                if p != '<':  # the element occurs in the gB-leg
                    srt = setN.els[name]
                    constB = z3.Const(name + tag + '|' + srt, setB.subsets[srt])
                    constsB[tag][name] = constB
                    bySortB[tag].setdefault(srt, []).append(constB)
                    if p == '=':  # ... and is matched to its counterpart in gA
                        astsSpans.append(spanPred.subPs[srt](constsA[tag][name], constB) == True)
                    if p == '>':  # ... and is not matched to any element of gA
                        aEl = z3.Const('a' + tag + '|' + srt, setA.subsets[srt])
                        astsSpans.append(z3.ForAll(aEl, spanPred.subPs[srt](aEl, constB) == False))

            # step 2: render distinctness assertions (per sort, including the nil-element)
            astsDistinct = [
                z3.Distinct(consts + [typedSet.nilEls[srt]])
                for typedSet, groups in ((gA.vertices, bySortA['|V']),
                                         (gB.vertices, bySortB['|V']),
                                         (gA.edges, bySortA['|E']),
                                         (gB.edges, bySortB['|E']))
                for srt, consts in groups.items()
            ]

            # step 3: render TDG homomorphism assertions (edge constants must have the
            # constants of their endpoints in N as source and target)
            astsTDGhom = []
            for graph, vConsts, eConsts in ((gA, constsA['|V'], constsA['|E']),
                                            (gB, constsB['|V'], constsB['|E'])):
                for eName, eConst in eConsts.items():
                    eSrt = self.N.edges.els[eName]
                    srcSrt, trgtSrt = graph.stSortsDct[eSrt]
                    srcFn = graph.src.subFunctions[(eSrt, srcSrt)]
                    trgtFn = graph.trgt.subFunctions[(eSrt, trgtSrt)]
                    endpoints = self.N.stDict[eName]
                    astsTDGhom.append(z3.And(srcFn(eConst) == vConsts[endpoints[0]],
                                             trgtFn(eConst) == vConsts[endpoints[1]]))

            # step 4: assemble all components into a non-DPE assertion:
            boundConsts = list(constsA['|V'].values()) + list(constsA['|E'].values()) \
                + list(constsB['|V'].values()) + list(constsB['|E'].values())
            fullFRasts = [z3.Not(z3.Exists(boundConsts,
                                           z3.And(astsSpans + astsDistinct + astsTDGhom)))]

            self.solv.add(fullFRasts)
            self.asts += fullFRasts
class Z3TypedSetSpanPredicateTemplate:
    """A template for a (optionally monic) span between two typed sets, with the required assertions."""

    def __init__(self, sName: str, setA: Z3TypedSet, setB: Z3TypedSet, isMonic: bool = False) -> None:
        """Instantiation of a span predicate template between two typed sets.

        For each sort a binary Z3 predicate ``<sName>|<sort>`` over the
        sort-wise subsets of ``setA`` and ``setB`` is declared, and the pair of
        nil-elements is asserted to be related. The assertions are added to the
        solver of ``setA``.

        Args:
            sName: the name of the span predicate template
            setA: the first ``Z3TypedSet`` instance
            setB: the second ``Z3TypedSet`` instance
            isMonic: a Boolean indicating whether the span is to be *monic*
                (default: ``False``)

        Raises:
            ValueError: if the solver contexts or the sort-sets of ``setA``
                and ``setB`` differ.

        Example:
            >>> S = z3.Solver()
            >>> setA = Z3TypedSet(S, 'setA', {'a1':'sortX', 'a2':'sortX', 'a3': 'sortY'})
            >>> setB = Z3TypedSet(S, 'setB', {'b1':'sortX', 'b2':'sortX', 'b3': 'sortY', 'b4':'sortZ'})

            Note that one must ensure that both sets have the exact same sort
            content, which may be achieved via "patching" suitable
            nil-elements if necessary as follows:

            >>> setA.patchNilElements(setB)
            >>> setB.patchNilElements(setA)
            >>> spanPredAB = Z3TypedSetSpanPredicateTemplate('predAB', setA, setB, isMonic=True)
        """
        # input data consistency checks:
        if setA.solv.ctx != setB.solv.ctx:
            raise ValueError('Inconsistency of contexts detected between setA and setB!')
        # IMPORTANT: the span must only match elements of the *same* sort,
        # hence the sort-sets of setA and setB have to coincide
        if setA.sorts != setB.sorts:
            raise ValueError('The sort-sets of setA and setB must match!')

        self.sorts = setA.sorts
        self.name = sName
        self.setA = setA
        self.setB = setB
        self.solv = setA.solv
        self.subPs = {}
        self.asts = []

        boolSort = z3.BoolSort(self.solv.ctx)
        for srt in self.sorts:
            sortA, sortB = setA.subsets[srt], setB.subsets[srt]
            pred = z3.Function(sName + '|' + srt, sortA, sortB, boolSort)
            self.subPs[srt] = pred
            # the nil-elements of the two sets are always related:
            self.asts.append(pred(setA.nilEls[srt], setB.nilEls[srt]) == True)

            if not isMonic:
                continue
            # "bi-injectivity": no element of either set is related to two
            # different elements of the other set
            a1 = z3.Const('a1|' + srt, sortA)
            a2 = z3.Const('a2|' + srt, sortA)
            b1 = z3.Const('b1|' + srt, sortB)
            b2 = z3.Const('b2|' + srt, sortB)
            uniqueInA = z3.ForAll([a1, a2, b1],
                                  z3.Implies(z3.And(pred(a1, b1) == True, pred(a2, b1) == True),
                                             a1 == a2))
            uniqueInB = z3.ForAll([a1, b1, b2],
                                  z3.Implies(z3.And(pred(a1, b1) == True, pred(a1, b2) == True),
                                             b1 == b2))
            self.asts.extend([uniqueInA, uniqueInB])

        # add all assertions to the solver:
        self.solv.add(self.asts)

    @staticmethod
    def _nonNilElements(namesToEls):
        """Return the name-to-element dictionary restricted to names not starting with ``'nil'``."""
        return {elName: el for elName, el in namesToEls.items() if elName[:3] != 'nil'}

    def evaluateModel(self, M: z3.ModelRef) -> Set[Tuple[str, str]]:
        """Evaluate the structure of a model for a typed set overlap.

        Args:
            M: a Z3 model reference (whose context must coincide with ``self.solv.ctx``)

        Returns:
            A set of tuples ``(name in setA, name in setB)`` of non-nil
            elements for which the span predicate evaluates to true in ``M``.

        Raises:
            ValueError: if the context of ``M`` differs from ``self.solv.ctx``.
        """
        if M.ctx != self.solv.ctx:
            raise ValueError('Mismatch of model and predicate Z3 solver contexts detected!')

        overlap = set()
        for srt in self.sorts:
            elementsA = self._nonNilElements(self.setA.subsetNamesToEls[srt])
            elementsB = self._nonNilElements(self.setB.subsetNamesToEls[srt])
            pred = self.subPs[srt]
            for aName, aEl in elementsA.items():
                for bName, bEl in elementsB.items():
                    if M.eval(pred(aEl, bEl)):  # the pair is part of the overlap
                        overlap.add((aName, bName))
        return overlap

    def genModelBlockingAsts(self, M: z3.ModelRef) -> z3.BoolRef:
        """Generate assertions to block a given model.

        Args:
            M: a Z3 model instance (whose context must coincide with ``self.solv.ctx``)

        Returns:
            A ``z3.Or`` expression over assertions stating, for each pair of
            non-nil elements of the same sort, that the predicate value
            differs from its value in ``M``.

        Raises:
            ValueError: if the context of ``M`` differs from ``self.solv.ctx``.
        """
        if M.ctx != self.solv.ctx:
            raise ValueError('Mismatch of model and predicate Z3 solver contexts detected!')

        blockAsts = []
        for srt in self.sorts:
            elementsA = self._nonNilElements(self.setA.subsetNamesToEls[srt])
            elementsB = self._nonNilElements(self.setB.subsetNamesToEls[srt])
            pred = self.subPs[srt]
            blockAsts.extend(pred(a, b) != M.eval(pred(a, b))
                             for a in elementsA.values()
                             for b in elementsB.values())
        return z3.Or(blockAsts)
class Z3TDGspanPredicateTemplate:
    """A class for TDG monic span predicate templates to search for spans of TDGs.

    .. note:: The typing of vertices and edges implies that the monic span must be
        "sort-compatible", i.e. encodes a sort-preserving injective partial function.
    """

    def __init__(self, sName: str, gA: Z3TypedDirectedGraph, gB: Z3TypedDirectedGraph) -> None:
        """Initialization of a boolean span predicate template.

        It is important to note that the two input TDGs must be **sort-compatible**, i.e. if the
        input graphs do not satisfy this property a priori, one must invoke the
        ``.patchNilElements`` method in order to fix the sort-matching.

        Args:
            sName: the name of the boolean span predicate
            gA: the first ``Z3TypedDirectedGraph`` instance ("left leg" of the span)
            gB: the second ``Z3TypedDirectedGraph`` instance ("right leg" of the span)

        Raises:
            ValueError: if ``gA.isSortCompatibleWith(gB)`` does not hold

        Example:
            >>> S = z3.Solver()
            >>> vDictA = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortY', 'v4': 'sortZ'}
            >>> eDictA = {'e1':'sortW', 'e2': 'sortT'}
            >>> stDictA = {'e1':('v1','v2'), 'e2':('v3','v1')}
            >>> graphA = Z3TypedDirectedGraph(S, 'gA', vDictA, eDictA, stDictA)
            >>> vDictB = {'v1':'sortX', 'v2': 'sortX', 'v3': 'sortX', 'v4': 'sortY'}
            >>> eDictB = {'e1':'sortW', 'e2': 'sortT', 'e3': 'sortT'}
            >>> stDictB = {'e1':('v2','v1'), 'e2':('v4','v3'), 'e3':('v4','v1')}
            >>> graphB = Z3TypedDirectedGraph(S, 'gB', vDictB, eDictB, stDictB)

            Next, we must adapt the sorts suitably via patching of nil-elements:

            >>> graphA.patchNilElements(graphB)
            >>> graphB.patchNilElements(graphA)

            Finally, we may instantiate the ``Z3TDGspanPredicateTemplate`` for the two TDGs:

            >>> spanPredAB = Z3TDGspanPredicateTemplate('spanPredAB', graphA, graphB)
        """
        # verify the sort-compatibility of gA and gB:
        if not gA.isSortCompatibleWith(gB):
            raise ValueError('Sort-mismatch of input graphs detected!')

        self.name = sName
        self.gA = gA
        self.gB = gB
        # note: the Z3 API will automatically raise an exception if gA.solv != gB.solv later in the code!
        self.solv = gA.solv

        # vertex- and edge-parts of the span predicate template (both monic):
        self.sV = Z3TypedSetSpanPredicateTemplate(sName + '|V', gA.vertices, gB.vertices, isMonic=True)
        self.sE = Z3TypedSetSpanPredicateTemplate(sName + '|E', gA.edges, gB.edges, isMonic=True)

        # store all assertions up until now in a field for later convenience:
        self.asts = self.sV.asts + self.sE.asts

        # rendering of TDG homomorphism assertions (i.e. if an edge is in the overlap of gA and gB
        # encoded by the span predicate, then also its endpoints must be):
        homAsts = []
        for edgeSort in gA.edges.sorts:
            edgeA = z3.Const('a|E|' + edgeSort, gA.edges.subsets[edgeSort])
            edgeB = z3.Const('b|E|' + edgeSort, gB.edges.subsets[edgeSort])

            # read out the source/target sorts for the current edge sort:
            srcSort, trgtSort = gA.stSortsDct[edgeSort]
            srcKey = (edgeSort, srcSort)
            trgtKey = (edgeSort, trgtSort)

            srcA = gA.src.subFunctions[srcKey]
            trgtA = gA.trgt.subFunctions[trgtKey]
            srcB = gB.src.subFunctions[srcKey]
            trgtB = gB.trgt.subFunctions[trgtKey]

            edgesOverlap = self.sE.subPs[edgeSort](edgeA, edgeB) == True
            endpointsOverlap = z3.And(
                self.sV.subPs[srcSort](srcA(edgeA), srcB(edgeB)) == True,
                self.sV.subPs[trgtSort](trgtA(edgeA), trgtB(edgeB)) == True
            )
            homAsts.append(z3.ForAll([edgeA, edgeB], z3.Implies(edgesOverlap, endpointsOverlap)))

        self.solv.add(homAsts)
        self.asts += homAsts

    def evaluateModel(self, M: z3.ModelRef) -> Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]:
        """Evaluate the structure of a model for a TDG overlap.

        Args:
            M: a Z3 model reference (whose context must coincide with ``self.solv.ctx``)

        Returns:
            A pair of sets of tuples of element names encoding the overlap structure, with the
            first set for the vertices and the second one for the edges.
        """
        return self.sV.evaluateModel(M), self.sE.evaluateModel(M)

    def genModelBlockingAsts(self, M: z3.ModelRef) -> z3.BoolRef:
        """Generate assertions to block a given model.

        Args:
            M: a Z3 model instance (whose context must coincide with ``self.solv.ctx``)

        Returns:
            A ``z3.Or`` expression combining the blocking assertions of the vertex- and of the
            edge-part of the span predicate template.
        """
        vertexBlock = self.sV.genModelBlockingAsts(M)
        edgeBlock = self.sE.genModelBlockingAsts(M)
        return z3.Or(vertexBlock, edgeBlock)
def generateTDGpushout(gA: Z3TypedDirectedGraph,
                       gB: Z3TypedDirectedGraph,
                       PhiData: Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]) -> \
        Tuple[Dict[str, str], Dict[str, str], Dict[str, Tuple[str, str]]]:
    """Generate the pushout of two TDGs along a given (monic) overlap.

    Args:
        gA: a ``Z3TypedDirectedGraph`` instance
        gB: a ``Z3TypedDirectedGraph`` instance
        PhiData: a tuple of two sets of element name pairs encoding the vertex- and the
            edge-overlap (i.e. every element name pair encodes that these elements are in
            the overlap of ``gA`` and ``gB``)

    Returns:
        A triple of data encoding the pushout object, consisting of

        - a dictionary of ``vertex name: vertex sort`` entries
        - a dictionary of ``edge name: edge sort`` entries
        - a dictionary of ``edge name: (source vertex name, target vertex name)`` entries.

        Every element name of the pushout is the string form of a pair ``(name in gA, name in gB)``,
        where ``'nil|' + sort`` stands in for the missing partner of an element that is not part
        of the overlap.

    Raises:
        ValueError: if the solver contexts of ``gA`` and ``gB`` differ, or if the two graphs are
            not sort-compatible

    .. note:: This data may be utilized to instantiate a TDG that represents this pushout object
        (e.g. for purposes of visualization).
    """
    # input data consistency checks:
    if gA.solv.ctx != gB.solv.ctx:
        raise ValueError('gA and gB do not share the same solver instance!')

    if not gA.isSortCompatibleWith(gB):
        raise ValueError('gA and gB are not sort-compatible!')

    # set up auxiliary dictionaries encoding the vertex- and edge-parts of the overlap data:
    phiV = dict(PhiData[0])
    phiE = dict(PhiData[1])

    # the gB-elements that occur in the overlap, collected once so that each membership test
    # below is a constant-time set lookup rather than a scan over ``dict.values()``:
    overlappedVsB = set(phiV.values())
    overlappedEsB = set(phiE.values())

    def _glueElements(elsA: Dict[str, str], elsB: Dict[str, str],
                      phi: Dict[str, str], overlappedB: Set[str]) -> Dict[Tuple[str, str], str]:
        """Assemble the ``(name in gA, name in gB): sort`` dictionary of one pushout component."""
        # NOTE: it is here (in the first line below) that we explicitly utilize that Phi only
        #       overlaps elements of the *same* sort, and that the sorts of gA and gB match!
        glued = {(a, b): elsA[a] for a, b in phi.items()}
        glued.update({(a, 'nil|' + aSrt): aSrt for a, aSrt in elsA.items() if a not in phi})
        glued.update({('nil|' + bSrt, b): bSrt for b, bSrt in elsB.items() if b not in overlappedB})
        return glued

    # assemble the vertex- and edge-dictionaries (with element name: element sort entries)
    # for the pushout object:
    vsP = _glueElements(gA.vertices.els, gB.vertices.els, phiV, overlappedVsB)
    esP = _glueElements(gA.edges.els, gB.edges.els, phiE, overlappedEsB)

    # set up the source-target dictionaries for the pushout object:
    stP = {}

    # part for edges in the overlap:
    for eA, eB in phiE.items():
        stA = gA.stDict[eA]
        stB = gB.stDict[eB]
        stP[(eA, eB)] = ((stA[0], stB[0]), (stA[1], stB[1]))

    # part for edges only in gA (i.e. not in the overlap); vsAB maps each non-nil gA-vertex
    # name to its partner name in the pushout vertex naming:
    vsAB = {a: b for a, b in vsP.keys() if a[:3] != 'nil'}
    for eA, eSrt in gA.edges.els.items():
        if eA in phiE:
            continue
        stA = gA.stDict[eA]
        stP[(eA, 'nil|' + eSrt)] = ((stA[0], vsAB[stA[0]]), (stA[1], vsAB[stA[1]]))

    # part for edges only in gB (i.e. not in the overlap); vsBA is the analogue of vsAB for gB:
    vsBA = {b: a for a, b in vsP.keys() if b[:3] != 'nil'}
    for eB, eSrt in gB.edges.els.items():
        if eB in overlappedEsB:
            continue
        stB = gB.stDict[eB]
        stP[('nil|' + eSrt, eB)] = ((vsBA[stB[0]], stB[0]), (vsBA[stB[1]], stB[1]))

    # return the data that encodes the pushout object P (in a format where all keys are
    # converted to strings):
    vsPout = {str(v): vSrt for v, vSrt in vsP.items()}
    esPout = {str(e): eSrt for e, eSrt in esP.items()}
    stPout = {str(e): (str(vs[0]), str(vs[1])) for e, vs in stP.items()}

    return vsPout, esPout, stPout
#%% Data classes and auxiliary methods to store solver experiment data
class Z3statsData:
    """Auxiliary data class to store and evaluate Z3 statistics data.

    The main purpose of this class is to facilitate the evaluation of ``Z3`` solver statistics
    output from the various algorithms in this package, and to provide a template for various
    extensions and custom data access methods.
    """

    def __init__(self) -> None:
        # maps a run key to a dictionary of ``statistics key: value (as a string)`` entries
        self.stats = {}

    def storeStatsAsDict(self, rKey: str, stats: z3.Statistics) -> None:
        """Store Z3 statistics data from a solver run into a dictionary.

        Every key of ``stats`` is stored together with the string form of its value under
        ``self.stats[rKey]``; the method itself returns nothing.

        Args:
            rKey: the key under which to store the statistics data of the solver run
            stats: a ``z3.Statistics`` instance

        Example:
            >>> S = z3.Solver()
            >>> x, y = z3.Ints('x y')
            >>> S.add(x > y)
            >>> S.check()
            >>> statsData = Z3statsData()
            >>> statsData.storeStatsAsDict('run1', S.statistics())
            >>> statsDct = statsData.stats
        """
        runStats = {}
        for statKey in stats.keys():
            runStats[statKey] = str(stats.get_key_value(statKey))
        self.stats[rKey] = runStats
class Z3directMethodStatsData(Z3statsData):
    """Auxiliary data class to store and evaluate Z3 statistics data for the "direct" method.

    In contrast to the ``Z3statsData`` parent class, instances of this subclass store some
    additional data specific to the "direct" (i.e. the "generate-and-test") method, in particular
    statistics data from the two different phases. For convenience in comparing the *overall*
    performance of this method against other methods, the subclass exposes the same fields as the
    parent class, which hold the joint information on both phases of the "direct" method.
    """

    def __init__(self) -> None:
        super().__init__()
        # maps a phase name to a dictionary of ``run key: statistics dictionary`` entries
        self.statsPhases = {}

    def storeStatsAsDictForPhase(self, rKey: str, stats: z3.Statistics, phase: str) -> None:
        """Store Z3 statistics data from a solver run into a dictionary.

        The statistics dictionary is stored under ``self.stats[phase + '-' + rKey]`` and the very
        same dictionary object is also stored under ``self.statsPhases[phase][rKey]``; the method
        itself returns nothing.

        Args:
            rKey: the key under which to store the statistics data of the solver run
            stats: a ``z3.Statistics`` instance
            phase: a string (either ``'generate'`` or ``'test'``) for encoding the phase in which
                the statistics data was collected

        Example:
            >>> S = z3.Solver()
            >>> x, y = z3.Ints('x y')
            >>> S.add(x > y)
            >>> S.check()
            >>> statsData = Z3directMethodStatsData()
            >>> statsData.storeStatsAsDictForPhase('run1', S.statistics(), 'test')
            >>> statsDct = statsData.stats
        """
        runStats = {}
        for statKey in stats.keys():
            runStats[statKey] = str(stats.get_key_value(statKey))

        self.stats[phase + '-' + rKey] = runStats

        # for the special case that this is the first call to the self.statsPhases dictionary
        # at key 'phase' (or that the entry is still empty), instantiate an empty dictionary first:
        if phase not in self.statsPhases or self.statsPhases[phase] == {}:
            self.statsPhases[phase] = {}

        self.statsPhases[phase][rKey] = runStats
class Z3overlapsData:
    """Data class for storing and evaluating overlap data of typed directed graphs."""

    def __init__(self,
                 gA: Z3TypedDirectedGraph,
                 gB: Z3TypedDirectedGraph,
                 overlaps: List[Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]]) -> None:
        """Store overlap data into a Z3overlapData class instance.

        Args:
            gA: a ``Z3TypedDirectedGraph`` instance
            gB: a ``Z3TypedDirectedGraph`` instance
            overlaps: a list of overlaps of ``gA`` and ``gB``, where each overlap is encoded as a
                tuple of sets, the first of which encodes the vertex overlaps (in the format
                ``(vA, vB)`` if ``vA`` is overlapped with ``vB``), and analogously for the edges
        """
        self.gA, self.gB = gA, gB
        self.overlaps = overlaps

    def renderPushoutObject(self, S: z3.Solver, idx: int, gPOname: str) -> Z3TypedDirectedGraph:
        """Render a ``Z3TypedDirectedGraph`` instance encoding a pushout from overlap data.

        Args:
            S: a ``Z3`` solver instance
            idx: the index of the overlap in the list of overlaps
            gPOname: the name to be given to the pushout object

        Returns:
            A ``Z3TypedDirectedGraph`` instance encoding the pushout object of ``self.gA`` with
            ``self.gB`` along the overlap ``self.overlaps[idx]``.

        Raises:
            ValueError: if ``idx`` lies outside of ``[0, len(self.overlaps) - 1]``
        """
        # input data consistency check:
        lastIdx = len(self.overlaps) - 1
        if idx < 0 or idx > lastIdx:
            raise ValueError('Index out of range! Permissible values:\t[0,%s]' % lastIdx)

        vertexData, edgeData, srcTrgtData = generateTDGpushout(self.gA, self.gB, self.overlaps[idx])

        return Z3TypedDirectedGraph(S, gPOname, vertexData, edgeData, srcTrgtData)
class Z3experimentsData:
    """Data class for storing outputs of individual experiments."""

    def __init__(self,
                 cTimes: List[float],
                 statsData: Z3statsData,
                 statsStr: str,
                 overlapsData: Z3overlapsData,
                 timeOutRunID: str) -> None:
        """Store data from experiments.

        Args:
            cTimes: a list of computation times (in seconds), where the last entry encodes the
                time needed to determine ``unsat``
            statsData: a ``Z3statsData`` instance, whose ``stats`` field is a dictionary with keys
                the run-ids and values dictionaries encoding statistical information on the
                individual solver runs
            statsStr: the string of statistics data generated from the last solver run
            overlapsData: a ``Z3overlapsData`` instance encoding a list of overlaps (as well as
                the data on the two input TDGs of the given routine necessary in order to
                permit e.g. rendering pushout objects)
            timeOutRunID: a string encoding the id of the run that led to a solver time-out;
                in case no time-out occurred, the default value of this entry is "-"
        """
        self.cTimes = cTimes
        self.statsData = statsData
        self.statsStr = statsStr
        self.overlapsData = overlapsData
        self.timeOutRunID = timeOutRunID

    def getTotalRuntime(self) -> float:
        """Compute the sum of all run-times stored in ``self.cTimes``."""
        return sum(self.cTimes)

    def getAverageRuntime(self) -> float:
        """Compute the average run-time (excluding the ``unsat`` run).

        The last entry of ``self.cTimes`` is excluded from the average. If no other entry is
        present (i.e. no solution was found before the final run), ``0.0`` is returned instead
        of dividing by zero.
        """
        numRuns = len(self.cTimes) - 1
        if numRuns <= 0:
            return 0.0
        return sum(self.cTimes[:-1]) / numRuns

    def getMaxMemory(self, rKey: str) -> float:
        """Maximum memory value for a specific run.

        Args:
            rKey: the key identifying the solver run in the ``self.statsData.stats`` dictionary

        Returns:
            The maximum memory value of the run (in MB).

        Raises:
            KeyError: if ``rKey`` is not a recorded run, or if the statistics of that run
                contain no ``'max memory'`` entry
        """
        # each entry of the stats dictionary is itself a dictionary of statistics values,
        # from which the 'max memory' entry has to be read out:
        return float(self.statsData.stats[rKey]['max memory'])

    def getOverallMaxMemory(self) -> float:
        """Maximum memory consumption value over all recorded runs (``0.0`` if none is recorded)."""
        return max((float(runStats['max memory']) for runStats in self.statsData.stats.values()),
                   default=0.0)

    def getAverageMemory(self) -> float:
        """Average memory consumption over all recorded runs (``0.0`` if none is recorded).

        The average is taken over all entries of ``self.statsData.stats``, i.e. over exactly
        those solver runs for which statistics data has been stored.
        """
        allRunStats = self.statsData.stats.values()
        if not allRunStats:
            return 0.0
        return sum(float(runStats['memory']) for runStats in allRunStats) / len(allRunStats)
#%% Three different strategies to compute TDG overlaps ("DPE", "direct" and "implicit"; cf. the GCM 2020 paper
#   by Behr et al. for background information on these algorithm variants)
def generateTDGoverlapsDPEstrategy(gA: Z3TypedDirectedGraph,
                                   gB: Z3TypedDirectedGraph,
                                   frClist: List[Z3TDGfrContainer]) -> Z3experimentsData:
    """Generate overlaps of TDGs compliant with forbidden relation constraints.

    Args:
        gA: a ``Z3TypedDirectedGraph`` instance
        gB: a ``Z3TypedDirectedGraph`` instance
        frClist: a list of ``Z3TDGfrContainer`` instances encoding the forbidden relations

    Returns:
        The following data is generated:

        - A list of computation times for each individual solver run.
        - A ``Z3statsData`` instance holding the statistics information of each ``Z3`` solver run
          that produced an overlap.
        - A string containing some statistics on the overall performance of the ``Z3`` solver runs.
        - A list of valid partial overlaps of the two TDGs, encoded as tuples of sets of element
          name pairs (with the first set coding for the vertices, and the second for the edges).
        - The id of the run in which the solver returned ``unknown`` (or ``'-'`` if none did).

        The data is returned in the form of a ``Z3experimentsData`` instance.

    Raises:
        ValueError: if ``gA`` and ``gB`` are not instantiated over the same solver instance

    .. note:: The design choice to encode the forbidden relation data in the form of
        ``Z3TDGfrContainer`` instances has been taken in order to permit the pre-computation of the
        (static) data on forbidden relations given the particular set of negative pattern
        constraints for the problem at hand. For the overlap finding routine itself, only a
        computationally inexpensive translation of the resulting double-pullback embedding (DPE)
        constraints for the input TDGs ``gA`` and ``gB`` is rendered, so that the computational
        price for computing the forbidden relation data must only be paid once before a possibly
        large set of overlap-searching runs is executed.
    """
    # consistency check: verify that the solver instances of both TDGs coincide:
    if gA.solv != gB.solv:
        raise ValueError('Only TDGs instantiated over the same Z3 solver instance can be accepted for input!')
    solv = gA.solv

    logging.info('-----------|\tSTART OF EXPERIMENT (DPE STRATEGY)\t|-----------\n\n')

    # render a TDG boolean monic span predicate template:
    PT = Z3TDGspanPredicateTemplate(gA.name + '|span|' + gB.name, gA, gB)

    # instantiate forbidden relation non-DPE assertions specific to this template:
    for frContainer in frClist:
        frContainer.genFRasts(PT)

    cTimes = []             # computation times of the individual solver runs
    overlaps = []           # TDG overlaps found so far
    statsData = Z3statsData()   # statistics information on the solver runs
    timeOutRunId = '-'
    n = 0                   # number of overlaps found so far

    # each pass of the loop is one solver run; a run either yields a further overlap (which is
    # recorded and then blocked) or terminates the search:
    while True:
        tic = timeit.default_timer()
        chk = solv.check()
        t = timeit.default_timer() - tic
        cTimes.append(t)

        # for the special case that the solver timed out (if the external solver instance had a
        # time-out set), this information is recorded:
        if chk == z3.unknown:
            ru = str(solv.reason_unknown())
            logging.info('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (n + 1, ru))
            print('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (n + 1, ru))
            timeOutRunId = 'run#' + str(n + 1)
            break

        if not chk == z3.sat:
            break

        n += 1

        # render a model:
        M = solv.model()

        # store the statistics data from the current solver run into statsData:
        statsData.storeStatsAsDict('run#' + str(n), solv.statistics())

        # extract the overlap structure encoded in the model:
        overlaps.append(PT.evaluateModel(M))
        logging.info('--|\tOverlap detected (exec time\t%ss):\n\t\t\t%s\n' % (round(t, 6), overlaps[-1]))

        # "block" the present solution in preparation of the next solver run:
        solv.add(PT.genModelBlockingAsts(M))

    # output some logging information on the overlap search:
    logging.info('\nTime needed to reach %s:\t%ss\n' % (chk, round(t, 5)))
    logging.info('Number of solutions found:\t%s\n' % (len(cTimes) - 1))
    logging.info('Total computation time:\t%ss\n' % round(sum(cTimes), 6))
    logging.info('\n -----------|\tEND OF EXPERIMENT\t|-----------')

    # extract some statistics information on the overall performance of the Z3 solver runs:
    statsStr = str(solv.statistics())[1:-1]

    # final output of results:
    return Z3experimentsData(cTimes, statsData, statsStr, Z3overlapsData(gA, gB, overlaps), timeOutRunId)
def generateTDGoverlapsDirectStrategy(gA: Z3TypedDirectedGraph,
                                      gB: Z3TypedDirectedGraph,
                                      forbiddenPatternsList: List[Z3TypedDirectedGraph])\
        -> Z3experimentsData:
    """Generate overlaps of TDG modulo forbidden patterns via the "direct" strategy.

    The "direct" strategy (also referred to as the "generate-and-test" strategy) for finding
    overlaps of two typed directed graphs such that each overlap possesses a pushout that does not
    contain any of a list of "forbidden patterns" consists of the following phases:

    1. *Generation phase:* determine all (monic) overlaps *without* taking into account the
       forbidden pattern constraints
    2. *Test phase:* for each candidate overlap, form the pushout object and check the
       non-embedding assertions of all "forbidden patterns" for it in a fresh auxiliary solver;
       if the check yields ``sat``, include the overlap in the list of results

    Args:
        gA: a ``Z3TypedDirectedGraph`` instance
        gB: a ``Z3TypedDirectedGraph`` instance
        forbiddenPatternsList: a list of ``Z3TypedDirectedGraph`` instances, each of which encodes
            one "forbidden pattern"

    Returns:
        A ``Z3experimentsData`` instance with the following fields:

        - ``cTimes``: the computation times of the solver runs of the "generation" phase, followed
          by the computation times of the "test" phase (one entry per tested candidate, including
          the time for constructing the pushout object)
        - ``statsData``: a ``Z3directMethodStatsData`` instance with the statistics data of the
          solver runs of both phases (the "test" phase entries in ``statsPhases`` in addition
          carry a ``'chk result'`` entry)
        - ``statsStr``: a string with the statistics of the last auxiliary solver used in the
          "test" phase (the empty string if no candidate overlap was tested)
        - ``overlapsData``: a ``Z3overlapsData`` instance holding those candidate overlaps for
          which the "test" phase check yielded ``sat``
        - ``timeOutRunID``: a string of the form ``'(id1,id2)'`` with the ids of the runs of the
          two phases in which the solver returned ``unknown`` (``'-'`` if none did)

    Raises:
        ValueError: if ``gA`` and ``gB`` are not instantiated over the same solver instance
    """
    # consistency check: verify that the solver instances of both TDGs coincide:
    if gA.solv != gB.solv:
        raise ValueError('Only TDGs instantiated over the same Z3 solver instance can be accepted for input!')

    logging.info('-----------|\tSTART OF EXPERIMENT (DIRECT STRATEGY)\t|-----------\n\n')
    logging.info('>>>>>>>>>>>|\tPhase 1: generation of candidate overlaps\t|<<<<<<<<<<<\n\n')

    # NOTE: this phase is precisely a run of the DPE overlap generation method for an
    #       empty list of "forbidden relations"!
    frList = []  # <- the empty list of "forbidden relations"
    phase1Data = generateTDGoverlapsDPEstrategy(gA, gB, frList)

    cTimes = phase1Data.cTimes
    statsDataPhase1 = phase1Data.statsData
    rawOverlapsData = phase1Data.overlapsData
    timeOutDataPhase1 = phase1Data.timeOutRunID

    # transfer the statistics data from phase 1 into a Z3directMethodStatsData data container:
    mainStatsData = Z3directMethodStatsData()
    mainStatsData.statsPhases['generate'] = {}
    for rKey, data in statsDataPhase1.stats.items():
        mainStatsData.stats['generate-' + rKey] = data
        mainStatsData.statsPhases['generate'][rKey] = data

    logging.info('>>>>>>>>>>>|\tPhase 2: testing the candidate overlaps\t|<<<<<<<<<<<\n\n')

    # a list of computation times for each of the verification runs:
    vTimes = []

    # a list for storing the overlap data output of the method:
    output = []

    timeOutDataPhase2 = '-'

    # the auxiliary solver used last (remains None if there is no candidate overlap to test):
    lastSaux = None

    for idx, o in enumerate(rawOverlapsData.overlaps):
        start = timeit.default_timer()

        # instantiate an auxiliary solver instance in order to check for the forbidden pattern
        # non-embedding constraint satisfaction:
        Saux = z3.Solver()
        lastSaux = Saux

        logging.info('\n\t--> Entering verification of overlap candidate no.\t%s:\n' % (idx+1))
        logging.info('\n\t Overlap specification:\n\n\t \t%s\n' % list(o))

        # form the pushout object of the candidate overlap:
        gPO = rawOverlapsData.renderPushoutObject(Saux, idx, 'gPO')

        # generate the negative-pattern non-embedding assertions:
        astsNE = []
        for gN in forbiddenPatternsList:
            astsNE += gPO.renderNonEmbeddingAssertion(gN)

        Saux.add(astsNE)

        logging.info('\n\n\t\t ----> Entering Z3 solver run...\n\n')

        chk = Saux.check()
        t = timeit.default_timer() - start
        vTimes += [t]

        # store the statistics information for the current solver run into mainStatsData:
        mainStatsData.storeStatsAsDictForPhase('run#' + str(idx), Saux.statistics(), 'test')

        # store the outcome of the solver check into a field of the statistics dictionary:
        mainStatsData.statsPhases['test']['run#' + str(idx)]['chk result'] = str(chk)

        if chk == z3.sat:
            b = True
            logging.info('\nThe pushout is consistent with the constraints!')
        elif chk == z3.unsat:
            b = False
            logging.info('\nThe pushout is NOT consistent with the constraints!')
        elif chk == z3.unknown:
            # the reason must be queried from the solver that actually ran the check, i.e. from
            # the auxiliary solver (and not from the solver of the generation phase):
            ru = str(Saux.reason_unknown())
            logging.info('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (idx, ru))
            print('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (idx, ru))
            timeOutDataPhase2 = 'run#' + str(idx)
            break

        # store the data into the output list:
        output += [(t, b, o, gPO)]

    # extract some statistics etc. from the last used Saux solver instance (such as memory
    # consumption etc.); there is no such instance if phase 1 did not yield any candidate:
    statsStrPhase2 = str(lastSaux.statistics())[1:-1] if lastSaux is not None else ''

    # output some logging information on the overlap search:
    logging.info('\n\n\t\t--> Total computation time:\t%ss\n' % round(sum(vTimes), 6))
    logging.info('\t\t--> Z3 solver statistics for phase 2:\n%s\n\n' % statsStrPhase2)
    logging.info('\n -----------|\tEND OF EXPERIMENT\t|-----------')

    # final output of results:
    runTimes = cTimes + vTimes
    overlapsDataFinal = Z3overlapsData(gA, gB, [o for t, b, o, gPO in output if b == True])
    timeOutRunId = '(%s,%s)' % (timeOutDataPhase1, timeOutDataPhase2)

    return Z3experimentsData(runTimes, mainStatsData, statsStrPhase2, overlapsDataFinal, timeOutRunId)
def generateTDGoverlapsImplicitStrategy(gA: Z3TypedDirectedGraph,
                                        gB: Z3TypedDirectedGraph,
                                        forbiddenPatternsList: List[Z3TypedDirectedGraph])\
        -> Z3experimentsData:
    """Generate overlaps of TDGs compliant with forbidden pattern constraints via the "implicit" strategy.

    For the "implicit" strategy, certain features of the ``Z3`` API (i.e. the ability to utilize
    ``z3.EnumSort`` encodings for the explicitly specified input TDGs ``gA`` and ``gB``, but an
    *implicit* encoding for a pushout graph via ``z3.Const`` calls) permit to verify "on-the-fly"
    whether or not a given candidate overlap would violate the "forbidden pattern" constraints.
    As such, this strategy poses an alternative to the DPE strategy, and differs from the "direct"
    strategy in that the verification of pushout consistency is only performed in the latter
    *after* generating all candidate overlaps without the constraints.

    Args:
        gA: a ``Z3TypedDirectedGraph`` instance
        gB: a ``Z3TypedDirectedGraph`` instance
        forbiddenPatternsList: a list of ``Z3TypedDirectedGraph`` instances encoding the
            "forbidden patterns"

    Returns:
        The following data is generated:

        - A list of computation times for each individual solver run.
        - A ``Z3statsData`` instance holding the statistics information of each ``Z3`` solver run
          that produced an overlap.
        - A string containing some statistics on the overall performance of the ``Z3`` solver runs.
        - A list of valid partial overlaps of the two TDGs, encoded as tuples of sets of element
          name pairs (with the first set coding for the vertices, and the second for the edges).
        - The id of the run in which the solver returned ``unknown`` (or ``'-'`` if none did).

        This set of data is returned in the form of a ``Z3experimentsData`` instance.

    Raises:
        ValueError: if ``gA`` and ``gB`` are not instantiated over the same solver instance
    """
    # consistency check: verify that the solver instances of both TDGs coincide:
    if gA.solv != gB.solv:
        raise ValueError('Only TDGs instantiated over the same Z3 solver instance can be accepted for input!')
    solv = gA.solv

    logging.info('-----------|\tSTART OF EXPERIMENT (IMPLICIT STRATEGY)\t|-----------\n\n')

    # set up a TDG template for the pushout object gPO:
    vSortsAB = gA.vertices.sorts
    eSortsAB = gA.edges.sorts
    stSortsDctAB = gA.stSortsDct
    gPO = Z3TDGtemplate(solv, 'gPO', vSortsAB, eSortsAB, stSortsDctAB)

    def _nilToNil(srts):
        # a fresh dictionary mapping the nil-element name of each sort to itself
        return {('nil|' + srt): ('nil|' + srt) for srt in srts}

    # instantiate templates for injective TDG homomorphisms (encoding the cospan morphisms into
    # the pushout object gPO):
    alpha = Z3TDGmorphism('alpha', gA, gPO, _nilToNil(vSortsAB), _nilToNil(eSortsAB), isInjective=True)
    beta = Z3TDGmorphism('beta', gB, gPO, _nilToNil(vSortsAB), _nilToNil(eSortsAB), isInjective=True)

    # render a TDG boolean monic span predicate template:
    PT = Z3TDGspanPredicateTemplate(gA.name + '|span|' + gB.name, gA, gB)

    # render the assertions that alpha and beta are
    # 1) jointly epimorphic, and
    # 2) compatible with the overlap data that will be contained in Phi
    astsJE = []
    astsPC = []

    # the vertex- and the edge-part are treated in exactly the same fashion (vertices first):
    components = (
        ('V', vSortsAB, gA.vertices, gB.vertices, gPO.vertices, alpha.vMorph, beta.vMorph, PT.sV),
        ('E', eSortsAB, gA.edges, gB.edges, gPO.edges, alpha.eMorph, beta.eMorph, PT.sE),
    )
    for tag, srts, elsA, elsB, elsPO, alphaMorph, betaMorph, spanPart in components:
        for srt in srts:
            x = z3.Const('_x|' + tag + '|' + srt, elsA.subsets[srt])
            y = z3.Const('_y|' + tag + '|' + srt, elsB.subsets[srt])
            z = z3.Const('_z|' + tag + '|' + srt, elsPO.subsets[srt])

            alphaF = alphaMorph.subFunctions[(srt, srt)]
            betaF = betaMorph.subFunctions[(srt, srt)]
            PhiF = spanPart.subPs[srt]

            # every element of gPO is the image of an element of gA or of an element of gB:
            astsJE.append(z3.ForAll(z, z3.Or(z3.Exists(x, alphaF(x) == z),
                                             z3.Exists(y, betaF(y) == z))))
            # two elements have the same image in gPO precisely if they are related by Phi:
            astsPC.append(z3.ForAll([x, y], z3.If(PhiF(x, y) == True,
                                                  alphaF(x) == betaF(y),
                                                  alphaF(x) != betaF(y))))

    # upload the two assertion sets into the solver:
    solv.add(astsJE + astsPC)

    # instantiate the "forbidden pattern" non-embedding assertions on the implicit pushout object gPO:
    astsNE = []
    for pattern in forbiddenPatternsList:
        astsNE += gPO.renderNonEmbeddingAssertion(pattern)

    # upload the non-embedding assertions into the solver:
    solv.add(astsNE)

    cTimes = []             # computation times of the individual solver runs
    overlaps = []           # TDG overlaps found so far
    statsData = Z3statsData()   # statistics information on the solver runs
    timeOutRunId = '-'
    n = 0                   # number of overlaps found so far

    # each pass of the loop is one solver run; a run either yields a further overlap (which is
    # recorded and then blocked) or terminates the search:
    while True:
        tic = timeit.default_timer()
        chk = solv.check()
        t = timeit.default_timer() - tic
        cTimes.append(t)

        # for the special case that the solver timed out (if the external solver instance had a
        # time-out set), this information is recorded:
        if chk == z3.unknown:
            ru = str(solv.reason_unknown())
            logging.info('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (n + 1, ru))
            print('\txxx Timeout detected in run no. %s (reason unknown:\t%s)! xxx' % (n + 1, ru))
            timeOutRunId = 'run#' + str(n + 1)
            break

        if not chk == z3.sat:
            break

        n += 1

        # render a model:
        M = solv.model()

        # store the statistics data from the current solver run into statsData:
        statsData.storeStatsAsDict('run#' + str(n), solv.statistics())

        # extract the overlap structure encoded in the model:
        overlaps.append(PT.evaluateModel(M))
        logging.info('--|\tOverlap detected (exec time\t%ss):\n\t\t\t%s\n' % (round(t, 6), overlaps[-1]))

        # "block" the present solution in preparation of the next solver run:
        solv.add(PT.genModelBlockingAsts(M))

    # output some logging information on the overlap search:
    logging.info('\nTime needed to reach %s:\t%ss\n' % (chk, round(t, 5)))
    logging.info('Number of solutions found:\t%s\n' % (len(cTimes) - 1))
    logging.info('Total computation time:\t%ss\n' % round(sum(cTimes), 6))
    logging.info('\n -----------|\tEND OF EXPERIMENT\t|-----------')

    # extract some statistics information on the overall performance of the Z3 solver runs:
    statsStr = str(solv.statistics())[1:-1]

    # final output of results:
    return Z3experimentsData(cTimes, statsData, statsStr, Z3overlapsData(gA, gB, overlaps), timeOutRunId)
#%% Auxiliary methods for facilitating the generation of experimental data
class ExperimentSpec:
    """Auxiliary data class bundling the parameters of one overlap-finding experiment."""

    def __init__(self, expName: str, graphDataFname: str, forbiddenPatternDataFname: str,
                 strategyName: str, numReps: int, expsData: List[Tuple[str, str]],
                 maxRunTime: int = 0) -> None:
        """Record the parameters that specify an experiment.

        Args:
            expName: the name of the experiment
            graphDataFname: filename (possibly including a path) of a JSON file containing
                data defining TDGs
            forbiddenPatternDataFname: filename (possibly including a path) of a JSON file
                containing data defining forbidden patterns (plus potentially also
                forbidden relation data)
            strategyName: one of ``'direct'``, ``'DPE'``, ``'implicit'`` or
                ``'noConstraints'`` (the strategy to be utilized in the sub-experiment)
            numReps: the number of repetitions of the sub-experiment
            expsData: a list of tuples ``(gAname, gBname)``, with ``gAname`` and ``gBname``
                the names of TDGs in ``graphDataFname`` whose overlaps should be computed
            maxRunTime: (optional) an **integer** limit (in s) for the individual run
                times (default: ``0``, encoding no limit on the run times)
        """
        # identity of the experiment and the pairs of graphs it covers:
        self.expName = expName
        self.expsData = expsData

        # input data files:
        self.graphDataFname = graphDataFname
        self.forbiddenPatternDataFname = forbiddenPatternDataFname

        # run configuration:
        self.strategyName = strategyName
        self.numReps = numReps
        self.maxRunTime = maxRunTime

    def storeExpsDataToJSONfile(self, fName: str) -> None:
        """Write the specification of this experiment into a JSON results file.

        The contents of ``fName + '.json'`` are read if the file exists (otherwise an
        empty dictionary is used), the entry for the key ``self.expName`` receives a
        field ``'expSpces'`` holding all parameters of this specification, and the
        complete dictionary is written back to the same file.

        Args:
            fName: the file name (possibly including a path) of the JSON file, given
                without the ``.json`` extension (which is appended by this method)
        """
        jsonPath = fName + '.json'

        # start from the present contents of the file, if there are any:
        fileContents = {}
        if os.path.exists(jsonPath):
            with open(jsonPath, 'r') as src:
                fileContents = json.load(src)

        specRecord = {
            'graphDataFname': self.graphDataFname,
            'forbiddenPatternDataFname': self.forbiddenPatternDataFname,
            'strategyName': self.strategyName,
            'numReps': self.numReps,
            'expsData': self.expsData,
            'maxRunTime': self.maxRunTime,
        }

        # NOTE: the spelling of the key 'expSpces' is part of the file format; it is the
        # key that loadExpsDataFromJSONfile reads the specification back from.
        fileContents.setdefault(self.expName, {})['expSpces'] = specRecord

        with open(jsonPath, 'w') as dst:
            json.dump(fileContents, dst, sort_keys=True, indent=2)
def loadExpsDataFromJSONfile(fName: str, expName: str) -> ExperimentSpec:
    """Rebuild an ``ExperimentSpec`` from the specification stored in a JSON data file.

    Args:
        fName: the file name (possibly including a path) of the JSON file, given
            without the ``.json`` extension (which is appended by this method)
        expName: the name of the experiment to load

    Returns:
        An ``ExperimentSpec`` instance built from the ``'expSpces'`` field stored under
        the key ``expName``.

    Raises:
        ValueError: if the file does not exist, or if it contains no (non-empty) entry
            for the key ``expName``.
    """
    jsonPath = fName + '.json'

    # the data file must exist:
    if not os.path.exists(jsonPath):
        raise ValueError('The input data file does not exist!')
    with open(jsonPath, 'r') as src:
        allData = json.load(src)

    # ...and it must contain a non-empty entry for the requested experiment:
    if allData.get(expName, {}) == {}:
        raise ValueError('There is no dataset available for the key \'%s\'!' % str(expName))
    specRecord = allData[expName]['expSpces']

    # JSON has no tuple type (tuples are written out as lists), so the pairs of graph
    # names have to be converted back to tuples by hand:
    graphPairs = [(pair[0], pair[1]) for pair in specRecord['expsData']]

    return ExperimentSpec(expName,
                          specRecord['graphDataFname'],
                          specRecord['forbiddenPatternDataFname'],
                          specRecord['strategyName'],
                          int(specRecord['numReps']),
                          graphPairs,
                          int(specRecord['maxRunTime']))
def runExperiments(experimentSpecs: List[ExperimentSpec], resultsFname: str) -> None:
    """Auxiliary method to facilitate conducting experiments.

    For every specification, the specification itself is first stored to the results
    file. Then, for every pair ``(gAname, gBname)`` in ``expSpec.expsData``, the overlap
    search is repeated ``expSpec.numReps`` times with a fresh Z3 solver per repetition,
    and the collected data is stored in the results file under the keys
    ``expSpec.expName`` and ``'(gAname|gBname)'``.

    The stored entry contains the fields ``'graphDataFname'``,
    ``'forbiddenPatternDataFname'``, ``'numRepetitions'``, ``'totalTime'`` and
    ``'runsData'``. The latter holds one entry ``'run#k'`` per repetition (number of
    overlaps ``'n'``, total run time ``'t'``, time-out information ``'toi'`` and maximal
    memory ``'m'``, all as strings) plus an entry ``'final'`` (``'n'`` of the first run,
    average run time ``'tAvg'`` or ``'N/A'`` if a time-out was recorded, the joined
    ``'toi'`` strings, and the maximal memory ``'mMax'`` over all runs).

    If the strategy is ``'DPE'``, forbidden relation data missing from the forbidden
    pattern data is generated and the forbidden pattern JSON file is rewritten, so that
    the data is available to later runs.

    Args:
        experimentSpecs: a list of ``ExperimentSpec`` instances
        resultsFname: the name (possibly with a path) of a JSON file for storing the
            experimental results, given without the ``.json`` extension

    Raises:
        ValueError: if the strategy name of a specification is not one of ``'direct'``,
            ``'DPE'``, ``'implicit'`` or ``'noConstraints'`` (raised when the first run
            of that specification is attempted).
    """
    logging.info('\n\n-------|EXPERIMENTS STARTED|------\n\n')
    print('\n\n-------|EXPERIMENTS STARTED|------\n\n')

    # iterate over individual experiments:
    for expSpec in experimentSpecs:
        logging.info('\n\n\t>>> Entering computation for experiment %s...\n\n' % expSpec.expName)
        print('\n\n\t>>> Entering computation for experiment %s...\n\n' % expSpec.expName)

        # >> store the experiment specification to the results file:
        expSpec.storeExpsDataToJSONfile(resultsFname)

        # >> prepare the forbidden pattern and (if the strategy is 'DPE') the forbidden relation data:
        fpData = renderTDGdataFromJSONfile(expSpec.forbiddenPatternDataFname)
        Saux = z3.Solver()
        fpList = [Z3TypedDirectedGraph(Saux, x['gName'], x['vDict'], x['eDict'], x['stDict'])
                  for x in fpData.values()]

        logging.info('\n\t\t- Strategy:\t%s\n' % expSpec.strategyName)
        print('\n\t\t- Strategy:\t%s\n' % expSpec.strategyName)

        if expSpec.strategyName == 'DPE':
            for gN in fpList:
                # if the forbidden relation data is not yet present in fpData, generate it
                # and store it to the fpData (e.g. for use in the next runs):
                if fpData[gN.name].get('frData', {}) == {}:
                    frC = Z3TDGfrContainer(Saux, gN)
                    frC.genTDGfrData()
                    fpData[gN.name]['frData'] = frC.frData

            # for buffering the computationally expensive forbidden relation computations, save all the
            # fpData to the JSON file with name expSpec.forbiddenPatternDataFname:
            with open(expSpec.forbiddenPatternDataFname + '.json', 'w') as f:
                json.dump(fpData, f, sort_keys=True, indent=2)

        # >> run the individual experiments:
        for experiment in expSpec.expsData:
            gAname, gBname = experiment
            runsData = {}

            logging.info('\n\n>>> Starting experiment for gA = %s, gB = %s:\n\n' % (gAname, gBname))
            print('\n\n>>> Starting experiment for gA = %s, gB = %s:\n\n' % (gAname, gBname))

            # start the repetitions of the experiment:
            for idx in range(expSpec.numReps):
                # runs are numbered starting from 1 in the log, on the console and in the results:
                runKey = 'run#' + str(idx + 1)
                logging.info('\n\n>>>>>> Run no.\t%s' % str(idx + 1))
                print('\n\n>>>>>> Run no.\t%s' % str(idx + 1))

                # instantiate a Z3 solver with the chosen timeout:
                S = z3.Solver()
                if expSpec.maxRunTime != 0:
                    S.set(timeout=expSpec.maxRunTime * 1000)  # <= the Z3 API unit is ms!

                # instantiate the TDGs for the given run:
                gA = renderTDGfromJSONfile(expSpec.graphDataFname, gAname, S)
                gB = renderTDGfromJSONfile(expSpec.graphDataFname, gBname, S)

                # run the experiment:
                if expSpec.strategyName == 'direct':
                    runResults = generateTDGoverlapsDirectStrategy(gA, gB, fpList)
                elif expSpec.strategyName == 'implicit':
                    runResults = generateTDGoverlapsImplicitStrategy(gA, gB, fpList)
                elif expSpec.strategyName == 'DPE':
                    # assemble the forbidden relation data over the solver of this run:
                    frClist = [Z3TDGfrContainer(S, gN, fpData[gN.name]['frData']) for gN in fpList]
                    runResults = generateTDGoverlapsDPEstrategy(gA, gB, frClist)
                elif expSpec.strategyName == 'noConstraints':
                    # NOTE: this type of run is typically utilized in order to determine
                    # the number of overlaps without taking into account any constraints; as
                    # such, it would be possible to use any of the three strategies with
                    # empty constraint data on input, so w.l.o.g. we choose here the DPE strategy
                    runResults = generateTDGoverlapsDPEstrategy(gA, gB, [])
                else:
                    raise ValueError('Unrecognized strategy name - must be one of \'direct\', '
                                     + '\'DPE\', \'implicit\' or \'noConstraints\'!')

                # extract some key data from the run results:
                # number of overlaps:
                nOls = len(runResults.overlapsData.overlaps)
                # total computation time:
                totalT = runResults.getTotalRuntime()
                # time out information:
                toi = runResults.timeOutRunID
                # max. memory consumption:
                maxMem = runResults.getOverallMaxMemory()

                # store the results:
                runsData[runKey] = {
                    'n': str(nOls),
                    't': str(totalT),
                    'toi': str(toi),
                    'm': str(maxMem)
                }

                print('\n\nResults for run no.\t%s:\n%s' % (idx + 1, runsData[runKey]))

            # final curation of results for the experiment:

            # average over run times (should return "N/A" in case a timeout had occurred
            # in any of the runs); note that the summation stops at the first run with a
            # recorded timeout, so tTotalAllRuns then only covers the runs before it:
            tTotalAllRuns = 0.0
            tAvgStr = ''
            for x in runsData.values():
                if x['toi'] not in {'-', '(-,-)'}:
                    tAvgStr = 'N/A'
                    break
                tTotalAllRuns += float(x['t'])

            if tAvgStr == '':
                tAvgStr = str(tTotalAllRuns / len(runsData))

            # join all toi strings for easy reference:
            toiStrs = '|'.join(x['toi'] for x in runsData.values())

            # maximum memory consumption over all runs:
            mStr = str(max(float(x['m']) for x in runsData.values()))

            # store these results into a field 'final' (the number of overlaps is taken
            # from the first run):
            runsData['final'] = {'n': runsData['run#1']['n'], 'tAvg': tAvgStr, 'toi': toiStrs, 'mMax': mStr}

            logging.info('\n\nFinal results for the experiments:\n%s' % runsData['final'])
            print('\n\nFinal results for the experiments:\n%s' % runsData['final'])

            # record all of the information runsData into the global results dictionary:
            expKey = '(%s|%s)' % (gAname, gBname)
            results = {
                'graphDataFname': expSpec.graphDataFname,
                'forbiddenPatternDataFname': expSpec.forbiddenPatternDataFname,
                'numRepetitions': str(expSpec.numReps),
                'totalTime': str(tTotalAllRuns),
                'runsData': runsData
            }

            logging.info('\n\n<<< Experiment finished!\n\n\n')
            print('\n\n<<< Experiment finished!\n\n\n')

            # >> write out the results to a JSON file with name resultsFname:

            # determine if the data file already exists, and if so, read out its contents:
            if os.path.exists(resultsFname + '.json'):
                with open(resultsFname + '.json', 'r') as f:
                    resData = json.load(f)
            else:
                resData = {}

            if resData.get(expSpec.expName, {}) == {}:
                resData[expSpec.expName] = {}

            # update the data from the runs of the experiment into the resData dictionary:
            resData[expSpec.expName][expKey] = results

            # write the data into the JSON file with file name resultsFname:
            with open(resultsFname + '.json', 'w') as f:
                json.dump(resData, f, sort_keys=True, indent=2)

    logging.info('\n\n-------|EXPERIMENTS FINISHED|------\n\n')
    print('\n\n-------|EXPERIMENTS FINISHED|------\n\n')