Azure / azure-cosmosdb-js-server

The JavaScript SDK for server-side programming in Azure Cosmos DB

Add sample stored procedure for bulkUpsert #10

Closed · codecadwallader closed this 5 years ago

codecadwallader commented 7 years ago

This sample is largely identical to bulkImport, but it utilizes upsertDocument instead of createDocument. This allows for re-running an import, or other use cases where documents may already exist and you would prefer to overwrite them instead of stopping the operation.

michaeldavie commented 4 years ago

I'm not sure why this never got merged, but I wanted to ask: does it work? I'm interested in exactly this use case, but just wanted to make sure before I dig into it.

codecadwallader commented 4 years ago

It's been a couple of years, so your mileage may vary, but it worked at the time, so it should.

vchalmel commented 4 years ago

@michaeldavie, @codecadwallader I've had one in production for a year and had to smooth some rough edges from this example code. If you have any trouble, hit me up and I can try to help.

If you're interested, I can also share sample client-side Python code that sends batches automatically and handles the occasional HTTP error (e.g. a multi-threaded system that splits and re-sends batches that are too large, or that catches and removes from a batch the elements causing JS errors on the Cosmos side, etc.).

michaeldavie commented 4 years ago

Thanks @codecadwallader, the stored proc itself is working perfectly.

Sure @vchalmel, any Python you have handy would be appreciated. I've based my approach on this post, to which I had to add the partition key as noted here.

I had a few 413 errors (request too large) that I resolved by chunking my submission as described here. I still get the occasional 429 (rate limiting), but with this bulk import I think I may not need multithreading at all anymore.
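
For anyone who finds this later, the chunked call ended up looking roughly like this (a minimal sketch with placeholder names: client, sproc_link, docs, partition_key, and the chunk size; it assumes the azure-cosmos 3.x ExecuteStoredProcedure API):

# Minimal chunking sketch (hypothetical names, azure-cosmos 3.x style)
CHUNK_SIZE = 100

def run_chunked(client, sproc_link, docs, partition_key):
    for i in range(0, len(docs), CHUNK_SIZE):
        chunk = docs[i:i + CHUNK_SIZE]
        while chunk:
            # the bulk sproc returns how many docs it processed before
            # hitting its bounded-execution limit
            done = client.ExecuteStoredProcedure(
                sproc_link,
                [chunk],
                {'partitionKey': partition_key}
            )
            chunk = chunk[done:]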

Once again, thanks for your help!

vchalmel commented 4 years ago

Here is my code sample. The interesting bits:

# -*- coding: utf-8 -*-

from sys import exc_info

import logging

import re, json

from azure.cosmos.cosmos_client import CosmosClient
from azure.cosmos.documents import ConnectionPolicy
from azure.cosmos.errors import HTTPFailure

from requests.exceptions import ReadTimeout

import locale
from locale import setlocale

from datetime import datetime
from time import sleep

from threading import Thread

from random import randint

# Sets up a file-based logger with the given name, file path, and level

def setupLog( loggerName, loggerPath, loggerLevel = logging.ERROR ) :
  #Formatter
  formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
  #FileHandler
  fileHandler = logging.FileHandler(
    loggerPath,
    encoding = 'UTF-8'
  )
  fileHandler.setFormatter(formatter)
  fileHandler.setLevel(logging.DEBUG)
  #Logger
  logger = logging.getLogger(loggerName)
  if not len(logger.handlers) :
    logger.addHandler(fileHandler)
  else :
    logger.handlers[0] = fileHandler
  logger.propagate = False
  logger.setLevel(loggerLevel)
  #Initialization
  logger.info('Logging started')
  return logger
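
# Example usage (illustrative names):
# logger = setupLog('bulkUpsert', 'bulkUpsert.log', logging.INFO)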

# Dictionary storing CosmosDB configuration and credentials
configCosmosDB = {
  'ENDPOINT': '',
  'MASTERKEY': '',
  'DOCUMENTDB_DATABASE': '',
  'DOCUMENTDB_CONTAINER': ''
}

# This function creates two connection policies because we use a multi-region deployment;
# this lets us distribute procedure calls across regions when required
def connectAzure() :

  # This line works around an HTTP-header bug in Cosmos queries under French locales and can be deleted otherwise
  setlocale(locale.LC_ALL,['en_US','UTF-8'])

  try :
    connectionPolicy_FR = ConnectionPolicy()
    connectionPolicy_FR.PreferredLocations = ['France Central','West Europe']

    connectionPolicy_EU = ConnectionPolicy()
    connectionPolicy_EU.PreferredLocations = ['West Europe','France Central']

    # Initialize the Python DocumentDB client
    client_FR = CosmosClient(
      configCosmosDB['ENDPOINT'], 
      {
        'masterKey' : configCosmosDB['MASTERKEY']
      }, 
      connectionPolicy_FR
    )

    client_EU = CosmosClient(
      configCosmosDB['ENDPOINT'], 
      {
        'masterKey' : configCosmosDB['MASTERKEY']
      }, 
      connectionPolicy_EU
    )

    return ({
        'client' : client_FR, 
        'database' : client_FR.ReadDatabase('/dbs/' + configCosmosDB['DOCUMENTDB_DATABASE']), 
        'container' : client_FR.ReadContainer('/dbs/' + configCosmosDB['DOCUMENTDB_DATABASE'] + '/colls/' + configCosmosDB['DOCUMENTDB_CONTAINER'])
      },{
        'client' : client_EU, 
        'database' : client_EU.ReadDatabase('/dbs/' + configCosmosDB['DOCUMENTDB_DATABASE']), 
        'container' : client_EU.ReadContainer('/dbs/' + configCosmosDB['DOCUMENTDB_DATABASE'] + '/colls/' + configCosmosDB['DOCUMENTDB_CONTAINER'])
    })
  except :
    raise

# connectionAzure is one of the two dictionaries returned by the previous function; storedProcedure is the procedure's id
def getStoredProcedure(connectionAzure, storedProcedure) :
  try : 
    requeteProcedure = connectionAzure['client'].QueryStoredProcedures(
      connectionAzure['container']['_self'],
      {
        'query' : 'SELECT * FROM root r WHERE r.id = @id',
        'parameters' : [{
          'name' : '@id',
          'value' : storedProcedure
        }]
      }
    )
    return list(requeteProcedure)[0]
  except :
    raise
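
# Example (illustrative): look up the bulkUpsert procedure registered on the container
# upsertProcedure = getStoredProcedure(connectAzure()[0], 'bulkUpsert')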

# This class handles threaded consumption of CosmosDB stored procs
class ProcedureRunner(Thread) :

  def __init__(self, connectionAzure, procedure, docs, partition, logger):

    super().__init__()

    self.connection = connectionAzure
    self.procedure = procedure
    self.docs = docs
    self.partition = partition
    self.logger = logger

  def run(self):

    try :

      executionProcedure = self.connection['client'].ExecuteStoredProcedure(
        self.procedure,
        [self.docs],
        {'partitionKey' : self.partition}
      )
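      # The stored procedure returns the number of documents it processed; if it
      # stopped early (bounded execution), resubmit the remainder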
      if executionProcedure < len(self.docs) :
        self.docs = self.docs[executionProcedure:]
        self.run()

    except HTTPFailure as h :

      # On a 413 (request entity too large), split the batch in half and retry both halves
      if h.status_code == 413 :

        if len(self.docs) > 1 :

          t1 = ProcedureRunner(
            self.connection, 
            self.procedure, 
            self.docs[:len(self.docs)//2], 
            self.partition, 
            self.logger
          )
          t2 = ProcedureRunner(
            self.connection, 
            self.procedure, 
            self.docs[len(self.docs)//2:], 
            self.partition, 
            self.logger
          )
          try :
            t1.start()
          except :
            raise
          try : 
            t2.start()
          except :
            raise 
          t1.join()
          t2.join()

        else :

          self.logger.error('Invalid document:\n' + json.dumps(self.docs))

      # On a 400, extract the offending document's id from the error message, log that document, and resend the list without it
      elif h.status_code == 400 :

        error_message =  json.loads(h._http_error_message)['message']
        js_err = json.loads(
          re.search( r'Error: (.+?)\\r\\nStack trace', error_message)\
            .group(1)\
            .replace(r'\"','"')
        )
        self.logger.error(
          'Error 400 ' + js_err['msg'] \
            + ' at id ' + js_err['id'] \
            + '\n' \
            + json.dumps(
              [
                it for it in self.docs if (
                  (
                    'id' in it and it['id'] == js_err['id']
                  ) or (
                    'filter' in it and it['filter']['id'] == js_err['id']
                  )
                )
              ]
            )
        )
        self.docs = [
          doc for doc in self.docs if (
            (
              'id' in doc and doc['id'] != js_err['id']
            ) or (
              'filter' in doc and doc['filter']['id'] != js_err['id']
            )
          )
        ]
        self.run()

      # On a 429 (rate limiting), wait the server-recommended delay before retrying
      elif h.status_code == 429 :

        self.logger.warning('HTTP 429 - backing off ' + h.headers['x-ms-retry-after-ms'] + ' ms')
        sleep(int(h.headers['x-ms-retry-after-ms'])/1000)
        self.run()

      else :

        self.logger.error(
          'HTTPFailure raised : %(exception)s : %(timestamp)s'%{
            'exception': exc_info(),
            'timestamp':str(datetime.now())
        })

    except ReadTimeout :

      self.logger.warning(
        'ReadTimeout : %(exception)s '%{
          'exception' : exc_info()
        }
      )
      sleep(10)
      self.run()

    except :
      self.logger.exception('Unhandled exception')
      with open('dump' + self.logger.name + '.' + datetime.now().strftime('%Y%m%d-%H%M%S') + '.json', 'w') as outfile:
        json.dump(self.docs, outfile)
      raise

# The next two functions take the pair returned by connectAzure as their first parameter, a list of JSON documents, and the logger configured by setupLog

def runBulkUpsert(connectionAzure, listeDocuments, logger) :

  partitions = set([d['keypartition'] for d in listeDocuments])

  upsertProcedure = getStoredProcedure(
    connectionAzure[0], 
    'bulkUpsert'
  )

  #Initialize one thread per partition key to call the stored procedure, balanced across both regions

  threads = [
    ProcedureRunner(
      connectionAzure[randint(0,1)],
      upsertProcedure['_self'],
      [document for document in listeDocuments if document['keypartition'] == partition],
      partition,
      logger
    ) for partition in partitions
  ]

  for t in threads : 
    try :
      t.start()
    except :
      raise
  for t in threads :
    t.join()

def runBulkSet(connectionAzure, listeDefinitions, logger) :

  partitions = set([d['filter']['keypartition'] for d in listeDefinitions])

  setProcedure = getStoredProcedure(
    connectionAzure[0], 
    'bulkSet'
  )

  threads = [
    ProcedureRunner(
      connectionAzure[randint(0,1)],
      setProcedure['_self'],
      [document for document in listeDefinitions if document['filter']['keypartition'] == partition],
      partition,
      logger
    ) for partition in partitions
  ]

  for t in threads : 
    try :
      t.start()
    except :
      raise
  for t in threads :
    t.join()
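
# Hypothetical end-to-end driver (illustrative file and logger names; fill configCosmosDB first).
# Assumes documents.json holds a list of documents that each carry a 'keypartition' field.
if __name__ == '__main__' :
  logger = setupLog('bulkUpsert', 'bulkUpsert.log', logging.INFO)
  connections = connectAzure()
  with open('documents.json', encoding = 'UTF-8') as f :
    documents = json.load(f)
  runBulkUpsert(connections, documents, logger)
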
michaeldavie commented 4 years ago

Awesome, thanks!