Bertverbeek4PS / bc2adls

Exporting data from Dynamics 365 Business Central to Azure data lake storage or MS Fabric lakehouse
MIT License

CompanyInformation-79 creates identical unique key across multiple BC companies #56

Closed · greglong1 closed this issue 11 months ago

greglong1 commented 11 months ago

We are using bc2adls with Microsoft Fabric and extracting data for multiple BC companies. When reviewing the CSV files (in the delta folder) produced by bc2adls for the CompanyInformation-79 table, we noticed that the unique key (systemId-2000000000) is identical across companies, so the delta table that is created contains only a single company's entry (due to the de-duplicate function in the notebook).

Can you confirm this is an issue? It appears to work correctly when extracting from the GeneralLedgerSetup-98 entity.
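
A minimal sketch of what we are seeing (invented values; the column names follow the bc2adls export conventions): de-duplicating on systemId-2000000000 alone keeps only one of the company rows, while adding $Company to the key keeps one row per company.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Two companies sharing the same systemId value, as observed for CompanyInformation-79.
    # Company and field values below are made up for illustration.
    df = spark.createDataFrame(
        [
            ("CRONUS NL", "a1b2c3d4-0000-0000-0000-000000000001", "Company A"),
            ("CRONUS US", "a1b2c3d4-0000-0000-0000-000000000001", "Company B"),
        ],
        ["$Company", "systemId-2000000000", "Name-2"],
    )

    # Current notebook behaviour: only one row survives, the other company is lost.
    print(df.dropDuplicates(["systemId-2000000000"]).count())                 # 1

    # De-duplicating per company keeps both rows.
    print(df.dropDuplicates(["$Company", "systemId-2000000000"]).count())     # 2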


Bertverbeek4PS commented 11 months ago

Hi @greglong1, thanks for reporting. This is indeed a bug in the notebook script. The same systemId can appear for multiple companies because it is only unique within a table per company.

I have not tested it, but could you try this code:

%%pyspark
import json
import os
import glob
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import col
from pyspark.sql.functions import desc

file_list = []

for entry in os.scandir(folder_path):
  if entry.is_dir():
    table_name = entry.name.replace("-","")

    #load all csv files of the entity folder into one dataframe
    df_new = spark.read.option("minPartitions", no_Partition).format("csv").option("header","true").load(folder_path_spark + entry.name +"/*")

    #collect the imported files in a list for deletion later on
    for filename in glob.glob(folder_path + entry.name + '/*'):
        file_list.append(filename)

    f = open(folder_path_json + entry.name +".cdm.json")
    schema = json.load(f)
    # Parse the schema to get column names and data types
    column_names = [attr["name"] for attr in schema["definitions"][0]["hasAttributes"]] 
    column_types = [attr['dataFormat'] for attr in schema["definitions"][0]["hasAttributes"]]   
    for col_name, col_type in zip(column_names, column_types):
        if col_type == "String":
            col_type = "string"
        if col_type == "Guid":
            col_type = "string"
        if col_type == "Code":
            col_type = "object"
        if col_type == "Option":
            col_type = "string"
        if col_type == "Date":
            col_type = "date"
        if col_type == "Time":
            col_type = "string"
        if col_type == "DateTime":
            col_type = "date"
        if col_type == "Duration":
            col_type = "timedelta"
        if col_type == "Decimal":
            col_type = "float"
        if col_type == "Boolean":
            col_type = "boolean"
        if col_type == "Integer":
            col_type = "int"
        if col_type == "Int64":
            col_type = "int"
        if col_type == "Int32":
            col_type = "int"

        df_new = df_new.withColumn(col_name, df_new[col_name].cast(col_type))

    #check if the table exists
    if table_name in [t.name for t in spark.catalog.listTables()]:  
        #read the old data into a dataframe and union it with the new dataframe
        SQL_Query = "SELECT * FROM " + Lakehouse + "." + table_name
        df_old = spark.sql(SQL_Query)
        df_new = df_new.union(df_old).repartition(no_Partition)

        #delete all old records
        df_deletes = df_new.filter(df_new['SystemCreatedAt-2000000001'].isNull())
        df_new = df_new.join(df_deletes, ['$Company','systemId-2000000000'], 'leftanti')

        # remove duplicates by filtering on systemID and systemModifiedAt fields
        df_new = df_new.orderBy('$Company','systemId-2000000000',desc('SystemModifiedAt-2000000003'))
        df_new = df_new.dropDuplicates(['$Company','systemId-2000000000'])

        #overwrite the dataframe in the new table
        df_new.write.mode("overwrite").format("delta").save("Tables/" + table_name) 
    else:  
        #table isn't there so just insert it
        df_new.write.mode("overwrite").format("delta").save("Tables/" + table_name)

    #delete the files
    if Remove_delta:
        for filename in file_list:  
            try:  
                os.remove(filename)  
            except OSError as e:  # this would catch any error when trying to delete the file  
                print(f"Error: {filename} : {e.strerror}")
        file_list = [] # clear the list
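
For reference, the schema parsing above expects each <entity>.cdm.json exported by bc2adls to expose the column names and data formats under definitions[0].hasAttributes. A trimmed, hypothetical fragment (a real file lists every exported column):

    import json

    # Hypothetical, shortened example of the structure the notebook reads.
    cdm_fragment = """
    {
      "definitions": [
        {
          "entityName": "CompanyInformation-79",
          "hasAttributes": [
            { "name": "systemId-2000000000",        "dataFormat": "Guid" },
            { "name": "Name-2",                     "dataFormat": "String" },
            { "name": "SystemModifiedAt-2000000003", "dataFormat": "DateTime" }
          ]
        }
      ]
    }
    """

    schema = json.loads(cdm_fragment)
    attributes = schema["definitions"][0]["hasAttributes"]
    print([(a["name"], a["dataFormat"]) for a in attributes])
    # [('systemId-2000000000', 'Guid'), ('Name-2', 'String'), ('SystemModifiedAt-2000000003', 'DateTime')]
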
greglong1 commented 11 months ago

@Bertverbeek4PS - thanks for the quick response. I believe the only changes in the notebook are the addition of:

'$Company',

in three of the lines extracted below:

    #delete all old records
    df_deletes = df_new.filter(df_new['SystemCreatedAt-2000000001'].isNull())
    df_new = df_new.join(df_deletes, ['$Company','systemId-2000000000'], 'leftanti')

    # remove duplicates by filtering on systemID and systemModifiedAt fields
    df_new = df_new.orderBy('$Company','systemId-2000000000',desc('SystemModifiedAt-2000000003'))
    df_new = df_new.dropDuplicates(['$Company','systemId-2000000000'])
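
After re-running the notebook with these changes, a quick per-company count on the resulting delta table should show a row for each BC company. A sketch of such a check (CompanyInformation79 is the table name the notebook writes for this entity; the query assumes the notebook's default lakehouse context):

    spark.sql("""
        SELECT `$Company`, COUNT(*) AS row_count
        FROM CompanyInformation79
        GROUP BY `$Company`
    """).show()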

I'll test and advise if this works. Thanks Greg

Bertverbeek4PS commented 11 months ago

Yes indeed, those are the changes. Thanks for testing!!