awslabs / amazon-neptune-tools

Tools and utilities to enable loading data and building graph applications with Amazon Neptune.

Got _pickle.PicklingError when upserting vertices in Glue #114

Closed: zxkane closed this issue 3 years ago

zxkane commented 3 years ago

I'm following this guide to insert vertices into Neptune from a Glue ETL job.

I'm getting _pickle.PicklingError: Could not serialize object: TypeError: can't pickle SSLContext objects when calling selectfields1.toDF().foreachPartition(gremlin_client.upsert_vertices('MyTest', batch_size=100)).
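
For what it's worth, the root cause seems to be that SSLContext objects are not picklable at all, independent of Glue or Spark. A minimal illustration on the Python 3 runtimes Glue uses:

import pickle
import ssl

# Pickling any object graph that contains an SSLContext fails.
pickle.dumps(ssl.create_default_context())
# TypeError: can't pickle SSLContext objects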

Below is a simple Glue job that reproduces the issue:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from neptune_python_utils.gremlin_utils import GremlinUtils
from neptune_python_utils.endpoints import Endpoints
from neptune_python_utils.glue_gremlin_client import GlueGremlinClient

sc = SparkContext.getOrCreate()
sc.setLogLevel("INFO")
glueContext = GlueContext(sc)
logger = glueContext.get_logger()

logger.info(f'Before resolving options...')

args = getResolvedOptions(sys.argv,
                          ['region',
                           'neptune_endpoint',
                           'neptune_port'])

logger.info(f'Resolved options are: {args}')

spark = glueContext.spark_session
logger.info('########### Creating PySpark DataFrame')
data = [
    ('2987000',1.8356905714924256,19.0),
    ('2987001',1.462397997898956,0.0),
]
dataColumns = ['~id','TransactionAmt','dist1']
dataDF = spark.createDataFrame(data=data, schema=dataColumns)
dDF = DynamicFrame.fromDF(dataDF, glueContext, 'Data')
logger.info(f'###########Schema is {dDF.schema()}')
logger.info('########### Iterate the dataset')
def printRows(rows):
    for row in rows:
        print(f'Processing row is {row}')
dDF.toDF().foreachPartition(printRows)

GremlinUtils.init_statics(globals())
endpoints = Endpoints(neptune_endpoint=args['neptune_endpoint'], neptune_port=args['neptune_port'], region_name=args['region'])
logger.info(f'Initializing gremlin client to Neptune {endpoints.gremlin_endpoint()}.')
gremlin_client = GlueGremlinClient(endpoints)
logger.info(f'#####TESTING gremlin conn')
gremlin_utils = GremlinUtils(endpoints)
conn = gremlin_utils.remote_connection()
g = gremlin_utils.traversal_source(connection=conn)
logger.info(f'Gremlin vertices: {g.V().limit(10).valueMap().toList()}')
conn.close()
logger.info(f'#####Gremlin conn test is successful')
logger.info(f'Upserting vertices via gremlin client to Neptune {endpoints.gremlin_endpoint()}.')
selectfields1 = SelectFields.apply(frame = dDF, paths = dataColumns, transformation_ctx = "selectfields1")
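# This is the call that fails: Spark pickles the callable passed to
# foreachPartition, and it holds an SSLContext that cannot be serialized.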
selectfields1.toDF().foreachPartition(gremlin_client.upsert_vertices('MyTest', batch_size=100))

The issue is reproducible on both Glue 1.0 with Python 3 and Glue 2.0. The neptune_python_utils lib zip was built from the latest source of this repo with the build.sh script.

Any suggestion is appreciated.
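
In the meantime, the workaround I'm considering is to construct the client inside the function passed to foreachPartition, so that the closure Spark pickles captures only plain strings. This is an untested sketch: upsert_partition is just an illustrative name, and it assumes upsert_vertices returns a per-partition callable, as the foreachPartition call above implies.

# Capture only plain strings on the driver.
neptune_endpoint = args['neptune_endpoint']
neptune_port = args['neptune_port']
region = args['region']

def upsert_partition(rows):
    # Everything constructed here lives on the executor, so Spark only
    # pickles this function and the three strings it closes over.
    from neptune_python_utils.endpoints import Endpoints
    from neptune_python_utils.glue_gremlin_client import GlueGremlinClient
    endpoints = Endpoints(neptune_endpoint=neptune_endpoint,
                          neptune_port=neptune_port,
                          region_name=region)
    client = GlueGremlinClient(endpoints)
    client.upsert_vertices('MyTest', batch_size=100)(rows)

selectfields1.toDF().foreachPartition(upsert_partition)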

iansrobinson commented 3 years ago

Hi @zxkane - thanks for reporting this. I'll look into the issue and work on a fix if necessary.

zxkane commented 3 years ago

Thanks for the quick fix in #115.