delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

Error writing streaming data to delta lake #46

Closed: colinglaes closed this issue 4 years ago

colinglaes commented 5 years ago

I've created my first Delta Lake table successfully, but I'm having some trouble writing streaming data to it. I generated the table using a set of JSON files and was testing by copying one of those files into the streaming directory after creating the table, but ran into this error: ERROR:log:An error occurred while calling o326.save. The full logs are below. I'm running locally in pseudo-distributed mode with the setup below. Please let me know if any additional details would be helpful!

Java

(mve) [cglaes@blackened : mny : 19:12] java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
(mve) [cglaes@blackened : mny : 19:13] 

Hadoop

(mve) [cglaes@blackened : mny : 19:10] hadoop version
Hadoop 2.7.6
Subversion https://shv@git-wip-us.apache.org/repos/asf/hadoop.git -r 085099c66cf28be31604560c376fa282e69282b8
Compiled by kshvachk on 2018-04-18T01:33Z
Compiled with protoc 2.5.0
From source with checksum 71e2695531cb3360ab74598755d036
This command was run using /home/cglaes/bin/hadoop/share/hadoop/common/hadoop-common-2.7.6.jar
(mve) [cglaes@blackened : mny : 19:10] 

Spark

(mve) [cglaes@blackened : mny : 19:12] pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch 
Compiled by user  on 2019-04-18T23:18:13Z
Revision 
Url 
Type --help for more information.
(mve) [cglaes@blackened : mny : 19:12]

Below are the logs from my resource manager

ERROR:log:received empty C json rdd, skipping
INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|
+-------+-----------+---------+---------+-------------+----------+
|HUF/EUR|         48|0.0030891|0.0030825|1557290584000|2019-05-08|
|XAU/JPY|         48| 141550.0| 141449.0|1557290584000|2019-05-08|
|ZAR/BWP|         48|  0.74758|  0.74714|1557290584000|2019-05-08|
|NZD/EUR|         48|  0.58779|  0.58759|1557290587000|2019-05-08|
|HRK/CAD|         48|  0.20356|  0.20353|1557290585000|2019-05-08|
|USD/EUR|         48| 0.892618| 0.892379|1557290585000|2019-05-08|
|BWP/ZAR|         48|   1.3384|   1.3376|1557290584000|2019-05-08|
|TRY/CHF|         48| 0.165202| 0.165084|1557290585000|2019-05-08|
|USD/SGD|         48|  1.36094|  1.36079|1557290587000|2019-05-08|
|NGN/ZAR|         48| 0.040118| 0.040034|1557290586000|2019-05-08|
|NZD/DKK|         48|   4.3885|   4.3868|1557290587000|2019-05-08|
|DKK/EUR|         48|  0.13395|  0.13393|1557290586000|2019-05-08|
|XAG/AUD|         48|   21.338|   21.232|1557290584000|2019-05-08|
|AUD/PLN|         48|  2.68999|  2.68832|1557290586000|2019-05-08|
|ZAR/PKR|         48|   9.8109|   9.8051|1557290584000|2019-05-08|
|THB/SGD|         48| 0.042785| 0.042774|1557290587000|2019-05-08|
|MAD/AUD|         48|  0.14728|  0.14727|1557290585000|2019-05-08|
|JPY/ZAR|         48|  0.13115|  0.13107|1557290584000|2019-05-08|
|DKK/THB|         48|   4.7744|   4.7735|1557290585000|2019-05-08|
|SGD/ZAR|         48|  10.5989|  10.5903|1557290586000|2019-05-08|
+-------+-----------+---------+---------+-------------+----------+
only showing top 20 rows

ERROR:log:An error occurred while calling o326.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 13.0 failed 4 times, most recent failure: Lost task 17.3 in stage 13.0 (TID 188, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190508044339_155729061998421.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$.$anonfun$assertLogBelongsToTable$1(Snapshot.scala:216)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:274)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:141)
    at org.apache.spark.sql.delta.DeltaLog.$anonfun$updateInternal$2(DeltaLog.scala:318)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:25)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.$anonfun$updateInternal$1(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:75)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:65)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog.$anonfun$update$1(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommit$1(OptimisticTransaction.scala:323)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:232)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:75)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:65)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:216)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:72)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190508044339_155729061998421.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$.$anonfun$assertLogBelongsToTable$1(Snapshot.scala:216)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

First 10 lines of the file in question

(mve) [cglaes@blackened : mny : 19:17] head -n 10 /data/stream/polygon/C/C_20190508044339_155729061998421.json
{"pair": "HUF/EUR", "exchange_id": 48, "ask_price": 0.0030891, "bid_price": 0.0030825, "epoch": 1557290584000}
{"pair": "XAU/JPY", "exchange_id": 48, "ask_price": 141550, "bid_price": 141449, "epoch": 1557290584000}
{"pair": "ZAR/BWP", "exchange_id": 48, "ask_price": 0.74758, "bid_price": 0.74714, "epoch": 1557290584000}
{"pair": "NZD/EUR", "exchange_id": 48, "ask_price": 0.58779, "bid_price": 0.58759, "epoch": 1557290587000}
{"pair": "HRK/CAD", "exchange_id": 48, "ask_price": 0.20356, "bid_price": 0.20353, "epoch": 1557290585000}
{"pair": "USD/EUR", "exchange_id": 48, "ask_price": 0.892618, "bid_price": 0.892379, "epoch": 1557290585000}
{"pair": "BWP/ZAR", "exchange_id": 48, "ask_price": 1.3384, "bid_price": 1.3376, "epoch": 1557290584000}
{"pair": "TRY/CHF", "exchange_id": 48, "ask_price": 0.165202, "bid_price": 0.165084, "epoch": 1557290585000}
{"pair": "USD/SGD", "exchange_id": 48, "ask_price": 1.36094, "bid_price": 1.36079, "epoch": 1557290587000}
{"pair": "NGN/ZAR", "exchange_id": 48, "ask_price": 0.040118, "bid_price": 0.040034, "epoch": 1557290586000}
(mve) [cglaes@blackened : mny : 19:17] 

brkyvz commented 5 years ago

Hi @colinglaes, could you please share the code you used to create the table and the code you're using to write to it?

colinglaes commented 5 years ago

Sure thing. One extra note: I ran the table creation via the pyspark shell.

from pyspark.sql.functions import input_file_name 

# read data
forex_quote_log_raw = spark.read.json('file:///data/stream/polygon/C/')

# add source file path & register temp view
forex_quote_log_raw = forex_quote_log_raw.withColumn('source_file_path', input_file_name())
forex_quote_log_raw.createOrReplaceTempView('forex_quote_log_raw')

# generate ultimate delta schema
forex_quote_log = spark.sql(
'''
SELECT pair,
       exchange_id,
       ask_price,
       bid_price,
       epoch,
       cast(from_unixtime(epoch / 1000.0) as date) as date,
       source_file_path
FROM forex_quote_log_raw
'''
)

# save to delta
forex_quote_log.write\
       .partitionBy('date')\
       .format('delta')\
       .save('/data/lake/polygon/forex_quote_log')
spark.sql(
'''
CREATE TABLE forex_quote_log
USING DELTA
LOCATION "/data/lake/polygon/forex_quote_log/"
'''
)

import json
import logging

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name 
from pyspark.sql.types import *

logging.basicConfig()
logger = logging.getLogger('log')
logger.setLevel(logging.INFO)

TRANSFORM_SQL = '''
SELECT pair,
       exchange_id,
       ask_price,
       bid_price,
       epoch,
       cast(from_unixtime(epoch / 1000.0) as date) as date,
       source_file_path
FROM forex_quote_log_raw
'''

def process(time, forex_json_rdd):
    try:
        if forex_json_rdd.isEmpty():
            raise Exception('received empty C json rdd, skipping')            
        logger.info('transforming C json rdd to DataFrame')            
        sql_context = SQLContext(forex_json_rdd.context)
        forex_quote_log_raw = sql_context.read.json(forex_json_rdd)
        forex_quote_log_raw = forex_quote_log_raw.withColumn(
            'source_file_path',
            input_file_name()
        )
        forex_quote_log_raw.createOrReplaceTempView('forex_quote_log_raw')
        forex_quote_log = sql_context.sql(TRANSFORM_SQL)
        logger.info('writing below streaming forex_quote_log data to delta')
        forex_quote_log.show()
        forex_quote_log.write\
            .format('delta')\
            .mode('append')\
            .save('/data/lake/polygon/forex_quote_log')
    except Exception as e:
        logger.error(e)

def main():
    # setup configuration / context
    configuration = SparkConf()
    spark_context = SparkContext(conf = configuration)
    streaming_context = StreamingContext(spark_context, 1)
    # transform data
    forex_json_raw = streaming_context.textFileStream('file:///data/stream/polygon/C')
    forex_json_parsed = forex_json_raw.map(lambda v: json.loads(v))
    forex_json_parsed.foreachRDD(process)
    # launch long-running job
    streaming_context.start()
    streaming_context.awaitTermination()
main()

tdas commented 5 years ago

Hello @colinglaes, I think this has something to do with how the output path /data/lake/polygon/forex_quote_log is being interpreted. Since you have not specified "file://" to explicitly make it a local path, something in the Hadoop configuration is misinterpreting it as a Hadoop (HDFS) path, which causes downstream inconsistencies in how the paths of data files are processed.

Could you try writing out to file:///data/lake/polygon/forex_quote_log (deleting the directory first if it already exists)?
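
For illustration, here is a minimal sketch of that change, reusing the paths and DataFrame from the code above (a suggested form, not a tested fix; LOCAL_TABLE_PATH is just a convenience name used here):

# Make the local-filesystem scheme explicit on every path so that the
# Hadoop configuration (e.g. an fs.defaultFS pointing at HDFS) cannot
# reinterpret a bare /data/... path. Assumes `forex_quote_log` is the
# DataFrame built in the earlier snippet.
LOCAL_TABLE_PATH = 'file:///data/lake/polygon/forex_quote_log'

forex_quote_log.write \
    .partitionBy('date') \
    .format('delta') \
    .save(LOCAL_TABLE_PATH)

# The streaming job's writer in process() would then append with the same
# explicit scheme:
# forex_quote_log.write.format('delta').mode('append').save(LOCAL_TABLE_PATH)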

colinglaes commented 5 years ago

That’s a good point. I’m in the process of reinstalling Hadoop and upgrading to Spark 2.4.3; I’ll check once it’s set up and let you know.

tdas commented 5 years ago

@colinglaes any update on this?

colinglaes commented 5 years ago

I’ve had a busy couple of days at work; I may be able to get to this tomorrow night, but it’s more likely to be sometime this weekend.

zsxwing commented 5 years ago

@colinglaes we found a bug in PySpark that may lead to this kind of failure: https://issues.apache.org/jira/browse/SPARK-27711
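
If I read that JIRA right, InputFileBlockHolder is not unset when a task finishes, so a stale input path from an earlier RDD job (here, the DStream reading the C_*.json files) can leak into a later input_file_name() call in the same worker; that would explain why Delta's transaction-log check reports your streaming JSON file instead of a _delta_log file. One way to rule it out is a sketch like the following, which swaps input_file_name() for a constant placeholder (view and column names are from your streaming job; lit('') is a hypothetical stand-in, not part of your pipeline):

from pyspark.sql.functions import lit

# Workaround sketch while SPARK-27711 is open: avoid input_file_name()
# entirely inside the streaming job. sql_context and forex_json_rdd are
# the objects from the process() function earlier in this thread.
forex_quote_log_raw = sql_context.read.json(forex_json_rdd)
forex_quote_log_raw = forex_quote_log_raw.withColumn(
    'source_file_path',
    lit('')  # placeholder instead of input_file_name()
)
forex_quote_log_raw.createOrReplaceTempView('forex_quote_log_raw')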

colinglaes commented 5 years ago

@zsxwing @tdas I figured that may have been causing the issue, so I ran the pipeline without that call / field on Sunday, but it showed the same failure behavior. More fires at work, so I won’t get to this until the weekend.

colinglaes commented 5 years ago

I just installed Hadoop 3.0.3 and Spark 2.4.3 and was trying to run this on YARN, but ran into an error which seems to be centered around Netty; the error is below.

Traceback (most recent call last):
  File "test.py", line 51, in <module>
    spark_context = SparkContext(conf = configuration)
  File "/home/cglaes/bin/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
  File "/home/cglaes/bin/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
  File "/home/cglaes/bin/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
  File "/home/cglaes/bin/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/home/cglaes/bin/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I
    at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:113)
    at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:106)
    at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:99)
    at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:71)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:461)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Since I ran into this roadblock, I just ran the streaming job in standalone mode, against both HDFS and file-based Delta tables. Although there weren't any errors, when I checked the data it wasn't there.

Output from Streaming

-------------------------------------------
Time: 2019-05-20 07:29:52
-------------------------------------------

2019-05-20 07:29:52,030 INFO scheduler.JobScheduler: Finished job streaming job 1558337392000 ms.0 from job set of time 1558337392000 ms
2019-05-20 07:29:52,030 INFO python.PythonRDD: Removing RDD 1565 from persistence list
2019-05-20 07:29:52,030 INFO scheduler.JobScheduler: Total delay: 0.030 s for time 1558337392000 ms (execution: 0.001 s)
2019-05-20 07:29:52,030 INFO rdd.MapPartitionsRDD: Removing RDD 1564 from persistence list
2019-05-20 07:29:52,031 INFO storage.BlockManager: Removing RDD 1564
2019-05-20 07:29:52,031 INFO storage.BlockManager: Removing RDD 1565
2019-05-20 07:29:52,031 INFO rdd.UnionRDD: Removing RDD 1380 from persistence list
2019-05-20 07:29:52,031 INFO storage.BlockManager: Removing RDD 1380
2019-05-20 07:29:52,031 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337332000 ms: 1558337331000 ms
2019-05-20 07:29:52,031 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:52,031 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337331000 ms
2019-05-20 07:29:53,023 INFO dstream.FileInputDStream: Finding new files took 23 ms 
2019-05-20 07:29:53,023 INFO dstream.FileInputDStream: New files at time 1558337393000 ms:

2019-05-20 07:29:53,031 INFO scheduler.JobScheduler: Added jobs for time 1558337393000 ms 
2019-05-20 07:29:53,031 INFO scheduler.JobScheduler: Starting job streaming job 1558337393000 ms.0 from job set of time 1558337393000 ms
-------------------------------------------
Time: 2019-05-20 07:29:53
-------------------------------------------

2019-05-20 07:29:53,032 INFO scheduler.JobScheduler: Finished job streaming job 1558337393000 ms.0 from job set of time 1558337393000 ms
2019-05-20 07:29:53,032 INFO python.PythonRDD: Removing RDD 1568 from persistence list
2019-05-20 07:29:53,032 INFO scheduler.JobScheduler: Total delay: 0.032 s for time 1558337393000 ms (execution: 0.001 s)
2019-05-20 07:29:53,032 INFO storage.BlockManager: Removing RDD 1568
2019-05-20 07:29:53,032 INFO rdd.MapPartitionsRDD: Removing RDD 1567 from persistence list
2019-05-20 07:29:53,032 INFO storage.BlockManager: Removing RDD 1567
2019-05-20 07:29:53,032 INFO rdd.UnionRDD: Removing RDD 1383 from persistence list
2019-05-20 07:29:53,032 INFO storage.BlockManager: Removing RDD 1383
2019-05-20 07:29:53,032 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337333000 ms: 1558337332000 ms
2019-05-20 07:29:53,032 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:53,032 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337332000 ms
2019-05-20 07:29:54,017 INFO dstream.FileInputDStream: Finding new files took 17 ms 
2019-05-20 07:29:54,017 INFO dstream.FileInputDStream: New files at time 1558337394000 ms:

2019-05-20 07:29:54,023 INFO scheduler.JobScheduler: Added jobs for time 1558337394000 ms 
2019-05-20 07:29:54,023 INFO scheduler.JobScheduler: Starting job streaming job 1558337394000 ms.0 from job set of time 1558337394000 ms
-------------------------------------------
Time: 2019-05-20 07:29:54
-------------------------------------------

2019-05-20 07:29:54,024 INFO scheduler.JobScheduler: Finished job streaming job 1558337394000 ms.0 from job set of time 1558337394000 ms
2019-05-20 07:29:54,024 INFO python.PythonRDD: Removing RDD 1571 from persistence list
2019-05-20 07:29:54,024 INFO scheduler.JobScheduler: Total delay: 0.024 s for time 1558337394000 ms (execution: 0.001 s)
2019-05-20 07:29:54,024 INFO storage.BlockManager: Removing RDD 1571
2019-05-20 07:29:54,024 INFO rdd.MapPartitionsRDD: Removing RDD 1570 from persistence list
2019-05-20 07:29:54,024 INFO storage.BlockManager: Removing RDD 1570
2019-05-20 07:29:54,024 INFO rdd.UnionRDD: Removing RDD 1386 from persistence list
2019-05-20 07:29:54,024 INFO storage.BlockManager: Removing RDD 1386
2019-05-20 07:29:54,024 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337334000 ms: 1558337333000 ms
2019-05-20 07:29:54,024 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:54,024 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337333000 ms
2019-05-20 07:29:55,021 INFO dstream.FileInputDStream: Finding new files took 21 ms 
2019-05-20 07:29:55,021 INFO dstream.FileInputDStream: New files at time 1558337395000 ms:
file:/data/stream/polygon/C/C_20190520072954_155833739410724.json
2019-05-20 07:29:55,023 INFO memory.MemoryStore: Block broadcast_48 stored as values in memory (estimated size 279.8 KB, free 14.8 GB) 
2019-05-20 07:29:55,029 INFO memory.MemoryStore: Block broadcast_48_piece0 stored as bytes in memory (estimated size 23.2 KB, free 14.8 GB)
2019-05-20 07:29:55,029 INFO storage.BlockManagerInfo: Added broadcast_48_piece0 in memory on blackened.lan:36881 (size: 23.2 KB, free: 14.8 GB)
2019-05-20 07:29:55,029 INFO spark.SparkContext: Created broadcast 48 from textFileStream at NativeMethodAccessorImpl.java:0
2019-05-20 07:29:55,029 INFO input.FileInputFormat: Total input paths to process : 1
2019-05-20 07:29:55,036 INFO scheduler.JobScheduler: Added jobs for time 1558337395000 ms
2019-05-20 07:29:55,036 INFO scheduler.JobScheduler: Starting job streaming job 1558337395000 ms.0 from job set of time 1558337395000 ms
2019-05-20 07:29:55,040 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:153 
2019-05-20 07:29:55,041 INFO scheduler.DAGScheduler: Got job 24 (runJob at PythonRDD.scala:153) with 1 output partitions
2019-05-20 07:29:55,041 INFO scheduler.DAGScheduler: Final stage: ResultStage 24 (runJob at PythonRDD.scala:153)
2019-05-20 07:29:55,041 INFO scheduler.DAGScheduler: Parents of final stage: List()
2019-05-20 07:29:55,041 INFO scheduler.DAGScheduler: Missing parents: List()
2019-05-20 07:29:55,042 INFO scheduler.DAGScheduler: Submitting ResultStage 24 (PythonRDD[1579] at RDD at PythonRDD.scala:53), which has no missing parents
2019-05-20 07:29:55,043 INFO memory.MemoryStore: Block broadcast_49 stored as values in memory (estimated size 7.3 KB, free 14.8 GB)
2019-05-20 07:29:55,045 INFO memory.MemoryStore: Block broadcast_49_piece0 stored as bytes in memory (estimated size 4.3 KB, free 14.8 GB)
2019-05-20 07:29:55,045 INFO storage.BlockManagerInfo: Added broadcast_49_piece0 in memory on blackened.lan:36881 (size: 4.3 KB, free: 14.8 GB)
2019-05-20 07:29:55,046 INFO spark.SparkContext: Created broadcast 49 from broadcast at DAGScheduler.scala:1161
2019-05-20 07:29:55,046 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 24 (PythonRDD[1579] at RDD at PythonRDD.scala:53) (first 15 tasks are for partitions Vector(0))
2019-05-20 07:29:55,046 INFO scheduler.TaskSchedulerImpl: Adding task set 24.0 with 1 tasks
2019-05-20 07:29:55,047 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 24.0 (TID 24, localhost, executor driver, partition 0, PROCESS_LOCAL, 8076 bytes)
2019-05-20 07:29:55,047 INFO executor.Executor: Running task 0.0 in stage 24.0 (TID 24)
2019-05-20 07:29:55,048 INFO rdd.NewHadoopRDD: Input split: file:/data/stream/polygon/C/C_20190520072954_155833739410724.json:0+1066260 
2019-05-20 07:29:55,067 INFO python.PythonRunner: Times: total = 15, boot = 4, init = 11, finish = 0 
2019-05-20 07:29:55,069 INFO executor.Executor: Finished task 0.0 in stage 24.0 (TID 24). 2822 bytes result sent to driver
2019-05-20 07:29:55,070 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 24.0 (TID 24) in 24 ms on localhost (executor driver) (1/1)
2019-05-20 07:29:55,070 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
2019-05-20 07:29:55,070 INFO scheduler.DAGScheduler: ResultStage 24 (runJob at PythonRDD.scala:153) finished in 0.028 s
2019-05-20 07:29:55,070 INFO scheduler.DAGScheduler: Job 24 finished: runJob at PythonRDD.scala:153, took 0.030316 s
-------------------------------------------
Time: 2019-05-20 07:29:55
-------------------------------------------
{'pair': 'CHF/HKD', 'exchange_id': 48, 'ask_price': 7.76585, 'bid_price': 7.7642, 'epoch': 1558337374000}
{'pair': 'USD/CHF', 'exchange_id': 48, 'ask_price': 1.01086, 'bid_price': 1.01071, 'epoch': 1558337374000}
{'pair': 'GBP/CHF', 'exchange_id': 48, 'ask_price': 1.28712, 'bid_price': 1.287, 'epoch': 1558337374000}
{'pair': 'ZAR/NOK', 'exchange_id': 48, 'ask_price': 0.61094, 'bid_price': 0.61083, 'epoch': 1558337374000}
{'pair': 'USD/SEK', 'exchange_id': 48, 'ask_price': 9.63789, 'bid_price': 9.63489, 'epoch': 1558337374000}
{'pair': 'SEK/USD', 'exchange_id': 48, 'ask_price': 0.10379, 'bid_price': 0.10376, 'epoch': 1558337374000}
{'pair': 'EGP/ZAR', 'exchange_id': 48, 'ask_price': 0.84236, 'bid_price': 0.8422, 'epoch': 1558337374000}
{'pair': 'LKR/ZAR', 'exchange_id': 48, 'ask_price': 0.081598, 'bid_price': 0.081583, 'epoch': 1558337374000}
{'pair': 'NGN/ZAR', 'exchange_id': 48, 'ask_price': 0.039825, 'bid_price': 0.039818, 'epoch': 1558337374000}
{'pair': 'CHF/CLP', 'exchange_id': 48, 'ask_price': 691.2, 'bid_price': 691.09, 'epoch': 1558337374000}
...

2019-05-20 07:29:55,073 INFO scheduler.JobScheduler: Finished job streaming job 1558337395000 ms.0 from job set of time 1558337395000 ms
2019-05-20 07:29:55,073 INFO python.PythonRDD: Removing RDD 1574 from persistence list
2019-05-20 07:29:55,073 INFO scheduler.JobScheduler: Total delay: 0.073 s for time 1558337395000 ms (execution: 0.037 s)
2019-05-20 07:29:55,073 INFO storage.BlockManager: Removing RDD 1574
2019-05-20 07:29:55,073 INFO rdd.MapPartitionsRDD: Removing RDD 1573 from persistence list
2019-05-20 07:29:55,074 INFO storage.BlockManager: Removing RDD 1573 
2019-05-20 07:29:55,074 INFO rdd.UnionRDD: Removing RDD 1389 from persistence list
2019-05-20 07:29:55,074 INFO storage.BlockManager: Removing RDD 1389
2019-05-20 07:29:55,074 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337335000 ms: 1558337334000 ms
2019-05-20 07:29:55,074 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:55,074 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337334000 ms
2019-05-20 07:29:56,022 INFO dstream.FileInputDStream: Finding new files took 22 ms 
2019-05-20 07:29:56,022 INFO dstream.FileInputDStream: New files at time 1558337396000 ms:

2019-05-20 07:29:56,029 INFO scheduler.JobScheduler: Added jobs for time 1558337396000 ms 
2019-05-20 07:29:56,029 INFO scheduler.JobScheduler: Starting job streaming job 1558337396000 ms.0 from job set of time 1558337396000 ms
-------------------------------------------
Time: 2019-05-20 07:29:56
-------------------------------------------

2019-05-20 07:29:56,030 INFO scheduler.JobScheduler: Finished job streaming job 1558337396000 ms.0 from job set of time 1558337396000 ms
2019-05-20 07:29:56,030 INFO python.PythonRDD: Removing RDD 1578 from persistence list
2019-05-20 07:29:56,030 INFO scheduler.JobScheduler: Total delay: 0.030 s for time 1558337396000 ms (execution: 0.001 s)
2019-05-20 07:29:56,030 INFO storage.BlockManager: Removing RDD 1578
2019-05-20 07:29:56,030 INFO rdd.MapPartitionsRDD: Removing RDD 1577 from persistence list
2019-05-20 07:29:56,030 INFO storage.BlockManager: Removing RDD 1577
2019-05-20 07:29:56,030 INFO rdd.UnionRDD: Removing RDD 1392 from persistence list
2019-05-20 07:29:56,030 INFO storage.BlockManager: Removing RDD 1392
2019-05-20 07:29:56,030 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337336000 ms: 1558337335000 ms
2019-05-20 07:29:56,030 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:56,030 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337335000 ms
2019-05-20 07:29:57,023 INFO spark.ContextCleaner: Cleaned accumulator 610 
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 608
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 620
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 605
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 607
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 618
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 621
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 602
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 606
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 615
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 623
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 601
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 613
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 603
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 619
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 611
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 616
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 612 
2019-05-20 07:29:57,024 INFO dstream.FileInputDStream: Finding new files took 23 ms
2019-05-20 07:29:57,024 INFO dstream.FileInputDStream: New files at time 1558337397000 ms:

2019-05-20 07:29:57,024 INFO storage.BlockManagerInfo: Removed broadcast_49_piece0 on blackened.lan:36881 in memory (size: 4.3 KB, free: 14.8 GB)
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 622
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 604
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 614
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 624
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 617
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 609
2019-05-20 07:29:57,024 INFO spark.ContextCleaner: Cleaned accumulator 625
2019-05-20 07:29:57,030 INFO scheduler.JobScheduler: Added jobs for time 1558337397000 ms
2019-05-20 07:29:57,030 INFO scheduler.JobScheduler: Starting job streaming job 1558337397000 ms.0 from job set of time 1558337397000 ms
-------------------------------------------
Time: 2019-05-20 07:29:57
-------------------------------------------

2019-05-20 07:29:57,031 INFO scheduler.JobScheduler: Finished job streaming job 1558337397000 ms.0 from job set of time 1558337397000 ms
2019-05-20 07:29:57,031 INFO python.PythonRDD: Removing RDD 1582 from persistence list
2019-05-20 07:29:57,031 INFO scheduler.JobScheduler: Total delay: 0.031 s for time 1558337397000 ms (execution: 0.001 s)
2019-05-20 07:29:57,031 INFO rdd.MapPartitionsRDD: Removing RDD 1581 from persistence list
2019-05-20 07:29:57,031 INFO storage.BlockManager: Removing RDD 1581
2019-05-20 07:29:57,031 INFO rdd.UnionRDD: Removing RDD 1395 from persistence list
2019-05-20 07:29:57,031 INFO storage.BlockManager: Removing RDD 1582
2019-05-20 07:29:57,031 INFO storage.BlockManager: Removing RDD 1395
2019-05-20 07:29:57,031 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337337000 ms: 1558337336000 ms
2019-05-20 07:29:57,031 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:57,031 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337336000 ms
2019-05-20 07:29:58,021 INFO dstream.FileInputDStream: Finding new files took 20 ms 
2019-05-20 07:29:58,021 INFO dstream.FileInputDStream: New files at time 1558337398000 ms:

2019-05-20 07:29:58,029 INFO scheduler.JobScheduler: Added jobs for time 1558337398000 ms 
2019-05-20 07:29:58,029 INFO scheduler.JobScheduler: Starting job streaming job 1558337398000 ms.0 from job set of time 1558337398000 ms
-------------------------------------------
Time: 2019-05-20 07:29:58
-------------------------------------------

2019-05-20 07:29:58,030 INFO scheduler.JobScheduler: Finished job streaming job 1558337398000 ms.0 from job set of time 1558337398000 ms
2019-05-20 07:29:58,030 INFO python.PythonRDD: Removing RDD 1585 from persistence list
2019-05-20 07:29:58,030 INFO scheduler.JobScheduler: Total delay: 0.030 s for time 1558337398000 ms (execution: 0.001 s)
2019-05-20 07:29:58,030 INFO rdd.MapPartitionsRDD: Removing RDD 1584 from persistence list
2019-05-20 07:29:58,030 INFO storage.BlockManager: Removing RDD 1584
2019-05-20 07:29:58,030 INFO storage.BlockManager: Removing RDD 1585
2019-05-20 07:29:58,030 INFO rdd.UnionRDD: Removing RDD 1398 from persistence list
2019-05-20 07:29:58,030 INFO storage.BlockManager: Removing RDD 1398
2019-05-20 07:29:58,030 INFO dstream.FileInputDStream: Cleared 1 old files that were older than 1558337338000 ms: 1558337337000 ms
2019-05-20 07:29:58,030 INFO scheduler.ReceivedBlockTracker: Deleting batches:
2019-05-20 07:29:58,030 INFO scheduler.InputInfoTracker: remove old batch metadata: 1558337337000 ms
2019-05-20 07:29:59,018 INFO dstream.FileInputDStream: Finding new files took 18 ms 
2019-05-20 07:29:59,018 INFO dstream.FileInputDStream: New files at time 1558337399000 ms:

2019-05-20 07:29:59,023 INFO scheduler.JobScheduler: Added jobs for time 1558337399000 ms 
2019-05-20 07:29:59,024 INFO scheduler.JobScheduler: Starting job streaming job 1558337399000 ms.0 from job set of time 1558337399000 ms
-------------------------------------------
Time: 2019-05-20 07:29:59
-------------------------------------------

File

In [14]: forex_quote_log.write\
    ...:        .partitionBy('date')\
    ...:               .format('delta')\
    ...:                      .save('file:///data/lake/polygon/forex_quote_log')

In [15]: spark.sql(
    ...: '''
    ...: CREATE TABLE forex_quote_log
    ...: USING DELTA
    ...: LOCATION "file:///data/lake/polygon/forex_quote_log/"
    ...: '''
    ...: )
2019-05-20 07:21:13,193 WARN command.CreateDataSourceTableCommand: It is not recommended to create a table with overlapped data and partition columns, as Spark cannot store a valid table schema and has to infer it at runtime, which hurts performance. Please check your data files and remove the partition columns in it.
2019-05-20 07:21:13,209 WARN hive.HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider DELTA. Persisting data source table `default`.`forex_quote_log` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
Out[15]: DataFrame[] 

In [16]: df = spark.read.format('delta').load('file:///data/lake/polygon/forex_quote_log')

In [17]: df.createOrReplaceTempView('forex_quote_log')

In [18]: result = spark.sql('select max(cast(from_unixtime(epoch / 1000.0) as date)) as date from forex_quote_log')

In [19]: result.show()
+----------+
|      date|
+----------+
|2019-05-10|
+----------+

In [20]: 

HDFS

In [3]: df = spark.read.format('delta').load('/data/lake/polygon/forex_quote_log')

In [4]: df
Out[4]: DataFrame[pair: string, exchange_id: bigint, ask_price: double, bid_price: double, epoch: bigint, date: date, source_file_path: string] 

In [5]: dir(df)

In [6]: df.createOrReplaceTempView('forex_quote_log')

In [7]: df
Out[7]: DataFrame[pair: string, exchange_id: bigint, ask_price: double, bid_price: double, epoch: bigint, date: date, source_file_path: string] 

In [8]: spark.sql('select max(cast(from_unixtime(epoch / 1000.0) as date)) as date from forex_quote_log')
2019-05-20 07:07:54,723 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException 
Out[8]: DataFrame[date: date] 

In [9]: result = spark.sql('select max(cast(from_unixtime(epoch / 1000.0) as date)) as date from forex_quote_log')

In [10]: result
Out[10]: DataFrame[date: date] 

In [11]: result.show()
+----------+ 
|      date|
+----------+
|2019-05-13|
+----------+

Thanks so much for your patience. Let me know if any additional details would be helpful, and whether you have any ideas regarding the Netty issue.

zsxwing commented 5 years ago

> I just installed Hadoop 3.0.3 and Spark 2.4.3 and was trying to run this on YARN, but ran into an error which seems to be centered around Netty; the error is below.

Spark 2.4 doesn't support Hadoop 3 yet. https://issues.apache.org/jira/browse/SPARK-23534 is still in progress.
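
As a quick sanity check on version mismatches, you can print the Hadoop version that your running Spark actually sees from a pyspark shell (a sketch; sc is the SparkContext the shell creates, and VersionInfo is Hadoop's standard version API):

# Compare the Spark version with the Hadoop version on its classpath.
print(sc.version)
print(sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion())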

> when I checked the data it wasn't there

Could you clarify this? I saw different answers when using file and HDFS. Which one is the expected answer? And it would be great if you could also provide your streaming query.

> Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190508044339_155729061998421.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.

Looks like you didn't hit this issue in this test. Right?

colinglaes commented 5 years ago

🤦🏻‍♂️, OK, I’ll reinstall Hadoop 2.7 and Spark 2.4.2 so I can see if I can reproduce the issue, and I’ll let you guys know what happens. I ran against a file path since that’s what I believe @tdas was recommending.

colinglaes commented 5 years ago

I reinstalled with Hadoop 2.7.7 and Spark 2.4.3 and was able to reproduce the same error writing to Delta with both the file and HDFS schemes. Let me know if you want any more details; my apologies for the delay.

HDFS

-------------------------------------------
Time: 2019-05-23 07:48:59
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-23 07:49:00
-------------------------------------------
{'pair': 'EUR/JPY', 'exchange_id': 60, 'ask_price': 122.874, 'bid_price': 122.836, 'epoch': 1558597719764}
{'pair': 'USD/CNH', 'exchange_id': 60, 'ask_price': 6.93728, 'bid_price': 6.93619, 'epoch': 1558597719835}
{'pair': 'EUR/SEK', 'exchange_id': 60, 'ask_price': 10.73765, 'bid_price': 10.73474, 'epoch': 1558597719833}
{'pair': 'USD/JPY', 'exchange_id': 60, 'ask_price': 110.274, 'bid_price': 110.231, 'epoch': 1558597719723}
{'pair': 'TRY/JPY', 'exchange_id': 60, 'ask_price': 17.918, 'bid_price': 17.902, 'epoch': 1558597719764}
{'pair': 'GBP/JPY', 'exchange_id': 60, 'ask_price': 139.101, 'bid_price': 139.034, 'epoch': 1558597719751}
{'pair': 'SGD/JPY', 'exchange_id': 60, 'ask_price': 79.809, 'bid_price': 79.789, 'epoch': 1558597719723}
{'pair': 'GBP/CAD', 'exchange_id': 48, 'ask_price': 1.69711, 'bid_price': 1.69693, 'epoch': 1558597719000}
{'pair': 'ZAR/SGD', 'exchange_id': 48, 'ask_price': 0.095765, 'bid_price': 0.09575, 'epoch': 1558597719000}
{'pair': 'USD/RUB', 'exchange_id': 48, 'ask_price': 64.4953, 'bid_price': 64.4253, 'epoch': 1558597719000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+--------------------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|    source_file_path|
+-------+-----------+---------+---------+-------------+----------+--------------------+
|EUR/JPY|         60|  122.874|  122.836|1558597719764|2019-05-23|file:/data/stream...|
|USD/CNH|         60|  6.93728|  6.93619|1558597719835|2019-05-23|file:/data/stream...|
|EUR/SEK|         60| 10.73765| 10.73474|1558597719833|2019-05-23|file:/data/stream...|
|USD/JPY|         60|  110.274|  110.231|1558597719723|2019-05-23|file:/data/stream...|
|TRY/JPY|         60|   17.918|   17.902|1558597719764|2019-05-23|file:/data/stream...|
|GBP/JPY|         60|  139.101|  139.034|1558597719751|2019-05-23|file:/data/stream...|
|SGD/JPY|         60|   79.809|   79.789|1558597719723|2019-05-23|file:/data/stream...|
|GBP/CAD|         48|  1.69711|  1.69693|1558597719000|2019-05-23|file:/data/stream...|
|ZAR/SGD|         48| 0.095765|  0.09575|1558597719000|2019-05-23|file:/data/stream...|
|USD/RUB|         48|  64.4953|  64.4253|1558597719000|2019-05-23|file:/data/stream...|
|NZD/CAD|         60|  0.87333|  0.87316|1558597719671|2019-05-23|file:/data/stream...|
|ZAR/USD|         48| 0.069317| 0.069302|1558597719000|2019-05-23|file:/data/stream...|
|CZK/CAD|         48|  0.05806| 0.058057|1558597719000|2019-05-23|file:/data/stream...|
|NZD/USD|         48|   0.6491|  0.64903|1558597719000|2019-05-23|file:/data/stream...|
|USD/CAD|         48|  1.34554|  1.34546|1558597719000|2019-05-23|file:/data/stream...|
|HKD/JPY|         60| 14.04873| 14.04642|1558597719723|2019-05-23|file:/data/stream...|
|CHF/SGD|         48|   1.3694|   1.3692|1558597719000|2019-05-23|file:/data/stream...|
|AUD/CNH|         48|  4.76826|  4.76709|1558597719000|2019-05-23|file:/data/stream...|
|NZD/JPY|         48|   71.566|   71.555|1558597719000|2019-05-23|file:/data/stream...|
|CAD/JPY|         48|   81.956|   81.941|1558597719000|2019-05-23|file:/data/stream...|
+-------+-----------+---------+---------+-------------+----------+--------------------+
only showing top 20 rows

ERROR:log:An error occurred while calling o222.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 164, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523074900_155859774000950.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:323)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit(OptimisticTransaction.scala:315)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:232)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:72)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523074900_155859774000950.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-23 07:49:01
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
... (identical empty-batch output repeated every second from 07:49:02 through 07:49:16) ...
-------------------------------------------
Time: 2019-05-23 07:49:17
-------------------------------------------
{'pair': 'PLN/CHF', 'exchange_id': 48, 'ask_price': 0.26113, 'bid_price': 0.26096, 'epoch': 1558597739000}
{'pair': 'CHF/JPY', 'exchange_id': 48, 'ask_price': 109.27, 'bid_price': 109.26, 'epoch': 1558597739000}
{'pair': 'AUD/LTL', 'exchange_id': 48, 'ask_price': 2.0165, 'bid_price': 2.0153, 'epoch': 1558597739000}
{'pair': 'KRW/SEK', 'exchange_id': 48, 'ask_price': 0.008101, 'bid_price': 0.008098, 'epoch': 1558597738000}
{'pair': 'ZAR/NOK', 'exchange_id': 48, 'ask_price': 0.6072, 'bid_price': 0.60706, 'epoch': 1558597739000}
{'pair': 'TRY/DKK', 'exchange_id': 48, 'ask_price': 1.0915, 'bid_price': 1.0909, 'epoch': 1558597739000}
{'pair': 'EUR/DKK', 'exchange_id': 48, 'ask_price': 7.4682, 'bid_price': 7.46695, 'epoch': 1558597739000}
{'pair': 'HKD/ZAR', 'exchange_id': 48, 'ask_price': 1.8389, 'bid_price': 1.8377, 'epoch': 1558597739000}
{'pair': 'CAD/PLN', 'exchange_id': 48, 'ask_price': 2.87381, 'bid_price': 2.87196, 'epoch': 1558597738000}
{'pair': 'CHF/PKR', 'exchange_id': 48, 'ask_price': 151.15, 'bid_price': 151.14, 'epoch': 1558597739000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+--------------------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|    source_file_path|
+-------+-----------+---------+---------+-------------+----------+--------------------+
|PLN/CHF|         48|  0.26113|  0.26096|1558597739000|2019-05-23|file:/data/stream...|
|CHF/JPY|         48|   109.27|   109.26|1558597739000|2019-05-23|file:/data/stream...|
|AUD/LTL|         48|   2.0165|   2.0153|1558597739000|2019-05-23|file:/data/stream...|
|KRW/SEK|         48| 0.008101| 0.008098|1558597738000|2019-05-23|file:/data/stream...|
|ZAR/NOK|         48|   0.6072|  0.60706|1558597739000|2019-05-23|file:/data/stream...|
|TRY/DKK|         48|   1.0915|   1.0909|1558597739000|2019-05-23|file:/data/stream...|
|EUR/DKK|         48|   7.4682|  7.46695|1558597739000|2019-05-23|file:/data/stream...|
|HKD/ZAR|         48|   1.8389|   1.8377|1558597739000|2019-05-23|file:/data/stream...|
|CAD/PLN|         48|  2.87381|  2.87196|1558597738000|2019-05-23|file:/data/stream...|
|CHF/PKR|         48|   151.15|   151.14|1558597739000|2019-05-23|file:/data/stream...|
|EUR/PLN|         48|  4.30854|  4.30604|1558597738000|2019-05-23|file:/data/stream...|
|USD/SGD|         48|    1.382|   1.3814|1558597739000|2019-05-23|file:/data/stream...|
|TRY/CHF|         48| 0.164381| 0.164262|1558597739000|2019-05-23|file:/data/stream...|
|DKK/EUR|         48|  0.13392|   0.1339|1558597739000|2019-05-23|file:/data/stream...|
|CHF/CLP|         48|   692.38|    692.1|1558597739000|2019-05-23|file:/data/stream...|
|NZD/DKK|         48|   4.3509|   4.3492|1558597739000|2019-05-23|file:/data/stream...|
|JPY/GBP|         48| 0.007211| 0.007171|1558597739000|2019-05-23|file:/data/stream...|
|SEK/INR|         48|   7.2291|    7.229|1558597739000|2019-05-23|file:/data/stream...|
|NGN/ZAR|         48| 0.039971| 0.039962|1558597739000|2019-05-23|file:/data/stream...|
|RUB/JPY|         48|   1.7112|   1.7094|1558597739000|2019-05-23|file:/data/stream...|
+-------+-----------+---------+---------+-------------+----------+--------------------+
only showing top 20 rows

ERROR:log:An error occurred while calling o1418.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 176, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523074916_155859775613011.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:356)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:67)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523074916_155859775613011.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-23 07:49:18
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-23 07:49:19
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-23 07:58:30
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
... (identical empty-batch output repeated every second from 07:58:31 through 07:58:41) ...
-------------------------------------------
Time: 2019-05-23 07:58:42
-------------------------------------------
{'pair': 'USD/RUB', 'exchange_id': 48, 'ask_price': 64.475, 'bid_price': 64.436, 'epoch': 1558598300000}
{'pair': 'NZD/ZAR', 'exchange_id': 48, 'ask_price': 9.3665, 'bid_price': 9.3603, 'epoch': 1558598300000}
{'pair': 'USD/GBP', 'exchange_id': 48, 'ask_price': 0.79271, 'bid_price': 0.79264, 'epoch': 1558598300000}
{'pair': 'AED/JPY', 'exchange_id': 48, 'ask_price': 30.034, 'bid_price': 30.022, 'epoch': 1558598300000}
{'pair': 'JPY/RUB', 'exchange_id': 48, 'ask_price': 0.5846, 'bid_price': 0.58424, 'epoch': 1558598300000}
{'pair': 'SAR/KWD', 'exchange_id': 48, 'ask_price': 0.081225, 'bid_price': 0.081114, 'epoch': 1558598300000}
{'pair': 'JPY/ZAR', 'exchange_id': 48, 'ask_price': 0.13083, 'bid_price': 0.13074, 'epoch': 1558598300000}
{'pair': 'USD/EUR', 'exchange_id': 48, 'ask_price': 0.89735, 'bid_price': 0.89729, 'epoch': 1558598300000}
{'pair': 'ZAR/MXN', 'exchange_id': 48, 'ask_price': 1.3198, 'bid_price': 1.3188, 'epoch': 1558598300000}
{'pair': 'NZD/CAD', 'exchange_id': 48, 'ask_price': 0.87359, 'bid_price': 0.87323, 'epoch': 1558598300000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+--------------------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|    source_file_path|
+-------+-----------+---------+---------+-------------+----------+--------------------+
|USD/RUB|         48|   64.475|   64.436|1558598300000|2019-05-23|file:/data/stream...|
|NZD/ZAR|         48|   9.3665|   9.3603|1558598300000|2019-05-23|file:/data/stream...|
|USD/GBP|         48|  0.79271|  0.79264|1558598300000|2019-05-23|file:/data/stream...|
|AED/JPY|         48|   30.034|   30.022|1558598300000|2019-05-23|file:/data/stream...|
|JPY/RUB|         48|   0.5846|  0.58424|1558598300000|2019-05-23|file:/data/stream...|
|SAR/KWD|         48| 0.081225| 0.081114|1558598300000|2019-05-23|file:/data/stream...|
|JPY/ZAR|         48|  0.13083|  0.13074|1558598300000|2019-05-23|file:/data/stream...|
|USD/EUR|         48|  0.89735|  0.89729|1558598300000|2019-05-23|file:/data/stream...|
|ZAR/MXN|         48|   1.3198|   1.3188|1558598300000|2019-05-23|file:/data/stream...|
|NZD/CAD|         48|  0.87359|  0.87323|1558598300000|2019-05-23|file:/data/stream...|
|DKK/THB|         48|   4.7713|   4.7704|1558598300000|2019-05-23|file:/data/stream...|
|HKD/GBP|         48|    0.101|  0.10099|1558598300000|2019-05-23|file:/data/stream...|
|ZAR/BWP|         48|  0.74742|  0.74692|1558598300000|2019-05-23|file:/data/stream...|
|CAD/BMD|         48|  0.74515|  0.74511|1558598300000|2019-05-23|file:/data/stream...|
|THB/SGD|         48| 0.043224| 0.043217|1558598300000|2019-05-23|file:/data/stream...|
|DKK/ZAR|         48|  2.15325|  2.15176|1558598300000|2019-05-23|file:/data/stream...|
|MYR/CHF|         48|  0.24042|  0.24037|1558598300000|2019-05-23|file:/data/stream...|
|EUR/GBP|         48|  0.88341|  0.88332|1558598300000|2019-05-23|file:/data/stream...|
|HKD/SEK|         48|  1.22746|    1.227|1558598300000|2019-05-23|file:/data/stream...|
|USD/AED|         48|   3.6737|   3.6722|1558598300000|2019-05-23|file:/data/stream...|
+-------+-----------+---------+---------+-------------+----------+--------------------+
only showing top 20 rows

ERROR:log:An error occurred while calling o573.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 164, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523075841_155859832175135.json) doesn't belong in the transaction log at file:/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:323)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit(OptimisticTransaction.scala:315)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:232)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:72)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523075841_155859832175135.json) doesn't belong in the transaction log at file:/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-23 07:58:43
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
... (identical empty-batch output repeated every second from 07:58:44 through 07:59:08) ...
-------------------------------------------
Time: 2019-05-23 07:59:09
-------------------------------------------
{'pair': 'NZD/CAD', 'exchange_id': 48, 'ask_price': 0.87356, 'bid_price': 0.87321, 'epoch': 1558598321000}
{'pair': 'USD/EUR', 'exchange_id': 48, 'ask_price': 0.89746, 'bid_price': 0.8974, 'epoch': 1558598321000}
{'pair': 'SEK/EUR', 'exchange_id': 48, 'ask_price': 0.09318, 'bid_price': 0.09315, 'epoch': 1558598321000}
{'pair': 'NZD/GBP', 'exchange_id': 48, 'ask_price': 0.5149, 'bid_price': 0.5145, 'epoch': 1558598321000}
{'pair': 'CAD/SEK', 'exchange_id': 48, 'ask_price': 7.1613, 'bid_price': 7.1582, 'epoch': 1558598321000}
{'pair': 'CAD/NZD', 'exchange_id': 48, 'ask_price': 1.1452, 'bid_price': 1.14474, 'epoch': 1558598321000}
{'pair': 'HKD/SEK', 'exchange_id': 48, 'ask_price': 1.2275, 'bid_price': 1.2271, 'epoch': 1558598321000}
{'pair': 'SEK/ZAR', 'exchange_id': 48, 'ask_price': 1.4978, 'bid_price': 1.4972, 'epoch': 1558598321000}
{'pair': 'BRL/MXN', 'exchange_id': 48, 'ask_price': 4.7126, 'bid_price': 4.7086, 'epoch': 1558598321000}
{'pair': 'CAD/CHF', 'exchange_id': 48, 'ask_price': 0.75014, 'bid_price': 0.74993, 'epoch': 1558598321000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+--------------------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|    source_file_path|
+-------+-----------+---------+---------+-------------+----------+--------------------+
|NZD/CAD|         48|  0.87356|  0.87321|1558598321000|2019-05-23|file:/data/stream...|
|USD/EUR|         48|  0.89746|   0.8974|1558598321000|2019-05-23|file:/data/stream...|
|SEK/EUR|         48|  0.09318|  0.09315|1558598321000|2019-05-23|file:/data/stream...|
|NZD/GBP|         48|   0.5149|   0.5145|1558598321000|2019-05-23|file:/data/stream...|
|CAD/SEK|         48|   7.1613|   7.1582|1558598321000|2019-05-23|file:/data/stream...|
|CAD/NZD|         48|   1.1452|  1.14474|1558598321000|2019-05-23|file:/data/stream...|
|HKD/SEK|         48|   1.2275|   1.2271|1558598321000|2019-05-23|file:/data/stream...|
|SEK/ZAR|         48|   1.4978|   1.4972|1558598321000|2019-05-23|file:/data/stream...|
|BRL/MXN|         48|   4.7126|   4.7086|1558598321000|2019-05-23|file:/data/stream...|
|CAD/CHF|         48|  0.75014|  0.74993|1558598321000|2019-05-23|file:/data/stream...|
|TWD/SEK|         48|  0.30559|  0.30547|1558598321000|2019-05-23|file:/data/stream...|
|CAD/USD|         48|   0.7433|  0.74321|1558598321000|2019-05-23|file:/data/stream...|
|CHF/EUR|         48|  0.88942|  0.88922|1558598321000|2019-05-23|file:/data/stream...|
|CZK/SEK|         48|  0.41575|   0.4156|1558598321000|2019-05-23|file:/data/stream...|
|TWD/EUR|         48| 0.028489| 0.028464|1558598321000|2019-05-23|file:/data/stream...|
|EUR/SEK|         48|  10.7359| 10.73188|1558598321000|2019-05-23|file:/data/stream...|
|SGD/CHF|         48|  0.73043|  0.73027|1558598321000|2019-05-23|file:/data/stream...|
|ILS/SEK|         48|   2.6671|   2.6661|1558598321000|2019-05-23|file:/data/stream...|
|NZD/CHF|         48|  0.65521|  0.65493|1558598321000|2019-05-23|file:/data/stream...|
|EUR/CNH|         48|  7.73142|  7.72982|1558598321000|2019-05-23|file:/data/stream...|
+-------+-----------+---------+---------+-------------+----------+--------------------+
only showing top 20 rows

ERROR:log:An error occurred while calling o2412.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 176, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523075908_155859834895516.json) doesn't belong in the transaction log at file:/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:356)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:67)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190523075908_155859834895516.json) doesn't belong in the transaction log at file:/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-23 07:59:10
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-23 07:59:11
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-23 07:59:12
-------------------------------------------
zsxwing commented 5 years ago

@colinglaes Thanks for testing this. Can you try Scala and see if you can reproduce the same issue? Delta uses input_file_name, and you may just be hitting https://issues.apache.org/jira/browse/SPARK-27711, which will be fixed in the next Spark release.
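
For reference, here is a minimal PySpark sketch of the failure mode (paths assumed, not a guaranteed repro): reading data through the RDD API can leave a stale value behind for input_file_name, which Delta later observes while validating its transaction log.

from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

spark = SparkSession.builder.appName("input-file-name-check").getOrCreate()

# Read JSON through the RDD API first, as the streaming job does; on affected
# Spark versions (2.4.2/2.4.3) this can leave the source file name set in a
# thread-local holder that is not cleared at the end of the task.
rdd = spark.sparkContext.textFile("file:/data/stream/polygon/C/")
df = spark.read.json(rdd)

# A later query running in the same reused worker may then see the stale file
# name instead of its own input file (or an empty string).
df.select(input_file_name().alias("source_file")).show(truncate=False)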

colinglaes commented 5 years ago

Yep! I haven't really used Scala, but I've been thinking of trying it out since there seems to be more functionality. I'll do my best to rewrite this code in Scala and let you know the results.

colinglaes commented 5 years ago

I wanted to double-check whether removing the filename would fix the issue, but unfortunately it didn't; the behavior is the same as on 2.4.2. The main line of the error still references the file name even though I didn't include the function myself, so I'm guessing it's being used internally while writing the streaming DataFrame to Delta. Since I don't have any experience with Scala and have been fairly busy, I may wait for the next release and see if it fixes my issue. Do you have any idea when it will be released?
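
For context, here is a sketch of the kind of write that still hits the assertion (table path taken from the logs; write mode and partitioning are assumed). The input_file_name call happens inside Delta's own log validation rather than in user code, so dropping it from the transformation doesn't help.

# df is the streaming batch DataFrame shown in the logs above
(df.write
    .format("delta")
    .mode("append")
    .partitionBy("date")
    .save("hdfs://localhost:9000/data/lake/polygon/forex_quote_log"))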

-------------------------------------------
Time: 2019-05-24 04:05:25
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-24 04:05:26
-------------------------------------------
{'pair': 'HRK/AUD', 'exchange_id': 48, 'ask_price': 0.21872, 'bid_price': 0.21871, 'epoch': 1558670693000}
{'pair': 'CHF/JPY', 'exchange_id': 48, 'ask_price': 109.156, 'bid_price': 109.133, 'epoch': 1558670693000}
{'pair': 'ARS/ZAR', 'exchange_id': 48, 'ask_price': 0.32158, 'bid_price': 0.32152, 'epoch': 1558670693000}
{'pair': 'NZD/SEK', 'exchange_id': 48, 'ask_price': 6.2732, 'bid_price': 6.2616, 'epoch': 1558670693000}
{'pair': 'NZD/NOK', 'exchange_id': 48, 'ask_price': 5.7098, 'bid_price': 5.7006, 'epoch': 1558670693000}
{'pair': 'SEK/NOK', 'exchange_id': 48, 'ask_price': 0.9104, 'bid_price': 0.9102, 'epoch': 1558670693000}
{'pair': 'NZD/EUR', 'exchange_id': 48, 'ask_price': 0.5828, 'bid_price': 0.5824, 'epoch': 1558670693000}
{'pair': 'IDR/HKD', 'exchange_id': 48, 'ask_price': 0.0005451, 'bid_price': 0.00054479, 'epoch': 1558670693000}
{'pair': 'ZAR/HKD', 'exchange_id': 48, 'ask_price': 0.54165, 'bid_price': 0.54134, 'epoch': 1558670693000}
{'pair': 'COP/ARS', 'exchange_id': 48, 'ask_price': 0.013567, 'bid_price': 0.013565, 'epoch': 1558670693000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|
+-------+-----------+---------+---------+-------------+----------+
|HRK/AUD|         48|  0.21872|  0.21871|1558670693000|2019-05-24|
|CHF/JPY|         48|  109.156|  109.133|1558670693000|2019-05-24|
|ARS/ZAR|         48|  0.32158|  0.32152|1558670693000|2019-05-24|
|NZD/SEK|         48|   6.2732|   6.2616|1558670693000|2019-05-24|
|NZD/NOK|         48|   5.7098|   5.7006|1558670693000|2019-05-24|
|SEK/NOK|         48|   0.9104|   0.9102|1558670693000|2019-05-24|
|NZD/EUR|         48|   0.5828|   0.5824|1558670693000|2019-05-24|
|IDR/HKD|         48| 5.451E-4|5.4479E-4|1558670693000|2019-05-24|
|ZAR/HKD|         48|  0.54165|  0.54134|1558670693000|2019-05-24|
|COP/ARS|         48| 0.013567| 0.013565|1558670693000|2019-05-24|
|CAD/SGD|         48|   1.0241|   1.0237|1558670693000|2019-05-24|
|NZD/DKK|         48|   4.3534|   4.3488|1558670693000|2019-05-24|
|USD/ARS|         48|   45.081|   45.073|1558670693000|2019-05-24|
|INR/HKD|         48|  0.11251|  0.11245|1558670693000|2019-05-24|
|DKK/SEK|         48|    1.441|   1.4398|1558670693000|2019-05-24|
|SGD/HKD|         48|   5.6887|   5.6855|1558670693000|2019-05-24|
|JPY/RSD|         48|  0.96364|   0.9616|1558670693000|2019-05-24|
|DKK/HKD|         48|   1.1753|   1.1747|1558670693000|2019-05-24|
|MYR/HKD|         48|     1.88|    1.879|1558670693000|2019-05-24|
|THB/HKD|         48|  0.24588|  0.24574|1558670693000|2019-05-24|
+-------+-----------+---------+---------+-------------+----------+
only showing top 20 rows

ERROR:log:An error occurred while calling o220.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 164, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190524040525_155867072530212.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:323)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit$1.apply(OptimisticTransaction.scala:316)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.org$apache$spark$sql$delta$OptimisticTransactionImpl$$doCommit(OptimisticTransaction.scala:315)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:232)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:218)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:218)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:72)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190524040525_155867072530212.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-24 04:05:27
-------------------------------------------

ERROR:log:received empty C json rdd, skipping

[... the same empty-batch message repeated once per second through 04:06:06 ...]

-------------------------------------------
Time: 2019-05-24 04:06:07
-------------------------------------------
{'pair': 'CAD/HKD', 'exchange_id': 48, 'ask_price': 5.826, 'bid_price': 5.8233, 'epoch': 1558670725000}
{'pair': 'NOK/GBP', 'exchange_id': 48, 'ask_price': 0.090223, 'bid_price': 0.090192, 'epoch': 1558670725000}
{'pair': 'MYR/HKD', 'exchange_id': 48, 'ask_price': 1.8801, 'bid_price': 1.8793, 'epoch': 1558670725000}
{'pair': 'NOK/SEK', 'exchange_id': 48, 'ask_price': 1.09906, 'bid_price': 1.09804, 'epoch': 1558670725000}
{'pair': 'HKD/EUR', 'exchange_id': 48, 'ask_price': 0.11398, 'bid_price': 0.11393, 'epoch': 1558670725000}
{'pair': 'EUR/TRY', 'exchange_id': 48, 'ask_price': 6.8466, 'bid_price': 6.83835, 'epoch': 1558670725000}
{'pair': 'NOK/ZAR', 'exchange_id': 48, 'ask_price': 1.6552, 'bid_price': 1.6546, 'epoch': 1558670725000}
{'pair': 'NZD/GBP', 'exchange_id': 48, 'ask_price': 0.51475, 'bid_price': 0.51456, 'epoch': 1558670725000}
{'pair': 'GBP/ZAR', 'exchange_id': 48, 'ask_price': 18.35332, 'bid_price': 18.34111, 'epoch': 1558670725000}
{'pair': 'ZAR/HKD', 'exchange_id': 48, 'ask_price': 0.54166, 'bid_price': 0.54142, 'epoch': 1558670725000}
...

INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|
+-------+-----------+---------+---------+-------------+----------+
|CAD/HKD|         48|    5.826|   5.8233|1558670725000|2019-05-24|
|NOK/GBP|         48| 0.090223| 0.090192|1558670725000|2019-05-24|
|MYR/HKD|         48|   1.8801|   1.8793|1558670725000|2019-05-24|
|NOK/SEK|         48|  1.09906|  1.09804|1558670725000|2019-05-24|
|HKD/EUR|         48|  0.11398|  0.11393|1558670725000|2019-05-24|
|EUR/TRY|         48|   6.8466|  6.83835|1558670725000|2019-05-24|
|NOK/ZAR|         48|   1.6552|   1.6546|1558670725000|2019-05-24|
|NZD/GBP|         48|  0.51475|  0.51456|1558670725000|2019-05-24|
|GBP/ZAR|         48| 18.35332| 18.34111|1558670725000|2019-05-24|
|ZAR/HKD|         48|  0.54166|  0.54142|1558670725000|2019-05-24|
|SGD/PLN|         48|  2.79148|  2.78942|1558670725000|2019-05-24|
|EUR/HUF|         48|  326.683|  325.992|1558670725000|2019-05-24|
|SEK/NOK|         48|  0.91071|  0.90987|1558670725000|2019-05-24|
|TRY/SGD|         48|  0.22561|  0.22537|1558670725000|2019-05-24|
|THB/HKD|         48|  0.24589|  0.24578|1558670725000|2019-05-24|
|GBP/PLN|         48|  4.87529|  4.87238|1558670725000|2019-05-24|
|NZD/TRY|         48|  3.98991|  3.98425|1558670725000|2019-05-24|
|SGD/TRY|         48|  4.43851|  4.43247|1558670725000|2019-05-24|
|CNY/SGD|         48|  0.19957|  0.19954|1558670725000|2019-05-24|
|DKK/TRY|         48|  0.91682|  0.91571|1558670725000|2019-05-24|
+-------+-----------+---------+---------+-------------+----------+
only showing top 20 rows

ERROR:log:An error occurred while calling o2995.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 176, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190524040606_155867076675476.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:142)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:310)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1$$anonfun$apply$4.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter$class.withStatusCode(DeltaProgressReporter.scala:30)
    at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$org$apache$spark$sql$delta$DeltaLog$$updateInternal$1.apply(DeltaLog.scala:251)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:75)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:105)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
    at org.apache.spark.sql.delta.DeltaLog.org$apache$spark$sql$delta$DeltaLog$$updateInternal(DeltaLog.scala:250)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog$$anonfun$update$2.apply(DeltaLog.scala:211)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:356)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:67)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190524040606_155867076675476.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot$$anonfun$org$apache$spark$sql$delta$Snapshot$$assertLogBelongsToTable$1.apply(Snapshot.scala:210)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

-------------------------------------------
Time: 2019-05-24 04:06:08
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
-------------------------------------------
Time: 2019-05-24 04:06:09
-------------------------------------------

ERROR:log:received empty C json rdd, skipping
colinglaes commented 4 years ago

@brkyvz @tdas @zsxwing Just upgraded to Spark 2.4.4; no issues so far :D

zsxwing commented 4 years ago

Cool. So this is likely https://issues.apache.org/jira/browse/SPARK-27711. I'm closing this since this is not a Delta issue.