ibm-watson-data-lab / ibmos2spark

Facilitates Data I/O between Spark and IBM Object Storage services.

Rewriting configuration with old name doesn't seem to have an effect #42

Open gadamc opened 6 years ago

gadamc commented 6 years ago

I've seen a problem where creating a new CloudObjectStorage instance with a previously used configuration_name but new/different credential values doesn't seem to update that configuration. That is, after correcting my credentials, my connection to pull data still failed. However, when I created a new COS object with a new configuration_name and the correct credentials, I was able to pull data.

For example:

Bad Creds

import ibmos2spark

bad_creds = {
    'endpoint': "s3-api.us-geo.objectstorage.softlayer.net",
    'api_key': "abcdef1234567890",
    'service_id': "ServiceId-xxxx"
}

configuration_name = 'my_service' 
bmos = ibmos2spark.CloudObjectStorage(sc, bad_creds, configuration_name=configuration_name, cos_type='bluemix_cos')

bucket_name = 'advo-beta-alternate'
filename = 'login_2017_06_29_05_07_44.csv'

path = bmos.url(filename, bucket_name)

print 'Loading clickstream data for login events from {} ...'.format(path)

# login_schema is a StructType defined earlier in the notebook (not shown)
login_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'false')\
  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')\
  .load(path, schema = login_schema)

This produced a "Path does not exist" error:

Loading clickstream data for login events from cos://advo-beta-alternate.my_service/login_2017_06_29_05_07_44.csv ...

AnalysisExceptionTraceback (most recent call last)
<ipython-input-44-d5f793e64016> in <module>()
     17 print 'Loading clickstream data for login events from {} ...'.format(path)
     18 
---> 19 login_df = spark.read  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')  .option('header', 'false')  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')  .load(path, schema = login_schema)
     20 
     21 if login_df:

/usr/local/src/spark20master/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    145         self.options(**options)
    146         if isinstance(path, basestring):
--> 147             return self._df(self._jreader.load(path))
    148         elif path is not None:
    149             if type(path) != list:

/usr/local/src/spark20master/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/src/spark20master/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'Path does not exist: cos://advo-beta-alternate.my_service/login_2017_06_29_05_07_44.csv;'
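To see which values are actually registered under this configuration name, the Hadoop configuration can be inspected directly. A minimal diagnostic sketch, assuming ibmos2spark stores bluemix_cos credentials as Stocator-style Hadoop properties under the prefix fs.cos.<configuration_name> (the exact property suffixes below are my assumption):

hconf = sc._jsc.hadoopConfiguration()

# ASSUMPTION: property suffixes follow the Stocator COS convention;
# adjust if ibmos2spark registers them under different names.
for prop in ('endpoint', 'iam.api.key', 'iam.service.id'):
    key = 'fs.cos.my_service.{}'.format(prop)
    print '{} = {}'.format(key, hconf.get(key))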

Good Creds, Same configuration_name

Now, if I correct my credentials but reuse the same configuration_name, I get the same error:

good_creds = {
    'endpoint': "s3-api.us-geo.objectstorage.softlayer.net",
    'api_key': "abcdef1234567890",
    'service_id': "ServiceId-zzzz-aaaa-bbbb"
}

configuration_name = 'my_service' 
bmos = ibmos2spark.CloudObjectStorage(sc, good_creds, configuration_name=configuration_name, cos_type='bluemix_cos')

bucket_name = 'advo-beta-alternate'
filename = 'login_2017_06_29_05_07_44.csv'

path = bmos.url(filename, bucket_name)

print 'Loading clickstream data for login events from {} ...'.format(path)

login_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'false')\
  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')\
  .load(path, schema = login_schema)

Exact same error as before:

Loading clickstream data for login events from cos://advo-beta-alternate.my_service/login_2017_06_29_05_07_44.csv ...

AnalysisExceptionTraceback (most recent call last)
<ipython-input-47-d5f793e64016> in <module>()
     17 print 'Loading clickstream data for login events from {} ...'.format(path)
     18 
---> 19 login_df = spark.read  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')  .option('header', 'false')  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')  .load(path, schema = login_schema)
     20 
     21 if login_df:

/usr/local/src/spark20master/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    145         self.options(**options)
    146         if isinstance(path, basestring):
--> 147             return self._df(self._jreader.load(path))
    148         elif path is not None:
    149             if type(path) != list:

/usr/local/src/spark20master/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/src/spark20master/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'Path does not exist: cos://advo-beta-alternate.my_service/login_2017_06_29_05_07_44.csv;'
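Re-running the inspection sketch from above after this second CloudObjectStorage call would show whether the credentials were never overwritten in the Hadoop configuration, or whether they were overwritten but the running connection ignores them:

hconf = sc._jsc.hadoopConfiguration()

# Did the second CloudObjectStorage call overwrite the stored service id?
# (Same Stocator-style key-name assumption as above.)
print hconf.get('fs.cos.my_service.iam.service.id')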

Good Creds, New configuration_name

good_creds = {
    'endpoint': "s3-api.us-geo.objectstorage.softlayer.net",
    'api_key': "abcdef1234567890",
    'service_id': "ServiceId-zzzz-aaaa-bbbb"
}

configuration_name = 'my_new_service_name' 
bmos = ibmos2spark.CloudObjectStorage(sc, good_creds, configuration_name=configuration_name, cos_type='bluemix_cos')

bucket_name = 'advo-beta-alternate'
filename = 'login_2017_06_29_05_07_44.csv'

path = bmos.url(filename, bucket_name)

print 'Loading clickstream data for login events from {} ...'.format(path)

login_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'false')\
  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')\
  .load(path, schema = login_schema)

print 'Data structure: {}'.format(login_df)
print 'Event count: {:,}'.format(login_df.count())
print 'Example event: {}'.format(login_df.head())

Success

Loading clickstream data for login events from cos://advo-beta-alternate.my_new_service_name/login_2017_06_29_05_07_44.csv ...
Data structure: DataFrame[customer_id: string, click_event_type: string, total_price_of_basket: decimal(10,2), total_number_of_items_in_basket: int, total_number_of_distinct_items_in_basket: int, click_event_time: timestamp]
Event count: 21
Example event: Row(customer_id=u'10592', click_event_type=u'login', total_price_of_basket=Decimal('0.00'), total_number_of_items_in_basket=0, total_number_of_distinct_items_in_basket=0, click_event_time=datetime.datetime(2017, 6, 28, 22, 5, 48))

BONUS Check: Bad credentials with a previously configured, successful configuration_name

bad_creds = {
    'endpoint': "s3-api.us-geo.objectstorage.softlayer.net",
    'api_key': "abcdef1234567890",
    'service_id': "ServiceId-zzzz"
}

configuration_name = 'my_new_service_name' 
bmos = ibmos2spark.CloudObjectStorage(sc, bad_creds, configuration_name=configuration_name, cos_type='bluemix_cos')

bucket_name = 'advo-beta-alternate'
filename = 'login_2017_06_29_05_07_44.csv'

path = bmos.url(filename, bucket_name)

print 'Loading clickstream data for login events from {} ...'.format(path)

login_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'false')\
  .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss z')\
  .load(path, schema = login_schema)

print 'Data structure: {}'.format(login_df)
print 'Event count: {:,}'.format(login_df.count())
print 'Example event: {}'.format(login_df.head())

With the BAD credentials, I still successfully connect to the data, which suggests Spark is reusing whatever was registered earlier under this configuration_name.

Loading clickstream data for login events from cos://advo-beta-alternate.my_new_service_name/login_2017_06_29_05_07_44.csv ...
Data structure: DataFrame[customer_id: string, click_event_type: string, total_price_of_basket: decimal(10,2), total_number_of_items_in_basket: int, total_number_of_distinct_items_in_basket: int, click_event_time: timestamp]
Event count: 21
Example event: Row(customer_id=u'10592', click_event_type=u'login', total_price_of_basket=Decimal('0.00'), total_number_of_items_in_basket=0, total_number_of_distinct_items_in_basket=0, click_event_time=datetime.datetime(2017, 6, 28, 22, 5, 48))

Conclusion

It seems that you cannot update a Spark/COS connection by re-creating it with the same configuration_name as a previously created connection; whichever credentials were registered first under that name continue to be used.

I've not explicitly tested this with the other services (Swift Bluemix, Swift Softlayer).
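One plausible, unverified explanation: Hadoop caches FileSystem instances by URL scheme and authority, and a cached instance keeps the configuration it was created with, so re-setting the fs.cos.my_service.* properties would never reach the already-instantiated cos://advo-beta-alternate.my_service filesystem. That would also explain why a new configuration_name works, since it changes the URL authority and forces a fresh filesystem instance. If caching is the cause, Hadoop's generic per-scheme cache switch might work around it. A sketch, not tested:

# ASSUMPTION: the stale credentials come from Hadoop's FileSystem cache.
# Hadoop honors a generic per-scheme flag to bypass that cache:
sc._jsc.hadoopConfiguration().setBoolean('fs.cos.impl.disable.cache', True)

# Re-registering corrected credentials under the same name should then
# take effect on the next read:
bmos = ibmos2spark.CloudObjectStorage(sc, good_creds,
                                      configuration_name='my_service',
                                      cos_type='bluemix_cos')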