PacktPublishing / Machine-Learning-on-Kubernetes

Machine Learning on Kubernetes, published by Packt
MIT License

Chapter 9. Building Your Data Pipeline | Error executing Chapter09/explore_data.ipynb #9

Closed · webmakaka closed this issue 2 years ago

webmakaka commented 2 years ago

Hello!

When I run this block:

import os
from pyspark.sql import SparkSession

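# S3A settings for the in-cluster MinIO service, plus the hadoop-aws and
# PostgreSQL JDBC packages, passed to pyspark-shell before the driver starts.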
os.environ['PYSPARK_SUBMIT_ARGS'] = f"\
--conf spark.hadoop.fs.s3a.endpoint=http://minio-ml-workshop:9000 \
--conf spark.hadoop.fs.s3a.access.key=minio \
--conf spark.hadoop.fs.s3a.secret.key=minio123 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.multipart.size=104857600 \
--packages org.apache.hadoop:hadoop-aws:3.2.0,org.postgresql:postgresql:42.3.3 \
--master spark://{os.environ['SPARK_CLUSTER']}:7077 pyspark-shell "

# Create the Spark application
spark = SparkSession \
    .builder \
    .appName("Python Spark S3 example") \
    .getOrCreate()

dfAirlines = spark.read\
                .options(delimeter=',', inferSchema='True', header='True') \
                .csv("s3a://airport-data/airlines.csv")
dfAirlines.printSchema()

dfAirports = spark.read\
                .options(delimiter=',', inferSchema='True', header='True') \
                .csv("s3a://airport-data/airports.csv")
dfAirports.printSchema()

dfAirports.show(truncate=False)
dfAirlines.show(truncate=False)

print(dfAirports.count())
print(dfAirlines.count())

spark.stop()

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-426019de4890> in <module>
     19     .getOrCreate()
     20 
---> 21 dfAirlines = spark.read\
     22                 .options(delimeter=',', inferSchema='True', header='True') \
     23                 .csv("s3a://airport-data/airlines.csv")

/opt/app-root/lib/python3.8/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/opt/app-root/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/app-root/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/opt/app-root/lib/python3.8/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o74.csv.
: java.nio.file.AccessDeniedException: airport-data: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:375)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:311)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:723)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:159)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
    at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5129)
    at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5103)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4352)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1344)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1284)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:376)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 30 more
Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
    at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:164)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
    ... 47 more
Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)
    at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:341)
    at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:362)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1232)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)
    at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:116)
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:87)
    at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:189)
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
    ... 50 more


How can I fix this? Thanks!

The previous steps in this notebook finished correctly.
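
For reference, the stack trace shows SimpleAWSCredentialsProvider finding no credentials and the AWS SDK falling back to InstanceProfileCredentialsProvider, whose call to the EC2 metadata endpoint fails with "No route to host". That pattern suggests the fs.s3a.* values from PYSPARK_SUBMIT_ARGS never reached the Hadoop configuration — for example, if a SparkSession already existed in the kernel, getOrCreate() returns it and the new submit args are ignored. A quick diagnostic sketch (assuming it runs in the same notebook kernel, right after getOrCreate()):

# Diagnostic sketch: read the S3A settings back from the live Hadoop
# configuration. If these print None, the --conf values were never applied.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("fs.s3a.endpoint"))    # expected: http://minio-ml-workshop:9000
print(hadoop_conf.get("fs.s3a.access.key"))  # expected: minio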

webmakaka commented 2 years ago

It seems it was a local problem with resources on my machine.

I scaled grafana and the prometheus-operator down to 0 and ran the Elyra Notebook Image with Spark with a small container size.

And for now, the following steps run OK:

RUN -> Chapter09/explore_data.ipynb
RUN -> Chapter09/merge_data.ipynb
RUN -> Chapter09/clean_data.ipynb

=========================

My working config is:


$ export \
    PROFILE=marley-minikube \
    CPUS=8 \
    MEMORY=30G \
    HDD=80G \
    DRIVER=docker \
    KUBERNETES_VERSION=v1.24.4

And even with this, there are not enough resources.

webmakaka commented 1 year ago

This could be a solution:

from pyspark.sql import SparkSession
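
# Set the S3A/MinIO connection settings directly on the session builder,
# so they are in place before the first s3a:// read.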
spark = SparkSession \
    .builder \
    .appName("Python Spark S3 example") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-ml-workshop:9000")\
    .config("spark.hadoop.fs.s3a.access.key", 'minio')\
    .config("spark.hadoop.fs.s3a.secret.key", 'minio123')\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")\
    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
    .getOrCreate()

dfAirlines = spark.read\
                .options(delimiter=',', inferSchema='True', header='True') \
                .csv("s3a://airport-data/airlines.csv")

dfAirlines.printSchema()

dfAirports = spark.read \
                .options(delimiter=',', inferSchema='True', header='True') \
                .csv("s3a://airport-data/airports.csv")

dfAirports.printSchema()

dfAirports.show(truncate=False)
dfAirlines.show(truncate=False)

print(dfAirports.count())
print(dfAirlines.count())

spark.stop()
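
A variant of the same idea (a minimal sketch, assuming an existing spark session, e.g. one the notebook image created for you): the fs.s3a.* settings can also be applied through the session's Hadoop configuration instead of the builder:

# Sketch: apply the MinIO settings to an already-created SparkSession.
# Note: do this before the first s3a:// read, since Hadoop caches
# FileSystem instances once they are initialized.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio-ml-workshop:9000")
hadoop_conf.set("fs.s3a.access.key", "minio")
hadoop_conf.set("fs.s3a.secret.key", "minio123")
hadoop_conf.set("fs.s3a.path.style.access", "true")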