singer-io / target-csv

Write Singer data to CSV files
GNU Affero General Public License v3.0

Re-use jsonschema Validator instances across records and always use Draft4Validator #4

Closed: hughns closed this 7 years ago

hughns commented 7 years ago

Two performance improvements:

  1. For large schemas (>100k), calling jsonschema.validate() is slow because on every call it inspects the schema to determine which validator version to use.
  2. A new schema validator instance is created for every parsed record.
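Both fixes come down to building one validator per stream when its SCHEMA message arrives and reusing it for every RECORD on that stream. The sketch below is a stripped-down, hypothetical `validate_stream` function, not the actual target-csv code: CSV writing and STATE handling are left out, and the validator class is passed in as a parameter (`validator_class`, an assumption made so the sketch runs without dependencies). With jsonschema installed you would pass `jsonschema.Draft4Validator`, whose constructor takes a schema and whose `validate(instance)` method raises `ValidationError` for an invalid record.

```python
import json


def validate_stream(lines, validator_class):
    """Validate Singer RECORD messages, building one validator per SCHEMA.

    Hypothetical sketch: CSV writing and STATE handling are omitted, and
    validator_class is an assumed injection point (e.g. Draft4Validator).
    Returns the number of RECORD messages validated.
    """
    validators = {}  # stream name -> validator built once from its schema
    count = 0
    for line in lines:
        o = json.loads(line)
        t = o['type']
        if t == 'SCHEMA':
            # Pay the validator construction cost once, when the schema arrives.
            validators[o['stream']] = validator_class(o['schema'])
        elif t == 'RECORD':
            if o['stream'] not in validators:
                raise Exception(
                    "A record for stream {} was encountered before a "
                    "corresponding schema".format(o['stream']))
            # Reuse the cached validator instead of calling
            # jsonschema.validate(record, schema) for every record.
            validators[o['stream']].validate(o['record'])
            count += 1
    return count
```

For example, `validate_stream(sys.stdin, Draft4Validator)` would validate a tap's output read from standard input. Naming the validator class explicitly also skips the per-call version detection that `jsonschema.validate()` performs.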

By using Draft4Validator directly (as target-stitch already does) and caching one validator instance per schema, we can save significant CPU time overall:

Before:

(singer) cat stream | kernprof -v -l ./target_csv.py
  INFO Sending version information to stitchdata.com. To disable sending anonymous usage data, set the config parameter "disable_collection" to true
Wrote profile results to target_csv.py.lprof
Timer unit: 1e-06 s

Total time: 22.4596 s
File: ./target_csv.py
Function: persist_lines at line 37

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    37                                           @profile
    38                                           def persist_lines(delimiter, quotechar, lines):
    39         1            5      5.0      0.0      state = None
    40         1            2      2.0      0.0      schemas = {}
    41         1            1      1.0      0.0      key_properties = {}
    42         1            0      0.0      0.0      headers = {}
    43                                               
    44       171        42955    251.2      0.2      for line in lines:
    45       170          239      1.4      0.0          try:
    46       170       336087   1977.0      1.5              o = json.loads(line)
    47                                                   except json.decoder.JSONDecodeError:
    48                                                       logger.error("Unable to parse:\n{}".format(line))
    49                                                       raise
    50                                           
    51       170          331      1.9      0.0          if 'type' not in o:
    52                                                       raise Exception("Line is missing required key 'type': {}".format(line))
    53       170          205      1.2      0.0          t = o['type']
    54                                           
    55       170          203      1.2      0.0          if t == 'RECORD':
    56       169          166      1.0      0.0              if 'stream' not in o:
    57                                                           raise Exception("Line is missing required key 'stream': {}".format(line))
    58       169          234      1.4      0.0              if o['stream'] not in schemas:
    59                                                           raise Exception("A record for stream {} was encountered before a corresponding schema".format(o['stream']))
    60                                           
    61       169          182      1.1      0.0              schema = schemas[o['stream']]
    62       169     20930814 123851.0     93.2              validate(o['record'], schema)
    63                                           
    64       169          560      3.3      0.0              filename = o['stream'] + '.csv'
    65       169         5007     29.6      0.0              file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0
    66                                           
    67       169       394393   2333.7      1.8              flattened_record = flatten(o['record'])
    68                                                       
    69       169          398      2.4      0.0              if o['stream'] not in headers and not file_is_empty:
    70         1           61     61.0      0.0                  with open(filename, 'r') as csvfile:
    71         1            2      2.0      0.0                      reader = csv.reader(csvfile,
    72         1            2      2.0      0.0                                          delimiter=delimiter,
    73         1           11     11.0      0.0                                          quotechar=quotechar)
    74         1           95     95.0      0.0                      first_line = next(reader)
    75         1           21     21.0      0.0                      headers[o['stream']] = first_line if first_line else flattened_record.keys()
    76                                                       else:
    77       168          308      1.8      0.0                  headers[o['stream']] = flattened_record.keys()
    78                                                       
    79       169        10038     59.4      0.0              with open(filename, 'a') as csvfile:
    80       169          352      2.1      0.0                  writer = csv.DictWriter(csvfile,                                        
    81       169          223      1.3      0.0                                          headers[o['stream']],
    82       169          154      0.9      0.0                                          extrasaction='ignore',
    83       169          154      0.9      0.0                                          delimiter=delimiter,
    84       169         8406     49.7      0.0                                          quotechar=quotechar)
    85       169          206      1.2      0.0                  if file_is_empty:
    86                                                               writer.writeheader()
    87                                                               
    88       169       727454   4304.5      3.2                  writer.writerow(flattened_record)    
    89                                           
    90       169          365      2.2      0.0              state = None
    91         1            1      1.0      0.0          elif t == 'STATE':
    92                                                       logger.debug('Setting state to {}'.format(o['value']))
    93                                                       state = o['value']
    94         1            1      1.0      0.0          elif t == 'SCHEMA':
    95         1            1      1.0      0.0              if 'stream' not in o:
    96                                                           raise Exception("Line is missing required key 'stream': {}".format(line))
    97         1            1      1.0      0.0              stream = o['stream']
    98         1            1      1.0      0.0              schemas[stream] = o['schema']
    99         1            1      1.0      0.0              if 'key_properties' not in o:
   100                                                           raise Exception("key_properties field is required")
   101         1            1      1.0      0.0              key_properties[stream] = o['key_properties']
   102                                                   else:
   103                                                       raise Exception("Unknown message type {} in message {}"
   104                                                                       .format(o['type'], o))
   105                                               
   106         1            1      1.0      0.0      return state

With the changes:

(singer)  cat stream | kernprof -v -l ./target_csv.py 
  INFO Sending version information to stitchdata.com. To disable sending anonymous usage data, set the config parameter "disable_collection" to true
Wrote profile results to target_csv.py.lprof
Timer unit: 1e-06 s

Total time: 15.5296 s
File: ./target_csv.py
Function: persist_lines at line 37

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    37                                           @profile
    38                                           def persist_lines(delimiter, quotechar, lines):
    39         1            6      6.0      0.0      state = None
    40         1            1      1.0      0.0      schemas = {}
    41         1            1      1.0      0.0      key_properties = {}
    42         1            1      1.0      0.0      headers = {}
    43         1            1      1.0      0.0      validators = {}
    44                                               
    45       171        42725    249.9      0.3      for line in lines:
    46       170          280      1.6      0.0          try:
    47       170       331255   1948.6      2.1              o = json.loads(line)
    48                                                   except json.decoder.JSONDecodeError:
    49                                                       logger.error("Unable to parse:\n{}".format(line))
    50                                                       raise
    51                                           
    52       170          321      1.9      0.0          if 'type' not in o:
    53                                                       raise Exception("Line is missing required key 'type': {}".format(line))
    54       170          216      1.3      0.0          t = o['type']
    55                                           
    56       170          200      1.2      0.0          if t == 'RECORD':
    57       169          167      1.0      0.0              if 'stream' not in o:
    58                                                           raise Exception("Line is missing required key 'stream': {}".format(line))
    59       169          224      1.3      0.0              if o['stream'] not in schemas:
    60                                                           raise Exception("A record for stream {} was encountered before a corresponding schema".format(o['stream']))
    61                                           
    62       169          185      1.1      0.0              schema = schemas[o['stream']]
    63       169     14015068  82929.4     90.2              validators[o['stream']].validate(o['record'])
    64                                           
    65       169          569      3.4      0.0              filename = o['stream'] + '.csv'
    66       169         4771     28.2      0.0              file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0
    67                                           
    68       169       387677   2293.9      2.5              flattened_record = flatten(o['record'])
    69                                                       
    70       169          369      2.2      0.0              if o['stream'] not in headers and not file_is_empty:
    71         1           59     59.0      0.0                  with open(filename, 'r') as csvfile:
    72         1            3      3.0      0.0                      reader = csv.reader(csvfile,
    73         1            1      1.0      0.0                                          delimiter=delimiter,
    74         1            9      9.0      0.0                                          quotechar=quotechar)
    75         1           82     82.0      0.0                      first_line = next(reader)
    76         1           17     17.0      0.0                      headers[o['stream']] = first_line if first_line else flattened_record.keys()
    77                                                       else:
    78       168          295      1.8      0.0                  headers[o['stream']] = flattened_record.keys()
    79                                                       
    80       169         9731     57.6      0.1              with open(filename, 'a') as csvfile:
    81       169          352      2.1      0.0                  writer = csv.DictWriter(csvfile,                                        
    82       169          208      1.2      0.0                                          headers[o['stream']],
    83       169          156      0.9      0.0                                          extrasaction='ignore',
    84       169          156      0.9      0.0                                          delimiter=delimiter,
    85       169         8686     51.4      0.1                                          quotechar=quotechar)
    86       169          222      1.3      0.0                  if file_is_empty:
    87                                                               writer.writeheader()
    88                                                               
    89       169       724462   4286.8      4.7                  writer.writerow(flattened_record)    
    90                                           
    91       169          426      2.5      0.0              state = None
    92         1            1      1.0      0.0          elif t == 'STATE':
    93                                                       logger.debug('Setting state to {}'.format(o['value']))
    94                                                       state = o['value']
    95         1            1      1.0      0.0          elif t == 'SCHEMA':
    96         1            2      2.0      0.0              if 'stream' not in o:
    97                                                           raise Exception("Line is missing required key 'stream': {}".format(line))
    98         1            1      1.0      0.0              stream = o['stream']
    99         1            2      2.0      0.0              schemas[stream] = o['schema']
   100         1          649    649.0      0.0              validators[stream] = Draft4Validator(o['schema'])
   101         1            2      2.0      0.0              if 'key_properties' not in o:
   102                                                           raise Exception("key_properties field is required")
   103         1            2      2.0      0.0              key_properties[stream] = o['key_properties']
   104                                                   else:
   105                                                       raise Exception("Unknown message type {} in message {}"
   106                                                                       .format(o['type'], o))
   107                                               
   108         1            0      0.0      0.0      return state

In this example (with a large schema), validation time drops from about 124 ms to 83 ms per record, and the total time for persist_lines falls from 22.5 s to 15.5 s.
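The savings can be recomputed directly from the two kernprof runs. The snippet below only copies the Hits and Time figures from the profiles (Timer unit: 1e-06 s) and does the arithmetic; the variable names are illustrative.

```python
# Figures copied from the two kernprof runs (Timer unit: 1e-06 s).
HITS = 169  # RECORD messages validated in each run

before_validate_us = 20930814  # validate(o['record'], schema)
after_validate_us = 14015068   # validators[o['stream']].validate(o['record'])
before_total_s = 22.4596       # "Total time" before the change
after_total_s = 15.5296        # "Total time" after the change

# Average validation cost per record, converted from microseconds to ms.
per_record_before_ms = before_validate_us / HITS / 1000
per_record_after_ms = after_validate_us / HITS / 1000

# Fraction of time saved, for validation alone and for the whole function.
validation_saving = 1 - after_validate_us / before_validate_us
total_saving = 1 - after_total_s / before_total_s

print(f"per record: {per_record_before_ms:.1f} ms -> {per_record_after_ms:.1f} ms")
print(f"validation time saved: {validation_saving:.0%}")
print(f"total persist_lines time saved: {total_saving:.0%}")
```

This works out to roughly 123.9 ms versus 82.9 ms per record, about a third less validation time, and about 31% less total time in persist_lines.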

cmerrick commented 7 years ago

Hi @hughns, thanks for your contribution!

In order for us to evaluate and accept your PR, we ask that you sign a contribution license agreement. It's all electronic and will take just minutes.

cmerrick commented 7 years ago

You did it @hughns!

Thank you for signing the Singer Contribution License Agreement.

KAllan357 commented 7 years ago

:+1: :100: Thank you!