davebshow / goblin

A Python 3.5 rewrite of the TinkerPop 3 OGM Goblin

Task was destroyed but it is pending! #43

Closed softwarevamp closed 7 years ago

softwarevamp commented 7 years ago

I have written the goblin OGM sample into my Spark app, but I get "Task was destroyed but it is pending!". Below is my code:

def savePartition(p):
    from goblin import element, properties

    class Brand(element.Vertex):
        name = properties.Property(properties.String)

    import asyncio

    loop = asyncio.get_event_loop()

    from goblin.app import Goblin
    app = loop.run_until_complete(Goblin.open(loop))
    app.register(Brand)

    async def go(app):
        session = await app.session()

        for i in p:
            if i['brand']:
                traversal = session.traversal(Brand)
                result = await traversal.has(Brand.name, i['brand']).oneOrNone()

                if not result:
                    brand = Brand()
                    brand.name = i['brand']
                    session.add(brand)

        await session.flush()

    loop.run_until_complete(go(app))

rdd = rdd.foreachPartition(savePartition)

I am wondering how to close the response in the above code. Thanks very much.

davebshow commented 7 years ago

Looks like you don't close the app. Call await app.close() when you are done using goblin to close the underlying pool and connections.
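
As a rough illustration of that advice, here is a minimal sketch of the close-in-finally pattern. It does not import goblin: StubApp is a hypothetical stand-in that only tracks whether close() was awaited, and go() is a placeholder for the session work from the question. In the real function the app would come from loop.run_until_complete(Goblin.open(loop)) as in the original code. Creating a fresh event loop per partition is also an assumption, not something stated in this thread.

```python
import asyncio


class StubApp:
    """Hypothetical stand-in for the Goblin app; it only models close()."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # The real app would close its pool and connections here.
        await asyncio.sleep(0)
        self.closed = True


async def go(app, rows):
    # Placeholder for the session/traversal work shown in the question.
    await asyncio.sleep(0)
    return sum(1 for row in rows if row.get("brand"))


def save_partition(rows, app_factory=StubApp):
    # Assumption: one fresh loop per partition, closed when the work is done.
    loop = asyncio.new_event_loop()
    app = app_factory()
    try:
        count = loop.run_until_complete(go(app, rows))
        return count, app
    finally:
        # Close the app while the loop is still alive, even if go() raised,
        # so nothing is left pending when the loop is discarded.
        loop.run_until_complete(app.close())
        loop.close()
```

Putting the close call in a finally block means the cleanup also runs when the partition work raises, which is the case most likely to leave tasks pending.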

davebshow commented 7 years ago

I'm going to go ahead and close this.