Closed: KeHe01 closed this issue 2 years ago
```
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
```
You may need to close the other Spark instances that are running.
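Each SparkSession binds its web UI to port 4040 by default and steps up one port at a time when that port is taken, so three fallback attempts suggest at least three other driver JVMs were still alive on your machine. If the script builds its own session, making it clean up on exit keeps runs from piling up — a minimal sketch, assuming a script structured like etl_script.py (the app name and placeholder body are illustrative, not the project's actual code):

```python
from pyspark.sql import SparkSession

# getOrCreate() reuses a session that is already active in this
# process instead of spawning yet another driver JVM.
spark = (
    SparkSession.builder
    .appName("bnpl-etl")  # illustrative name
    .getOrCreate()
)

try:
    pass  # run the ETL here
finally:
    # Release the driver JVM, its heap, and the SparkUI port on exit,
    # so abandoned or crashed runs don't keep holding memory.
    spark.stop()
```

Stray drivers left behind by notebooks or crashed runs usually show up under `jps -l` (look for SparkSubmit), or via `lsof -nP -iTCP:4040`.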
Closing the other Spark instances fixed this. Closing the issue.
Full log from the failed run:

```
(base) ke@Kes-MacBook-Pro generic-buy-now-pay-later-project-group-19 % conda activate ML
(ML) ke@Kes-MacBook-Pro generic-buy-now-pay-later-project-group-19 % /Users/ke/opt/anaconda3/envs/ML/bin/python "/Users/ke/Desktop/2022 Study S2/Applied Data Science/generic-buy-now-pay-later-project-group-19/scripts/etl_script.py"
/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/geopandas/_compat.py:112: UserWarning: The Shapely GEOS version (3.10.2-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.1-CAPI-1.16.0). Conversions between both will be slow.
  warnings.warn(
[nltk_data] Downloading package omw-1.4 to /Users/ke/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/ke/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package words to /Users/ke/nltk_data...
[nltk_data]   Package words is already up-to-date!
[nltk_data] Downloading package punkt to /Users/ke/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /Users/ke/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
<19:59:22 | print_utilities.py:54 | INFO> === CREATING THE SPARK SESSION
```
```
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/10/07 19:59:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/10/07 19:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
<19:59:25 | print_utilities.py:54 | INFO> === READING IN THE RAW DATASETS
```
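The reflective-access and native-hadoop warnings above are routine for Spark 3.1.x on Java 11 and unrelated to the failure; only the SparkUI lines matter here. The banner's own suggestion can quiet the noise — a one-line sketch, assuming `spark` is the active session:

```python
# Log verbosity only; this has no effect on port binding or memory.
spark.sparkContext.setLogLevel("ERROR")
```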
```
<19:59:46 | read_utilities.py:90 | INFO> 3643266 ROWS READ FROM ./data/tables/transactions_20210228_20210827_snapshot
<19:59:46 | read_utilities.py:90 | INFO> 499999 ROWS READ FROM ./data/tables/consumer_user_details.parquet
<20:00:07 | read_utilities.py:90 | INFO> 6044133 ROWS READ FROM ./data/tables/transactions_20220228_20220828_snapshot
<20:00:08 | read_utilities.py:90 | INFO> 499999 ROWS READ FROM ./data/tables/tbl_consumer.csv
<20:00:08 | read_utilities.py:90 | INFO> 4026 ROWS READ FROM ./data/tables/tbl_merchants.parquet
<20:00:23 | read_utilities.py:90 | INFO> 4508106 ROWS READ FROM ./data/tables/transactions_20210828_20220227_snapshot
<20:00:25 | read_utilities.py:97 | INFO> 18442 ROWS READ FOR POSTCODES DATA
<20:00:26 | read_utilities.py:99 | INFO> 2472 ROWS READ FOR CENSUS DATA
<20:00:26 | print_utilities.py:54 | INFO> === SUMMARY INFORMATION
<20:00:26 | print_utilities.py:72 | INFO> Summary of transactions
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(user_id,LongType,true),StructField(merchant_abn,LongType,true),StructField(dollar_value,DoubleType,true),StructField(order_id,StringType,true),StructField(order_datetime,StringType,false)))
<20:00:26 | print_utilities.py:72 | INFO> Summary of consumer_user_mappings
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(user_id,LongType,true),StructField(consumer_id,LongType,true)))
<20:00:26 | print_utilities.py:72 | INFO> Summary of consumers
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(name,StringType,true),StructField(address,StringType,true),StructField(state,StringType,true),StructField(postcode,StringType,true),StructField(gender,StringType,true),StructField(consumer_id,StringType,true)))
<20:00:26 | print_utilities.py:72 | INFO> Summary of merchants
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(name,StringType,true),StructField(tags,StringType,true),StructField(merchant_abn,LongType,true)))
<20:00:26 | print_utilities.py:72 | INFO> Summary of postcodes
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(postcode,LongType,true),StructField(sa2_code,DoubleType,true)))
<20:00:26 | print_utilities.py:72 | INFO> Summary of census
<20:00:26 | print_utilities.py:75 | INFO> StructType(List(StructField(sa2_code,IntegerType,true),StructField(median_age_persons,StringType,true),StructField(median_mortgage_repay_monthly,StringType,true),StructField(median_tot_prsnl_inc_weekly,StringType,true),StructField(median_rent_weekly,StringType,true),StructField(median_tot_fam_inc_weekly,StringType,true),StructField(average_num_psns_per_bedroom,StringType,true),StructField(median_tot_hhd_inc_weekly,StringType,true),StructField(average_household_size,StringType,true)))
<20:00:26 | print_utilities.py:54 | INFO> === CLEANING THE DATA
```
```
<20:00:26 | clean_utilities.py:58 | INFO> Removing transaction outliers
22/10/07 20:00:36 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
22/10/07 20:00:42 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
<20:00:45 | clean_utilities.py:81 | INFO> Outlier Removal Summary:
22/10/07 20:00:51 WARN DAGScheduler: Broadcasting large task binary with size 2013.2 KiB
22/10/07 20:01:03 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
<20:01:08 | clean_utilities.py:84 | INFO> New range of dollar per transaction: 1.90 - 2064.62
<20:01:08 | clean_utilities.py:85 | INFO> Number of instances after outlier removal: 13879775
<20:01:08 | clean_utilities.py:86 | INFO> Number of outliers removed: 315730
<20:01:08 | clean_utilities.py:87 | INFO> % data removed: 2.22%
<20:01:08 | clean_utilities.py:97 | INFO> Extracting merchant tags
/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/sklearn/utils/deprecation.py:87: FutureWarning: Function get_feature_names is deprecated; get_feature_names is deprecated in 1.0 and will be removed in 1.2. Please use get_feature_names_out instead.
  warnings.warn(msg, category=FutureWarning)
<20:01:43 | print_utilities.py:72 | INFO> Summary of transactions
<20:01:43 | print_utilities.py:75 | INFO> StructType(List(StructField(user_id,LongType,true),StructField(merchant_abn,LongType,true),StructField(dollar_value,DoubleType,true),StructField(order_id,StringType,true),StructField(order_datetime,StringType,false),StructField(log(dollar_value),DoubleType,true)))
<20:01:43 | print_utilities.py:72 | INFO> Summary of merchants
<20:01:43 | print_utilities.py:75 | INFO> StructType(List(StructField(name,StringType,true),StructField(tags,StringType,true),StructField(merchant_abn,LongType,true),StructField(tag,StringType,true),StructField(revenue_level,StringType,true),StructField(take_rate,DoubleType,true)))
<20:01:43 | print_utilities.py:72 | INFO> Summary of merchant_tags
<20:01:43 | print_utilities.py:54 | INFO> === AGGREGATING THE DATA
```
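The FutureWarning in the cleaning step is sklearn's 1.0 deprecation of `get_feature_names` (removed in 1.2). If the tag extraction uses a text vectorizer, the rename below silences it — a generic sketch, not the project's actual code:

```python
from sklearn.feature_extraction.text import CountVectorizer

# Generic example of the deprecated/replacement pair; the project's
# real vectorizer and corpus may differ.
vectorizer = CountVectorizer()
vectorizer.fit(["furniture, home furnishings", "books and music"])

# Deprecated in sklearn 1.0, removed in 1.2:
#   names = vectorizer.get_feature_names()
# Equivalent replacement:
names = vectorizer.get_feature_names_out()
```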
```
<20:01:43 | agg_utilities.py:32 | INFO> Computing merchant_sales
<20:01:43 | agg_utilities.py:36 | INFO> Computing customer_accounts
<20:01:43 | agg_utilities.py:40 | INFO> Computing merchant_consumers
<20:01:44 | agg_utilities.py:44 | INFO> Computing consumer_regions
<20:01:44 | agg_utilities.py:48 | INFO> Computing consumer_region_incomes
<20:01:50 | agg_utilities.py:53 | INFO> Computing merchant_metrics
```
```
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "refresh progress"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "netty-rpc-env-timeout"
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
22/10/07 20:15:46 ERROR Utils: Uncaught exception in thread executor-heartbeater
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.stream.StreamSupport.stream(StreamSupport.java:69)
	at java.base/java.util.Collection.stream(Collection.java:711)
	at java.management/sun.management.spi.PlatformMBeanProvider$PlatformComponent.getMBeans(PlatformMBeanProvider.java:195)
	at java.management/java.lang.management.ManagementFactory.getPlatformMXBean(ManagementFactory.java:686)
	at java.management/java.lang.management.ManagementFactory.getMemoryMXBean(ManagementFactory.java:343)
	at org.apache.spark.metrics.JVMOffHeapMemory$.getMetricValue(ExecutorMetricType.scala:79)
	at org.apache.spark.metrics.SingleValueExecutorMetricType.getMetricValues(ExecutorMetricType.scala:46)
	at org.apache.spark.metrics.SingleValueExecutorMetricType.getMetricValues$(ExecutorMetricType.scala:44)
	at org.apache.spark.metrics.JVMOffHeapMemory$.getMetricValues(ExecutorMetricType.scala:77)
	at org.apache.spark.executor.ExecutorMetrics$.$anonfun$getCurrentMetrics$1(ExecutorMetrics.scala:103)
	at org.apache.spark.executor.ExecutorMetrics$.$anonfun$getCurrentMetrics$1$adapted(ExecutorMetrics.scala:102)
	at org.apache.spark.executor.ExecutorMetrics$$$Lambda$2041/0x0000000800ea6840.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.executor.ExecutorMetrics$.getCurrentMetrics(ExecutorMetrics.scala:102)
	at org.apache.spark.executor.ExecutorMetricsPoller.poll(ExecutorMetricsPoller.scala:82)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:974)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
	at org.apache.spark.executor.Executor$$Lambda$646/0x00000008005e2440.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
22/10/07 20:15:46 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: Java heap space
22/10/07 20:15:46 WARN QueuedThreadPool: java.lang.OutOfMemoryError: Java heap space
Traceback (most recent call last):
  File "/Users/ke/Desktop/2022 Study S2/Applied Data Science/generic-buy-now-pay-later-project-group-19/scripts/etl_script.py", line 147, in <module>
    output = etl(spark, input_path, output_path)
  File "/Users/ke/Desktop/2022 Study S2/Applied Data Science/generic-buy-now-pay-later-project-group-19/scripts/etl_script.py", line 71, in etl
    AGG.compute_aggregates(spark, data_dict)
  File "/Users/ke/Desktop/2022 Study S2/Applied Data Science/generic-buy-now-pay-later-project-group-19/scripts/utilities/agg_utilities.py", line 54, in compute_aggregates
    data_dict['merchant_metrics'] = compute_merchant_metrics(spark,
  File "/Users/ke/Desktop/2022 Study S2/Applied Data Science/generic-buy-now-pay-later-project-group-19/scripts/utilities/agg_utilities.py", line 292, in compute_merchant_metrics
    date_range = merchant_sales.select(F.min("order_datetime"),
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1601, in first
    return self.head()
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1587, in head
    rs = self.head(1)
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1589, in head
    return self.take(n)
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 728, in take
    return self.limit(num).collect()
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/ke/opt/anaconda3/envs/ML/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o4703.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
```