davebshow / goblin

A Python 3.5 rewrite of the TinkerPop 3 OGM Goblin

timestamps and goblin OGM (with Janusgraph-Cassandra) #69

Closed John-Boik closed 6 years ago

John-Boik commented 7 years ago

Hi Dave. Can you suggest how I might use timestamps with goblin OGM? In the gremlin console, I might use something like: g.addV("User").property("createdDate",System.currentTimeMillis())

In general, I would like to traverse based on whether the timestamp of one edge/vertex is greater or less than that of another edge/vertex, and whether the time delta between two edges/vertices is greater or less than some number of minutes. The following Titan example does not address these traversals, but it does suggest how a timestamp for an entry in the database might be created and used for a different kind of traversal. Is there a way to do all three kinds of traversal with goblin OGM?

gremlin> import java.util.concurrent.TimeUnit
gremlin> import com.thinkaurelius.titan.core.attribute.Timestamp
...
gremlin> g = TitanFactory.open("conf/titan-cassandra-es.properties")
==>titangraph[cassandrathrift:[127.0.0.1]]
gremlin> v1 = g.addVertex(null)
==>v[256]
gremlin> v2 = g.addVertex(null)
==>v[512]
gremlin> v1.addEdge("knows", v2)
==>e[dc-74-1lh-e8][256-knows->512]
gremlin> g.commit()
==>null
gremlin> yesterday = System.currentTimeMillis() - 1000 * 60 * 60 * 24
==>1420758191198
gremlin> g.V().bothE().has('$timestamp', Compare.GREATER_THAN_EQUAL, new Timestamp(yesterday, TimeUnit.MILLISECONDS))
==>e[dc-74-1lh-e8][256-knows->512]
==>e[dc-74-1lh-e8][256-knows->512]

For this, the following line is added to janusgraph-cassandra-es.properties:

storage.meta.edgestore.timestamps=true

davebshow commented 7 years ago

I'll have to look into this a bit. Hopefully I can do it this weekend. I am considerably busier than usual right now, as I am in the process of submitting a dissertation and moving 2000 miles.

John-Boik commented 7 years ago

I look forward to any insights you can offer. Obviously, timestamp data can be saved in the graph as integers (or floats), but it would be ideal to have time data saved as timestamp type.

davebshow commented 7 years ago

Sure, if you want to do that, it is easy to create a new data type. Simply inherit from abc.DataType and implement the validate, to_db, and to_ogm methods. Here is a simple example of an IP address data type:

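import ipaddress

from goblin import abc, exception


class IPAddress(abc.DataType):

    def to_ogm(self, val):
        # Convert the stored value back into an ipaddress object
        return ipaddress.ip_address(val)

    def to_db(self, val=None):
        # Store the address in the graph as a string
        return str(super().to_db(val=val))

    def validate(self, address):
        # Empty values are allowed and pass through as None
        if not address:
            return
        try:
            val = ipaddress.ip_address(address)
        except ValueError as e:
            raise exception.ValidationError('Not a valid IP address') from e
        return val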
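
The conversions this data type relies on can be tried with the standard library alone. The following is a minimal sketch, not Goblin code: it only shows the string to `ipaddress` round trip that `to_ogm` and `to_db` wrap, and the `ValueError` that `validate` turns into a `ValidationError`. The sample address and the `rejected` flag are made up for illustration.

```python
import ipaddress

raw = "192.168.0.1"

# Roughly what to_ogm does: parse the stored string into an address object
addr = ipaddress.ip_address(raw)

# Roughly what to_db does: turn the address object back into a string
stored = str(addr)

# Invalid input raises ValueError, which validate re-raises as ValidationError
try:
    ipaddress.ip_address("not-an-ip")
    rejected = False
except ValueError:
    rejected = True
```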
davebshow commented 7 years ago

Quick question: do you want to traverse elements based on when they were inserted into the graph? Or do you want to add a datetime to an element and use it as the basis for comparison in traversal? Or both?

John-Boik commented 7 years ago

Both would be nice, but the latter is more important to me. One option might be to use numpy datetime objects to hold time information in a property for a vertex or edge.

davebshow commented 7 years ago

Here is a quick example of how to do this using a custom DateTime datatype. Here, the user can add a datetime to a Goblin element, which is stored in the graph as a POSIX timestamp. You can then use any type of comparison you want to query the graph. This approach could easily be extended to traverse timestamps generated by the db, or to use timezone-aware datetimes, or datetimes from other modules like numpy. Hopefully this helps you get going. Please let me know if you have any questions:

import asyncio
import datetime
import goblin

from aiogremlin.gremlin_python.process.traversal import P

def get_hashable_id(val):
    if isinstance(val, dict) and "@type" in val and "@value" in val:
        if val["@type"] == "janusgraph:RelationIdentifier":
            val = val["@value"]["value"]
    return val

loop = asyncio.get_event_loop()
app = loop.run_until_complete(
    goblin.Goblin.open(loop, get_hashable_id=get_hashable_id))

class DateTime(goblin.abc.DataType):

    def validate(self, val):
        if not isinstance(val, datetime.datetime):
            raise goblin.exception.ValidationError(
                "Not a valid datetime.datetime: {}".format(val))
        return val

    def to_ogm(self, val):
        return datetime.datetime.fromtimestamp(val)

    def to_db(self, val):
        return val.timestamp()

class Event(goblin.Vertex):
    name = goblin.Property(goblin.String)
    datetime = goblin.Property(DateTime)

app.register(Event)

async def go():
    session = await app.session()
    # Create an event with a datetime property
    event1 = Event()
    event1.name = 'event1'
    event1.datetime = datetime.datetime.now()

    # Get a timestamp for comparisons
    ts = datetime.datetime.now().timestamp()

    # Create an event with a later datetime attribute
    event2 = Event()
    event2.name = 'event2'
    event2.datetime = datetime.datetime.now()

    # Add event verts to DB
    session.add(event1, event2)
    await session.flush()

    # Query based on datetime
    earlier_event = await session.g.V().has('datetime', P.lt(ts)).next()
    print("{} occurred at {}".format(earlier_event.name, earlier_event.datetime))

    later_event = await session.g.V().has('datetime', P.gt(ts)).next()
    print("{} occurred at {}".format(later_event.name, later_event.datetime))

loop.run_until_complete(go())
loop.run_until_complete(app.close())
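
For the time-delta part of the question, one option with this approach is to compute the bounds in Python and pass them to the same kind of `has` step. The following is a minimal sketch using only the standard library. `window_bounds` is a hypothetical helper, not part of Goblin, the reference datetime is made up, and the traversal itself is shown only as a comment because it has not been run against a server here.

```python
import datetime


def window_bounds(reference, minutes):
    """Return (lower, upper) POSIX timestamps spanning +/- `minutes` around `reference`."""
    delta = datetime.timedelta(minutes=minutes)
    return (reference - delta).timestamp(), (reference + delta).timestamp()


# A timezone-aware reference keeps the result independent of the local timezone
reference = datetime.datetime(2017, 8, 24, 19, 18, 32,
                              tzinfo=datetime.timezone.utc)
lower, upper = window_bounds(reference, minutes=5)

# The bounds are plain floats, like the values produced by DateTime.to_db,
# so they could be used the same way as `ts`, for example:
#   session.g.V().has('datetime', P.gt(lower)).has('datetime', P.lt(upper))
```

Keeping the arithmetic on the Python side means the graph only ever sees numeric comparisons, which is what the workaround above already relies on.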
davebshow commented 7 years ago

I had a minute to reread this thread, and I realized I only gave you a partial answer. The above code is an easy workaround to get going. To send the timestamps to the server as a timestamp type, you would need to serialize a datetime object to a GraphSON 2 timestamp type. As gremlin_python doesn't currently implement a serializer for timestamp, you would have to implement a simple custom serializer class and pass it to the Cluster used by the Goblin app. I think the end result of this solution is pretty similar to the one I posted above, at least for the time being, but it is an issue that needs to be addressed in gremlin-python at some point, so it is worth experimenting with it here. I'll try to post an example later tonight or tomorrow morning.

davebshow commented 7 years ago

I implemented an example using custom GraphSON de/serializers that works great with TinkerGraph. However, when I run it against Janus I get the following error:

GremlinServerError: 500: Property value [2017-08-24 19:18:32.865] is of type class java.sql.Timestamp is not supported

I would have to look into JanusGraph's support for timestamps to figure out exactly what is going on; maybe I can ask around during work hours tomorrow. For reference, the above code modified to use the custom serializers is as follows. Since it works with the reference implementation, it should, in theory, work with Janus as well:

import asyncio
import datetime
import goblin
from aiogremlin import Cluster
from aiogremlin.gremlin_python.driver import serializer
from aiogremlin.gremlin_python.process.traversal import P
from aiogremlin.gremlin_python.structure.io import graphson

def get_hashable_id(val):
    if isinstance(val, dict) and "@type" in val and "@value" in val:
        if val["@type"] == "janusgraph:RelationIdentifier":
            val = val["@value"]["value"]
    return val

class DateTimeSerializer:

    def dictify(self, obj, writer):
        # Java timestamp expects milliseconds
        ts = round(obj.timestamp() * 1000)
        return graphson.GraphSONUtil.typedValue('Timestamp', ts)

class DateTimeDeserializer:

    def objectify(self, ts, reader):
        # Python timestamp expects seconds
        dt = datetime.datetime.fromtimestamp(ts / 1000.0)
        return dt

reader = graphson.GraphSONReader({'g:Timestamp': DateTimeDeserializer()})
writer = graphson.GraphSONWriter({datetime.datetime: DateTimeSerializer()})

message_serializer = serializer.GraphSONMessageSerializer(reader=reader,
                                                          writer=writer)

loop = asyncio.get_event_loop()
cluster = loop.run_until_complete(
    Cluster.open(loop, message_serializer=message_serializer))
app = goblin.Goblin(cluster)

class DateTime(goblin.abc.DataType):

    def validate(self, val):
        if not isinstance(val, datetime.datetime):
            raise goblin.exception.ValidationError(
                "Not a valid datetime.datetime: {}".format(val))
        return val

    # Note that these methods are different than in the previous example.
    def to_ogm(self, val):
        return super().to_ogm(val)

    def to_db(self, val):
        return super().to_db(val)

class Event(goblin.Vertex):
    name = goblin.Property(goblin.String)
    datetime = goblin.Property(DateTime)

app.register(Event)

async def go():
    session = await app.session()
    # Create an event with a datetime property
    event1 = Event()
    event1.name = 'event1'
    event1.datetime = datetime.datetime.now()
    # Get a timestamp for comparisons
    await asyncio.sleep(0.001)
    ts = datetime.datetime.now()  # here we don't need to convert to timestamp
    await asyncio.sleep(0.001)
    # Create an event with a later datetime attribute
    event2 = Event()
    event2.name = 'event2'
    event2.datetime = datetime.datetime.now()

    # Add event verts to DB
    session.add(event1, event2)
    await session.flush()

    # Query based on datetime
    earlier_event = await session.g.V().has('datetime', P.lt(ts)).next()
    print("{} occurred at {}".format(earlier_event.name, earlier_event.datetime))

    later_event = await session.g.V().has('datetime', P.gt(ts)).next()
    print("{} occurred at {}".format(later_event.name, later_event.datetime))

loop.run_until_complete(go())
loop.run_until_complete(app.close())

It is also worth noting that in my previous example, the custom data type should probably convert between the Python (seconds since the epoch) and Java (milliseconds since the epoch) timestamps.
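
A conversion along those lines might look like the following minimal sketch. It uses only the standard library and integer arithmetic. The helper names `to_java_millis` and `from_java_millis` are hypothetical, and the sketch assumes timezone-aware UTC datetimes, which differs from the naive `datetime.datetime.now()` values used in the examples in this thread.

```python
import datetime

# Assumption: all datetimes are timezone-aware; naive ones raise TypeError here
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
ONE_MS = datetime.timedelta(milliseconds=1)


def to_java_millis(dt):
    """Convert an aware datetime to integer milliseconds since the epoch."""
    # Floor division drops any sub-millisecond part
    return (dt - EPOCH) // ONE_MS


def from_java_millis(ms):
    """Convert integer milliseconds since the epoch to a UTC datetime."""
    return EPOCH + ms * ONE_MS


dt = datetime.datetime(2017, 8, 24, 19, 18, 32, 865000,
                       tzinfo=datetime.timezone.utc)
ms = to_java_millis(dt)
restored = from_java_millis(ms)
```

Working with `timedelta` arithmetic rather than floats avoids rounding surprises, at the cost of requiring aware datetimes.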

davebshow commented 7 years ago

It occurred to me this morning that Janus is probably just like Titan, so the real way to control types in the db is through the schema definition. Glancing at the docs it appears that this is correct. I'll put up an example of this a bit later.

davebshow commented 7 years ago

Ok. Generating a schema in Janus, we see that it uses Date and not Timestamp. Here is an example that creates a simple schema and implements a custom Date GraphSON serializer. I believe that this is the most correct way to handle Janus dates with Goblin:

import asyncio
import datetime
import goblin
from aiogremlin import Cluster
from aiogremlin.gremlin_python.driver import serializer
from aiogremlin.gremlin_python.process.traversal import P
from aiogremlin.gremlin_python.structure.io import graphson

def get_hashable_id(val):
    if isinstance(val, dict) and "@type" in val and "@value" in val:
        if val["@type"] == "janusgraph:RelationIdentifier":
            val = val["@value"]["value"]
    return val

class DateSerializer:

    def dictify(self, obj, writer):
        # Java timestamp expects milliseconds
        ts = round(obj.timestamp() * 1000)
        return graphson.GraphSONUtil.typedValue('Date', ts)

class DateDeserializer:

    def objectify(self, ts, reader):
        # Python timestamp expects seconds
        dt = datetime.datetime.fromtimestamp(ts / 1000.0)
        return dt

reader = graphson.GraphSONReader({'g:Date': DateDeserializer()})
writer = graphson.GraphSONWriter({datetime.datetime: DateSerializer()})

message_serializer = serializer.GraphSONMessageSerializer(reader=reader,
                                                          writer=writer)

loop = asyncio.get_event_loop()
cluster = loop.run_until_complete(
    Cluster.open(loop, message_serializer=message_serializer))
app = goblin.Goblin(cluster)

class DateTime(goblin.abc.DataType):

    def validate(self, val):
        if not isinstance(val, datetime.datetime):
            raise goblin.exception.ValidationError(
                "Not a valid datetime.datetime: {}".format(val))
        return val

    def to_ogm(self, val):
        return super().to_ogm(val)

    def to_db(self, val):
        return super().to_db(val)

class Event(goblin.Vertex):
    name = goblin.Property(goblin.String)
    datetime = goblin.Property(DateTime)

app.register(Event)

async def create_schema():
    client = await cluster.connect()
    schema_msg = """mgmt = graph.openManagement()
                    datetime = mgmt.makePropertyKey('datetime').dataType(Date.class).cardinality(Cardinality.SINGLE).make()
                    mgmt.commit()"""
    await client.submit(schema_msg)

async def go():
    session = await app.session()
    # Create an event with a datetime property
    event1 = Event()
    event1.name = 'event1'
    event1.datetime = datetime.datetime.now()
    # Get a timestamp for comparisons
    await asyncio.sleep(0.001)
    ts = datetime.datetime.now()
    await asyncio.sleep(0.001)
    # Create an event with a later datetime attribute
    event2 = Event()
    event2.name = 'event2'
    event2.datetime = datetime.datetime.now()

    # Add event verts to DB
    session.add(event1, event2)
    await session.flush()

    # Query based on datetime
    earlier_event = await session.g.V().has('datetime', P.lt(ts)).next()
    print("{} occurred at {}".format(earlier_event.name, earlier_event.datetime))

    later_event = await session.g.V().has('datetime', P.gt(ts)).next()
    print("{} occurred at {}".format(later_event.name, later_event.datetime))

loop.run_until_complete(create_schema())
loop.run_until_complete(go())
loop.run_until_complete(app.close())
StanYaha commented 7 years ago

@John-Boik @davebshow Hello guys! Do you have a demo of Goblin for a front-end development showcase?