Is your feature request related to a problem? Please describe.
When you aggregate data from many data sources contributed by many teams, it's possible for a schema to change underneath you. When this happens and you run a system that uses TTLs, you may only notice the change once things expire.
Describe the solution you'd like
Basic API
Introduce a Filter type that is capable of inferring and enforcing the record schema.
filters:
  - implementation: nodestream.filters:SchemaEnforcement
    arguments:
      mode: "ENFORCE" # One of ENFORCE, WARN, INFER
      storage:
        location: s3
        bucket: my-awesome-s3-schema-bucket
        key: schemas/pipelines/my-schema-for-this-cool-pipeline.json
      inference: # only used when mode is INFER
        sample_size: 10000
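One way the filter could turn the arguments mapping above into typed settings is sketched below. This is a minimal sketch: Mode, InferenceArgs, SchemaEnforcementArgs, and from_dict are hypothetical names rather than existing nodestream APIs, and raw is assumed to be the already-parsed YAML arguments mapping.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Mapping, Optional


class Mode(Enum):
    """The three modes listed in the `mode` comment of the example config."""

    ENFORCE = "ENFORCE"
    WARN = "WARN"
    INFER = "INFER"


@dataclass(frozen=True)
class InferenceArgs:
    sample_size: int


@dataclass(frozen=True)
class SchemaEnforcementArgs:
    # Hypothetical typed view of the `arguments` block; not a nodestream API.
    mode: Mode
    storage: Dict[str, Any]
    inference: Optional[InferenceArgs] = None

    @classmethod
    def from_dict(cls, raw: Mapping[str, Any]) -> "SchemaEnforcementArgs":
        # Mode(...) raises ValueError for anything other than ENFORCE/WARN/INFER.
        mode = Mode(raw["mode"])
        inference = None
        if "inference" in raw:
            inference = InferenceArgs(sample_size=int(raw["inference"]["sample_size"]))
        # `inference` is only used in INFER mode, so only require it there.
        if mode is Mode.INFER and inference is None:
            raise ValueError("INFER mode requires an 'inference.sample_size' setting")
        return cls(mode=mode, storage=dict(raw["storage"]), inference=inference)
```

Failing fast on an unknown mode or a missing sample size would surface configuration mistakes when the pipeline starts rather than partway through a run.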
Implementation Details
We can validate records against the schema with the jsonschema library.
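To illustrate what validating a record against a schema yields, the sketch below checks a deliberately tiny subset of JSON Schema (type, properties, required) using only the standard library. It is a stand-in for jsonschema, not a replacement; with the real library the equivalent would be along the lines of jsonschema.Draft202012Validator(schema).iter_errors(record), which yields ValidationError objects. The function name schema_errors is hypothetical.

```python
from typing import Any, Dict, List

# JSON Schema type names mapped to the Python types that json.loads produces.
# Simplification: "integer" maps to int only, so 1.0 is rejected here even
# though full JSON Schema accepts it.
_JSON_TYPES = {
    "object": dict,
    "array": list,
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "null": type(None),
}


def schema_errors(record: Any, schema: Dict[str, Any], path: str = "$") -> List[str]:
    """List violations of a tiny JSON Schema subset: type, properties, required.

    An empty list means the record matches. A real filter would delegate to
    the jsonschema library, which supports the full specification.
    """
    errors: List[str] = []
    expected = schema.get("type")
    if expected is not None:
        allowed = expected if isinstance(expected, list) else [expected]
        # bool subclasses int in Python, so True must not count as an integer/number.
        matches = any(
            isinstance(record, _JSON_TYPES[name])
            and not (name in ("integer", "number") and isinstance(record, bool))
            for name in allowed
        )
        if not matches:
            errors.append(f"{path}: expected {expected}, got {type(record).__name__}")
            return errors  # nested checks are meaningless once the type is wrong
    if isinstance(record, dict):
        for key in schema.get("required", []):
            if key not in record:
                errors.append(f"{path}: missing required property {key!r}")
        for key, subschema in schema.get("properties", {}).items():
            if key in record:
                errors.extend(schema_errors(record[key], subschema, f"{path}.{key}"))
    return errors
```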
ENFORCE mode will filter out the offending record and produce two log messages: one stating that a record was filtered, at the ERROR log level, and one containing the filtered record, at the DEBUG level.
WARN mode will not filter anything, but will produce two log messages when a record does not match the schema: one reporting the violation at the WARN log level and one containing the would-be-filtered record at the DEBUG level.
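The ENFORCE and WARN behavior described above might look like the following generator. It assumes a hypothetical validate callable that returns a list of violation messages (empty when the record matches), for example a thin wrapper around jsonschema's iter_errors; apply_schema, Mode, and the logger name are illustrative and are not nodestream's filter interface.

```python
import logging
from enum import Enum
from typing import Any, Callable, Dict, Iterable, Iterator, List

logger = logging.getLogger("schema_enforcement")  # hypothetical logger name


class Mode(Enum):
    # INFER is handled by the separate sampling step, so it is omitted here.
    ENFORCE = "ENFORCE"
    WARN = "WARN"


def apply_schema(
    records: Iterable[Dict[str, Any]],
    mode: Mode,
    validate: Callable[[Dict[str, Any]], List[str]],
) -> Iterator[Dict[str, Any]]:
    """Yield the records that should continue down the pipeline."""
    for record in records:
        errors = validate(record)
        if not errors:
            yield record
            continue
        if mode is Mode.ENFORCE:
            # ERROR: a record was filtered; DEBUG: the record itself.
            logger.error("Record filtered by schema enforcement: %s", "; ".join(errors))
            logger.debug("Filtered record: %r", record)
            continue  # drop the record
        # WARN: report the violation at WARN level but keep the record.
        logger.warning("Record violates schema: %s", "; ".join(errors))
        logger.debug("Record that would have been filtered: %r", record)
        yield record
```

Logging the full record only at DEBUG keeps potentially large or sensitive payloads out of the default log output while still making them available when debugging a schema drift.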
INFER mode will accumulate sample_size records and then generate a schema using the genson library.
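INFER mode's accumulate-then-generate step can be sketched as below. To keep it runnable with only the standard library, infer_schema is a simplified stand-in for genson: it only looks at top-level keys, records the JSON types seen for each, and marks a key as required when every sample contains it. With genson itself you would feed each sample to SchemaBuilder.add_object and call SchemaBuilder.to_schema, which also handles nested objects and arrays. SchemaInferrer and infer_schema are hypothetical names.

```python
from typing import Any, Dict, List, Optional, Set

# Checked in order: bool must come before int because bool subclasses int.
_TYPE_NAMES = [
    (bool, "boolean"),
    (int, "integer"),
    (float, "number"),
    (str, "string"),
    (list, "array"),
    (dict, "object"),
    (type(None), "null"),
]


def _json_type(value: Any) -> str:
    for py_type, name in _TYPE_NAMES:
        if isinstance(value, py_type):
            return name
    raise TypeError(f"not a JSON value: {value!r}")


def infer_schema(samples: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Infer a flat object schema from sample records (simplified stand-in for genson)."""
    seen: Dict[str, Set[str]] = {}
    for record in samples:
        for key, value in record.items():
            seen.setdefault(key, set()).add(_json_type(value))
    # A key is required only if every sampled record contains it.
    required = sorted(set.intersection(*(set(r) for r in samples))) if samples else []
    properties = {}
    for key, types in sorted(seen.items()):
        names = sorted(types)
        properties[key] = {"type": names[0] if len(names) == 1 else names}
    return {"type": "object", "properties": properties, "required": required}


class SchemaInferrer:
    """Buffers records until sample_size is reached, then infers a schema once."""

    def __init__(self, sample_size: int) -> None:
        self.sample_size = sample_size
        self.samples: List[Dict[str, Any]] = []
        self.schema: Optional[Dict[str, Any]] = None

    def add(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the schema on the call that completes the sample, else None."""
        if self.schema is not None:
            return None  # inference already happened; ignore further records
        self.samples.append(record)
        if len(self.samples) >= self.sample_size:
            self.schema = infer_schema(self.samples)
            self.samples.clear()  # free the buffered records
            return self.schema
        return None
```

In the filter, each record would presumably still pass through unchanged while it is sampled, and the returned schema would be handed to whichever storage backend is configured.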
We can decouple storage backends from the core filter and provide file system and S3 variants.
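Decoupling storage could be as small as a two-method interface with one implementation per backend, as in the sketch below. SchemaStorage, FileSystemSchemaStorage, S3SchemaStorage, and storage_from_args are hypothetical; the S3 variant expects an injected boto3 S3 client (so the sketch itself does not import boto3) and uses its get_object and put_object calls. The "file" location value and its "path" key are assumptions, since the issue only shows the s3 settings.

```python
import json
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Protocol


class SchemaStorage(Protocol):
    """What the filter needs from a backend: read and write one JSON schema."""

    def load(self) -> Optional[Dict[str, Any]]: ...

    def save(self, schema: Dict[str, Any]) -> None: ...


class FileSystemSchemaStorage:
    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def load(self) -> Optional[Dict[str, Any]]:
        if not self.path.exists():
            return None  # no schema yet, e.g. before the first INFER run
        return json.loads(self.path.read_text(encoding="utf-8"))

    def save(self, schema: Dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(schema, indent=2, sort_keys=True), encoding="utf-8")


class S3SchemaStorage:
    """Stores the schema as a JSON object; `client` is assumed to be boto3.client("s3")."""

    def __init__(self, client: Any, bucket: str, key: str) -> None:
        self.client = client
        self.bucket = bucket
        self.key = key

    def load(self) -> Optional[Dict[str, Any]]:
        try:
            response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        except self.client.exceptions.NoSuchKey:
            return None  # no schema stored yet
        return json.loads(response["Body"].read())

    def save(self, schema: Dict[str, Any]) -> None:
        self.client.put_object(
            Bucket=self.bucket,
            Key=self.key,
            Body=json.dumps(schema, indent=2, sort_keys=True).encode("utf-8"),
            ContentType="application/json",
        )


def storage_from_args(storage: Mapping[str, Any], s3_client: Any = None) -> SchemaStorage:
    """Pick a backend from the `storage` arguments block."""
    location = storage["location"]
    if location == "s3":
        return S3SchemaStorage(s3_client, storage["bucket"], storage["key"])
    if location == "file":  # hypothetical value and "path" key
        return FileSystemSchemaStorage(storage["path"])
    raise ValueError(f"unsupported schema storage location: {location!r}")
```

Because the filter would only talk to SchemaStorage, adding another backend would not require touching the ENFORCE, WARN, or INFER logic.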
Describe alternatives you've considered
N/A
Additional context
N/A