vliz-be-opsci / py-sema

Overall parent of all packages involving semantic manipulation of RDF data.
MIT License
0 stars 0 forks source link

Add prov tracking into sema.commons.prov #63

Open cedricdcc opened 3 weeks ago

cedricdcc commented 3 weeks ago

We need to enhance our monorepo to include provenance tracking using a provonance ontology. This involves creating a Prov class that can track the provenance of function calls and class operations within our Python codebase. Additionally, we need a translation step to export the recorded provenance data to a TTL (Turtle) file format.

For the example below I've taken the liberty to use prov-o. NOTE that there can be mistakes in the terms used but its the technical impementation that counts.

Python script example braindump not tested

import functools
import uuid
from datetime import datetime
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, XSD, FOAF, PROV

EX = Namespace("http://example.org#")

class Prov:
    def __init__(self):
        self.graph = Graph()
        self.graph.bind("ex", EX)
        self.graph.bind("foaf", FOAF)
        self.graph.bind("prov", PROV)

    def add_entity(self, entity_id, entity_type, generated_by=None, derived_from=None, attributed_to=None):
        entity = URIRef(EX[entity_id])
        self.graph.add((entity, RDF.type, PROV.Entity))
        if generated_by:
            self.graph.add((entity, PROV.wasGeneratedBy, URIRef(EX[generated_by])))
        if derived_from:
            self.graph.add((entity, PROV.wasDerivedFrom, URIRef(EX[derived_from])))
        if attributed_to:
            self.graph.add((entity, PROV.wasAttributedTo, URIRef(EX[attributed_to])))

    def add_activity(self, activity_id, activity_type, used=None, informed_by=None, started_at_time=None, ended_at_time=None):
        activity = URIRef(EX[activity_id])
        self.graph.add((activity, RDF.type, PROV.Activity))
        if used:
            for entity_id in used:
                self.graph.add((activity, PROV.used, URIRef(EX[entity_id])))
        if informed_by:
            self.graph.add((activity, PROV.wasInformedBy, URIRef(EX[informed_by])))
        if started_at_time:
            self.graph.add((activity, PROV.startedAtTime, Literal(started_at_time, datatype=XSD.dateTime)))
        if ended_at_time:
            self.graph.add((activity, PROV.endedAtTime, Literal(ended_at_time, datatype=XSD.dateTime)))

    def add_agent(self, agent_id, agent_type):
        agent = URIRef(EX[agent_id])
        self.graph.add((agent, RDF.type, PROV.Agent))
        self.graph.add((agent, RDF.type, agent_type))

    def record_generation(self, entity_id, agent_id, activity_id):
        self.add_entity(entity_id, PROV.Entity, generated_by=activity_id, attributed_to=agent_id)

    def record_activity(self, activity_id, activity_type, used_entities, informed_by=None, start_time=None, end_time=None):
        self.add_activity(activity_id, activity_type, used=used_entities, informed_by=informed_by, started_at_time=start_time, ended_at_time=end_time)

    @property
    def prov_graph(self):
        return self.graph

    @staticmethod
    def generate_id():
        return str(uuid.uuid4())

    def prov_decorator(self, activity_type):
        def decorator_function(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                activity_id = self.generate_id()
                start_time = datetime.now().isoformat()

                used_entities = [self.generate_id() for _ in args]  # Simulate input entities
                self.record_activity(activity_id, activity_type, used_entities, start_time=start_time)

                result = func(*args, **kwargs)

                end_time = datetime.now().isoformat()
                entity_id = self.generate_id()
                self.record_generation(entity_id, func.__name__, activity_id)

                # Record output entity
                output_entity_id = self.generate_id()
                self.add_entity(output_entity_id, PROV.Entity, generated_by=activity_id)

                return result
            return wrapper
        return decorator_function

    def prov_class_decorator(self, cls):
        class Wrapped(cls):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                entity_id = Prov.generate_id()
                timestamp = datetime.now().isoformat()
                activity_id = Prov.generate_id()
                Prov.record_activity(self, activity_id, 'instantiation', [])
                Prov.record_generation(self, entity_id, cls.__name__, activity_id)

            def __setattr__(self, name, value):
                super().__setattr__(name, value)
                entity_id = Prov.generate_id()
                timestamp = datetime.now().isoformat()
                activity_id = Prov.generate_id()
                Prov.record_activity(self, activity_id, 'set_attribute', [])
                Prov.record_generation(self, entity_id, f"{self.__class__.__name__}.{name}", activity_id)

        return Wrapped

# Example usage
prov = Prov()

@prov.prov_decorator(activity_type='calculation')
def add(a, b):
    return a + b

@prov.prov_class_decorator
class MyClass:
    def __init__(self, value):
        self.value = value

    def set_value(self, value):
        self.value = value

# Function call example
result = add(1, 2)

# Class instantiation and method call example
obj = MyClass(10)
obj.set_value(20)

# Get provenance information
provenance_graph = prov.prov_graph
print(provenance_graph.serialize(format='turtle').decode('utf-8'))

With the provenance data stored as an RDF graph, you can run SPARQL queries to analyze it:

from rdflib import Graph

query = """
PREFIX prov: <http://www.w3.org/ns/prov#>
SELECT ?entity ?activity ?time
WHERE {
    ?entity a prov:Entity .
    ?entity prov:wasGeneratedBy ?activity .
    ?activity prov:endedAtTime ?time .
}
"""

results = provenance_graph.query(query)
for row in results:
    print(f"Entity: {row.entity}, Activity: {row.activity}, Time: {row.time}")

Explanation code

1. Output Entity Recording:

2. Provenance Data Structure:

Example Usage:

This setup ensures that entities generated as outputs of functions are included in the provenance records, capturing the complete data lineage as per the PROV-O ontology.

Additional required tasks

References

marc-portier commented 3 weeks ago

many interesting thoughts to discuss in here - but don't completely see how this works out

some upfront remarks

furthering this topic I would like to attack advancing this with some top-down thought as well:

unclear: what is the relation to the 'required task' on rdflib.Store() object

cedricdcc commented 3 weeks ago

do we actually want a dict of list managing the entities, activities, agents? could we not go full board triples and just build an internal graph? fololwing that line of tinking the get_provenance() should return an rdflib.Graph or even be exposed as a @property prov_graph ?

We can go full triples on first , my last remark on having the rdflib.Store() can then be dropped since this would be the internal graph then.

the recurring Prov.method_name(self) constructs look strange? what is the benefit over self.method_name() ?

Naming can be discussed , this was a rough first draw but the prov part can be dropped in a final implementation.

self.generate_id() does not (never will) use self - so why not make it @staticmethod and ommit the self argument?

Good remark , in the final version this can be the case.

the instance bound decorator looks a bit fishy? is that common thing? some guides / best practices there?

also unclear how we are to manage those as member instance variables inside our classes, and not as globals to the source code file / module ?

Managing provenance data as member variables inside classes, rather than as globals, is a better practice. It ensures encapsulation and avoids potential conflicts or unintended side effects. By using instance variables, we can also maintain cleaner and more modular code.

cedricdcc commented 3 weeks ago

from output-side --> what provenance triples do we want from the various processes we have? (query, subyt, harvest, syncfs, ...) @laurianvm could you prepare some cases, examples for those? (@cedricdcc I guess this reflects your suggestion to 'discuss prov modal for py-sema ?)

@marc-portier yes I would like this to be a joint effort of the whole team to decide upon the prov model

cedricdcc commented 3 weeks ago

from programmer pov --> how do we see this kind of common prov package actually make the work easier in the modules that need it? what would the effect be on query, subyt, syncfs, harvest, ...

Use the decorator in the main functions of all the top level fodlers like query, discovery , sema, bench, query to track functions that produce some resource or write some away like in commons.store.

All practical usecases need to be overviewed though I think this is a good starting point since the decorators are easely modified to our needs.

cedricdcc commented 3 weeks ago

I've taken the liberty to update my first comment on the issue and modified the code according to some of your suggestions @marc-portier

cedricdcc commented 1 week ago

inspiration from ROCrate community https://arxiv.org/pdf/2312.07852v2

cedricdcc commented 1 week ago

services to consider + tracking: