Azure / spark-cdm-connector


Incremental or partition elimination using spark-cdm-connector? #59

Open subashsivaji opened 3 years ago

subashsivaji commented 3 years ago

We are using Databricks Runtime 6.6 (Spark 2.4.5) with com.microsoft.azure:spark-cdm-connector:0.18.1.

When reading data from a CDM folder, how do we do partition elimination using spark-cdm-connector? For example, I have a contact entity/table in a CDM folder that contains 2 million records.

In non-CDM scenarios,

  1. we would have had a date-partitioned folder structure, so that irrelevant partitions are eliminated and only specific ones are read, or
  2. we would have used Auto Loader to incrementally load the deltas based on file changes.

Both are sketched below.
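
A minimal sketch of those two approaches, assuming an illustrative abfss:// path, a modifiedon_year partition folder, and a pre-defined contact_schema (none of these names come from the CDM folder above):

# 1. Date-partitioned folder structure: the filter on the partition column
#    prunes folders before any data files are opened.
parquet_df = spark.read.format("parquet")\
    .load("abfss://data@mystorageaccount.dfs.core.windows.net/contact/")\
    .filter("modifiedon_year >= 2020")

# 2. Databricks Auto Loader: incrementally picks up only new or changed files.
incremental_df = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", "parquet")\
    .schema(contact_schema)\
    .load("abfss://data@mystorageaccount.dfs.core.windows.net/contact/")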

Using the CDM folder structure and spark-cdm-connector, when we do the following, every time it scans all 2 million records, loads them into a DataFrame, and only then applies the filter on modifiedon. This is of course not scalable. Are there any alternatives, or a better way to do partition elimination? Please suggest.

readDf = spark.read.format("com.microsoft.cdm")\
.option("storage", "mystorageaccount.dfs.core.windows.net")\
.option("manifestPath", "/commondataservice-xxxx-orgxxxxx/model.json")\
.option("entity", "contact")\
.option("appId","xxxxxxxxx-8286f4564568")\
.option("appKey", "xxxxxxxxxx_MX3U_uETaXMaererccccc~")\
.option("tenantId", "67878678-vvvvv-vvvv-vvvv-8347ddb3daa7")\
.load()

## this will give just records that were modified in 2020
## however this will read the entire 2 million records and then filter.
filter_df = readDf.filter(year(readDf.modifiedon) >= 2020)
filter_df.count()

Please see the plan below; there is no partition elimination.

filter_df.explain(True)

== Parsed Logical Plan ==
'Filter (year(modifiedon#341) >= 2020)
+- RelationV2 DefaultSource[Id#4, SinkCreatedOn#5, SinkModifiedOn#6, statecode#7L, statuscode#8L, haschildrencode#9L, customertypecode#10L, gendercode#11L, address1_addresstypecode#12L, preferredappointmenttimecode#13L, pro_brokertype#14L, address2_freighttermscode#15L, msdyn_orgchangestatus#16L, paymenttermscode#17L, accountrolecode#18L, territorycode#19L, address3_freighttermscode#20L, pro_contacttype#21L, address1_shippingmethodcode#22L, preferredappointmentdaycode#23L, customersizecode#24L, preferredcontactmethodcode#25L, leadsourcecode#26L, address3_addresstypecode#27L, ... 324 more fields] (Options: [appKey=xxxxxxxxxxxxxx,manifestPath=commondataservice-yyyyyyyyyyyyyy/mode...)

== Analyzed Logical Plan ==
Id: string, SinkCreatedOn: timestamp, SinkModifiedOn: timestamp, statecode: bigint, statuscode: bigint, haschildrencode: bigint, customertypecode: bigint, gendercode: bigint, address1_addresstypecode: bigint, preferredappointmenttimecode: bigint, pro_brokertype: bigint, address2_freighttermscode: bigint, msdyn_orgchangestatus: bigint, paymenttermscode: bigint, accountrolecode: bigint, territorycode: bigint, address3_freighttermscode: bigint, pro_contacttype: bigint, address1_shippingmethodcode: bigint, preferredappointmentdaycode: bigint, customersizecode: bigint, preferredcontactmethodcode: bigint, leadsourcecode: bigint, address3_addresstypecode: bigint, ... 324 more fields
Filter (year(cast(modifiedon#341 as date)) >= 2020)
+- RelationV2 DefaultSource[Id#4, SinkCreatedOn#5, SinkModifiedOn#6, statecode#7L, statuscode#8L, haschildrencode#9L, customertypecode#10L, gendercode#11L, address1_addresstypecode#12L, preferredappointmenttimecode#13L, pro_brokertype#14L, address2_freighttermscode#15L, msdyn_orgchangestatus#16L, paymenttermscode#17L, accountrolecode#18L, territorycode#19L, address3_freighttermscode#20L, pro_contacttype#21L, address1_shippingmethodcode#22L, preferredappointmentdaycode#23L, customersizecode#24L, preferredcontactmethodcode#25L, leadsourcecode#26L, address3_addresstypecode#27L, ... 324 more fields] (Options: [appKey=xxxxxxxxxxxxxx,manifestPath=commondataservice-yyyyyyyyyyyyyy/mode...)

== Optimized Logical Plan ==
Filter (year(cast(modifiedon#341 as date)) >= 2020)
+- RelationV2 DefaultSource[Id#4, SinkCreatedOn#5, SinkModifiedOn#6, statecode#7L, statuscode#8L, haschildrencode#9L, customertypecode#10L, gendercode#11L, address1_addresstypecode#12L, preferredappointmenttimecode#13L, pro_brokertype#14L, address2_freighttermscode#15L, msdyn_orgchangestatus#16L, paymenttermscode#17L, accountrolecode#18L, territorycode#19L, address3_freighttermscode#20L, pro_contacttype#21L, address1_shippingmethodcode#22L, preferredappointmentdaycode#23L, customersizecode#24L, preferredcontactmethodcode#25L, leadsourcecode#26L, address3_addresstypecode#27L, ... 324 more fields] (Options: [appKey=xxxxxxxxxxxxxx,manifestPath=commondataservice-yyyyyyyyyyyyyy/mode...)

== Physical Plan ==
Project [Id#4, SinkCreatedOn#5, SinkModifiedOn#6, statecode#7L, statuscode#8L, haschildrencode#9L, customertypecode#10L, gendercode#11L, address1_addresstypecode#12L, preferredappointmenttimecode#13L, pro_brokertype#14L, address2_freighttermscode#15L, msdyn_orgchangestatus#16L, paymenttermscode#17L, accountrolecode#18L, territorycode#19L, address3_freighttermscode#20L, pro_contacttype#21L, address1_shippingmethodcode#22L, preferredappointmentdaycode#23L, customersizecode#24L, preferredcontactmethodcode#25L, leadsourcecode#26L, address3_addresstypecode#27L, ... 324 more fields]
+- Filter (year(cast(modifiedon#341 as date)) >= 2020)
   +- ScanV2 DefaultSource[Id#4, SinkCreatedOn#5, SinkModifiedOn#6, statecode#7L, statuscode#8L, haschildrencode#9L, customertypecode#10L, gendercode#11L, address1_addresstypecode#12L, preferredappointmenttimecode#13L, pro_brokertype#14L, address2_freighttermscode#15L, msdyn_orgchangestatus#16L, paymenttermscode#17L, accountrolecode#18L, territorycode#19L, address3_freighttermscode#20L, pro_contacttype#21L, address1_shippingmethodcode#22L, preferredappointmentdaycode#23L, customersizecode#24L, preferredcontactmethodcode#25L, leadsourcecode#26L, address3_addresstypecode#27L, ... 324 more fields] (Options: [appKey=xxxxxxxxxxxxxx,manifestPath=commondataservice-yyyyyyyyyyyyyy/mode...)

(Screenshot: scan and then filter)

bissont commented 3 years ago

Hi,

There is currently no support for partition elimination in the connector.

I'm marking it as a feature request. @billgib /@euangms can provide more details about future feature development work.

Thanks, Tim

subashsivaji commented 3 years ago

@billgib / @euangms Any ETA on this? Due to this limitation we had to fall back to the Azure Data Factory mapping data flows CDM connector, but we wanted to use spark-cdm-connector for our client.

euangms commented 3 years ago

We are currently going through our next planning cycle; this request is tracked as 978616.

reynoldspravindev commented 3 years ago

I second subashsivaji's request. This would be a great feature for big data platforms where the data volume for a given entity is high even though only records from the past year or so get modified. In this case, we end up reading the complete data of the entity and then filtering the DataFrame for further processing. This creates a potential performance bottleneck down the line as the data increases, so it would be great if this could be implemented as a feature.

SQLArchitect commented 3 years ago

As I interpret the documentation, there's room for conjecture that the CDM object doesn't need to actually contain the data but is a wrapper over the Delta Lake. Is this true? If so, what compute resources process the queries?

TissonMathew commented 3 years ago

@SQLArchitect Spark CDM connector currently doesn't support Delta. We built a workaround to use CDM (schema only) with Delta Lake for our SaaS product.
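
For anyone considering a similar approach, here is a rough sketch of that kind of workaround, assuming an illustrative abfss:// path and a modifiedon_year partition column (not TissonMathew's actual implementation): land the entity once into a date-partitioned Delta table, then let later reads prune partitions.

from pyspark.sql.functions import year

# One-off (or scheduled) landing step: persist the CDM entity as a Delta table
# partitioned by a column derived from modifiedon.
readDf.withColumn("modifiedon_year", year("modifiedon"))\
    .write.format("delta")\
    .partitionBy("modifiedon_year")\
    .mode("overwrite")\
    .save("abfss://lake@mystorageaccount.dfs.core.windows.net/delta/contact")

# Later reads filter on the partition column, so only matching folders are scanned.
delta_df = spark.read.format("delta")\
    .load("abfss://lake@mystorageaccount.dfs.core.windows.net/delta/contact")\
    .filter("modifiedon_year >= 2020")

The trade-off is extra storage and a refresh step, but downstream filters on modifiedon_year only scan the matching partition folders.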

SQLArchitect commented 3 years ago

@TissonMathew I would love the opportunity to speak with you about this. Whether or not it supports Delta, what is providing the compute? Is it the M engine running on a multi-worker-node Spark cluster?

InitionsJulius commented 3 years ago

Is there any news on this delta feature? We would also like to use it.

SQLArchitect commented 3 years ago

These are not Delta Lake specific.

NoctisWu commented 1 year ago

Hello, I wanted to follow up: is there any news on this partition elimination feature? It would be helpful if we could specify the partitions to load and exclude the rest. @bissont

andreasgranqvist commented 1 year ago

Hello, I would also very much like to know: are there any plans to implement partition elimination?