anthonyreilly / NetCoreForce

Salesforce REST API toolkit for .NET Standard and .NET Core
MIT License

Feature Request - Add Bulk Data Load Endpoints #53

Open bakes82 opened 1 year ago

bakes82 commented 1 year ago

It would be nice to have these endpoints added.

https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/bulk_api_2_0.htm
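
For context, the Bulk API 2.0 ingest flow boils down to a few REST calls: create a job, upload the CSV data, mark the upload complete, then poll the job status and fetch the results. A hypothetical NetCoreForce-style surface for it might look like the sketch below; the names are illustrative only and don't exist in the library today.

using System.Threading.Tasks;

// Hypothetical sketch only - not an existing NetCoreForce API.
public interface IBulkIngestClient
{
    // POST /services/data/vXX.X/jobs/ingest - create an insert/update/upsert/delete job
    Task<string> CreateJobAsync(string objectName, string operation, string externalIdFieldName = null);

    // PUT /services/data/vXX.X/jobs/ingest/{jobId}/batches - upload the CSV payload
    Task UploadJobDataAsync(string jobId, string csvContent);

    // PATCH /services/data/vXX.X/jobs/ingest/{jobId} - set the state to UploadComplete so processing starts
    Task CloseJobAsync(string jobId);

    // GET /services/data/vXX.X/jobs/ingest/{jobId} - read the current state (e.g. JobComplete, Failed)
    Task<string> GetJobStateAsync(string jobId);
}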

senj commented 1 year ago

Just for reference, I did something like this:

using System.Collections.Generic;
using System.Dynamic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using CsvHelper;
using CsvHelper.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class SalesforceJobRepository : ISalesforceJobRepository
    {
        private readonly ILogger<SalesforceJobRepository> _logger;
        private readonly SalesforceFaultHandler _salesforceFaultHandler;
        private readonly IResilientSalesforceClient _resilientSalesforceClient;
        private readonly HttpClient _httpClient;
        private readonly string _instanceUrl;

        public SalesforceJobRepository(
            ILogger<SalesforceJobRepository> logger,
            SalesforceFaultHandler salesforceFaultHandler,
            IResilientSalesforceClient resilientSalesforceClient,
            HttpClient httpClient)
        {
            _logger = logger;
            _salesforceFaultHandler = salesforceFaultHandler;
            _resilientSalesforceClient = resilientSalesforceClient;
            _httpClient = httpClient;

            _instanceUrl = _resilientSalesforceClient.GetForceClient().InstanceUrl;
        }

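        // Runs the full Bulk API 2.0 ingest flow: create the job, upload the records as CSV, then mark the upload complete.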
        public async Task WriteProducts(IEnumerable<SalesforceProductModel> products)
        {
            string jobId = await CreateJobAsync();
            _logger.LogInformation("Created salesforce job with id: {id}", jobId);

            await AddDataToJobAsync(jobId, products);
            _logger.LogInformation("Added {count} products to job", products.Count());

            await CloseJobAsync(jobId);
            _logger.LogInformation("Closed job");
        }

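        // POST /services/data/v53.0/jobs/ingest - creates a CSV upsert job keyed on the external ID field.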
        private async Task<string> CreateJobAsync()
        {
            // only for updating access token
            int count = await _salesforceFaultHandler.ExecuteAsync(() => _resilientSalesforceClient.CountQueryAsync("SELECT COUNT() FROM Product2", false));
            _logger.LogInformation("Before starting the job, there are {count} products in salesforce.", count);

            string requestUrl = $"{_instanceUrl}/services/data/v53.0/jobs/ingest/";
            _logger.LogDebug("Working on: {url}", requestUrl);

            using HttpRequestMessage request = new(HttpMethod.Post, requestUrl);
            request.Content = new StringContent(JsonConvert.SerializeObject(new CreateJobRequest
            {
                ContentType = "CSV",
                Operation = "upsert",
                ExternalIdFieldName = "MaterialNumberExternalId__c",
                ObjectTypeName = TtsSfProduct2.SObjectTypeName,
                LineEnding = "CRLF"
            }), Encoding.UTF8, "application/json");

            request.Headers.Add("Authorization", $"Bearer {_resilientSalesforceClient.GetForceClient().AccessToken}");

            HttpResponseMessage response = await _httpClient.SendAsync(request);
            string content = await response.Content.ReadAsStringAsync();

            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to create salesforce job: {statusCode} {message}", response.StatusCode, content);
            }
            else
            {
                _logger.LogInformation("Creating salesforce job returned with: {statusCode} {message}", response.StatusCode, content);
            }

            return JsonConvert.DeserializeObject<CreateJobResponse>(content).Id;
        }

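        // PUT /services/data/v53.0/jobs/ingest/{jobId}/batches - uploads the job data as CSV.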
        private async Task AddDataToJobAsync(string jobId, IEnumerable<SalesforceProductModel> products)
        {
            string requestUrl = $"{_instanceUrl}/services/data/v53.0/jobs/ingest/{jobId}/batches";

            using HttpRequestMessage request = new(HttpMethod.Put, requestUrl);
            string content = JsonToCsv(JsonConvert.SerializeObject(products), ",");
            request.Content = new StringContent(content, Encoding.UTF8, "text/csv");

            request.Headers.Add("Authorization", $"Bearer {_resilientSalesforceClient.GetForceClient().AccessToken}");

            HttpResponseMessage response = await _httpClient.SendAsync(request);
            string responseContent = await response.Content.ReadAsStringAsync();
            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to update salesforce job: {statusCode} {message}", response.StatusCode, responseContent);
            }
            else
            {
                _logger.LogInformation("Updating salesforce job returned with: {statusCode} {message}", response.StatusCode, responseContent);
            }

            response.EnsureSuccessStatusCode();
        }

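        // PATCH /services/data/v53.0/jobs/ingest/{jobId} - sets the state to UploadComplete so Salesforce starts processing.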
        private async Task CloseJobAsync(string jobId)
        {
            string requestUrl = $"{_instanceUrl}/services/data/v53.0/jobs/ingest/{jobId}";

            using HttpRequestMessage request = new(HttpMethod.Patch, requestUrl);
            request.Content = new StringContent(JsonConvert.SerializeObject(new Models.Salesforce.JobModels.UpdateJobRequest 
            {
                State = "UploadComplete" 
            }), Encoding.UTF8, "application/json");

            request.Headers.Add("Authorization", $"Bearer {_resilientSalesforceClient.GetForceClient().AccessToken}");

            HttpResponseMessage response = await _httpClient.SendAsync(request);
            string content = await response.Content.ReadAsStringAsync();
            if (!response.IsSuccessStatusCode)
            {
                _logger.LogError("Unable to close salesforce job: {statusCode} {message}", response.StatusCode, content);
            }
            else
            {
                _logger.LogInformation("Closing salesforce job returned with: {statusCode} {message}", response.StatusCode, content);
            }

            response.EnsureSuccessStatusCode();
        }

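        // Converts the JSON-serialized records into CSV via CsvHelper; property names become the column headers.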
        private static string JsonToCsv(string jsonContent, string delimiter)
        {
            CsvConfiguration config = new(CultureInfo.CurrentCulture)
            {
                Delimiter = delimiter
            };

            ExpandoObject[] expandos = JsonConvert.DeserializeObject<ExpandoObject[]>(jsonContent);
            using StringWriter writer = new();
            using CsvWriter csv = new(writer, config);
            csv.WriteRecords(expandos as IEnumerable<dynamic>);

            return writer.ToString();
        }
    }
bakes82 commented 1 year ago

Yup, I did roughly the same ;) Figured it would be a fairly straightforward ask, plus it's way better on API calls than trying to load thousands of records one by one. The only gotcha is on AddDataToJob: you can't exceed 150 MB of base64-encoded data per upload (Salesforce recommends about 100 MB before encoding), and it will throw an error if you go over, if I remember correctly from my testing ;)
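
A minimal sketch of how one might stay under that limit by chunking the records before building each CSV upload; the 100,000-row default is just an assumption to tune against your record width, and each chunk would then go through its own create/upload/close cycle:

using System.Collections.Generic;

// Illustrative helper only: split the records so each CSV upload stays well under the
// documented 150 MB (base64-encoded) / ~100 MB (raw) Bulk API 2.0 upload limit.
public static class BulkChunker
{
    public static IEnumerable<List<T>> Chunk<T>(IEnumerable<T> source, int chunkSize = 100_000)
    {
        List<T> current = new(chunkSize);
        foreach (T item in source)
        {
            current.Add(item);
            if (current.Count == chunkSize)
            {
                yield return current;
                current = new List<T>(chunkSize);
            }
        }

        if (current.Count > 0)
        {
            yield return current;
        }
    }
}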