tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0

Improving time series forecasting and similar tasks #2199

Closed ntakouris closed 2 years ago

ntakouris commented 4 years ago

I've been building an end-to-end pipeline with TFX for the past month, and I feel there are several things that could be vastly improved for time series tasks. Please read on, and if something does not make sense, @ me and I'll explain more :). This is open for discussion.

The task is simple: given a window of input_window_size timesteps of features, predict the feature values at the next timestep. Preprocessing will always be an easy three-liner with Transform:

(just z-score scale everything)

import tensorflow_transform as tft

# `features` is the user module that defines DENSE_FLOAT_FEATURE_KEYS and transformed_name()
def preprocessing_fn(inputs):
  outputs = {}
  for key in features.DENSE_FLOAT_FEATURE_KEYS:
    outputs[features.transformed_name(key)] = tft.scale_to_z_score(inputs[key])

  return outputs

From this point on, these problems require attention:

Lack of a label column

Given a dataset D of shape (T timesteps, N features), window it into W-sized frames and use the element at W+i as the label. This is a bit of a hassle to do with plain tf.data. Here's sample code that does it:

@tf.function
def collate_fn(x, x_idx_start, x_idx_num, y_idx_start, y_idx_num):
    # split a windowed frame into (inputs, labels) by slicing along the time axis
    return tf.slice(x, [x_idx_start, 0], [x_idx_num, <feature size>]), tf.slice(x, [y_idx_start, 0], [y_idx_num, <feature size>])

@tf.function
def sub_to_batch(sub, window):
    # turn a windowed sub-dataset into a single tensor of `window` rows
    return sub.batch(window)

@tf.function
def unmarshal(x, keys):
    # convert the {feature_name: value} dict into a flat tensor of feature values
    t = [x[k] for k in keys]
    return tf.reshape(t, [len(keys)])

# in the input_fn
dataset = (
    tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        features=transformed_feature_spec,
        reader=_gzip_reader_fn,
        shuffle=False,
        sloppy_ordering=False,
        batch_size=big_window * 8)  # load many records together so we are not IO-bound
    .unbatch()  # unbatch so windowing sees records in the proper order
    .map(partial(unmarshal, keys=features.transformed_names(features.DENSE_FLOAT_FEATURE_KEYS)),
         num_parallel_calls=tf.data.experimental.AUTOTUNE)  # unmarshal from {feature: value} dict to plain tensors
    .window(big_window, shift=1)  # window
    .flat_map(partial(sub_to_batch, window=big_window))  # unwrap the dataset of datasets into window-sized tensors
    .shuffle(shuffle_buffer_size)  # everything is self-contained here, so we can shuffle; pass a real buffer size
    .map(collate_fn_partial, num_parallel_calls=tf.data.experimental.AUTOTUNE)  # produce labels (collate_fn with the slice indices bound via partial)
    .prefetch(tf.data.experimental.AUTOTUNE)
    .batch(batch_size))  # use the actual batch size

This works, but it causes problems for serving and hyperparameter search, restricting you to a constant window size per job run: tuner_fn does not call your run_fn directly (run_fn also saves the model), but builds and evaluates the model in its own context. Therefore, your input_fn can't depend on hyperparameters.

This could be fixed by having a separate function that saves the model for serving, and by moving model and input building into a function that is shared between the tuner and the trainer, as sketched below.
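
A minimal sketch of what such a shared module could look like; all names here are hypothetical, the model is just an arbitrary LSTM to make it concrete, and the keras_tuner package is assumed:

# shared.py -- hypothetical module imported by both the tuner and the trainer modules
import keras_tuner
import tensorflow as tf

NUM_FEATURES = 5  # assumed number of dense features


def make_input_fn(file_pattern, tf_transform_output, window_size, batch_size):
    """Builds the windowed tf.data pipeline from above, with big_window = window_size."""
    # ... reuse unmarshal / sub_to_batch / collate_fn exactly as in the issue body ...
    raise NotImplementedError


def build_model(hp: keras_tuner.HyperParameters):
    """Builds the forecasting model; the window size is itself a hyperparameter."""
    window_size = hp.Int('window_size', min_value=16, max_value=128, step=16)
    inputs = tf.keras.Input(shape=(window_size, NUM_FEATURES))
    x = tf.keras.layers.LSTM(hp.Int('units', min_value=32, max_value=128, step=32))(inputs)
    outputs = tf.keras.layers.Dense(NUM_FEATURES)(x)
    model = tf.keras.Model(inputs, outputs)
    model.compile(optimizer='adam', loss='mse')
    return model

tuner_fn would hand build_model to the tuner, while run_fn would read window_size back out of the tuner's chosen hyperparameters, call the same two builders, and only then attach the serving signatures and save the model.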

Serving

I haven't managed to invoke the model I built this way with tf.Example, so I wrote a new _get_serve_raw_fn that parses raw JSON time series data. There are a couple of problems.

First, the preprocessing and tf.data loading. The model just receives plain tensors (because of the unmarshal function in the dataset), so you have to build some manual mapping from the transformed key names (e.g. feature to feature_xf): either add a marshalling reconstruction step on the dataset-loading side (bring back the ditched dict keys) together with named model inputs, or change the serving function to map the tft_layer outputs to your model.
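
For the first option, a rough sketch of what re-adding the dict keys on the dataset-loading side could look like, applied to the (features, labels) elements that collate_fn produces (the helper name is made up):

transformed_keys = features.transformed_names(features.DENSE_FLOAT_FEATURE_KEYS)

def remarshal(x, y, keys=tuple(transformed_keys)):
    # x has shape (window_size, num_features); rebuild the {feature_xf: column}
    # dict so the element matches named Keras model inputs
    return {k: x[:, i] for i, k in enumerate(keys)}, y

# applied before the final .batch(batch_size)
dataset = dataset.map(remarshal, num_parallel_calls=tf.data.experimental.AUTOTUNE)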

The second problem is the dimensionality of the Transform output: you need to pass inputs with shape (None,) so the z-score operations broadcast across all the window's feature columns (using None in the signature spec is not optimal, since constraints could be typed into the input tensor spec), and then reshape via the Keras backend (by messing with the batch dimension) to support the normal model invocation. Here's my serving function builder and my signature:

import logging

import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Concatenate, Input, Reshape


def _get_serve_raw_fn(model, tf_transform_output):
    logging.info('_get_serve_raw_fn')
    model.tft_layer = tf_transform_output.transform_features_layer()

    input_layers = {
        colname: Input(name=colname,
                       shape=(constants.INPUT_WINDOW_SIZE,),  # placeholder for the serving fn
                       dtype=tf.float32)
        for colname in features.transformed_names(
            features.DENSE_FLOAT_FEATURE_KEYS)
    }  # (o h l c v), post-transform layer

    # input_layers_expand_dim = {k: K.reshape(v, (1, constants.INPUT_WINDOW_SIZE)) for k, v in input_layers.items()}
    # logging.info(input_layers_expand_dim)
    pre_model_input = Concatenate(axis=-1)(list(input_layers.values()))
    pre_model_input = Reshape(target_shape=(constants.INPUT_WINDOW_SIZE, <feature size>))(
        pre_model_input)  # (None, input_window_size, <feature size>)

    pre_input_model = tf.keras.Model(input_layers, pre_model_input)
    model.pre_input = pre_input_model

    @tf.function
    def serve_raw_fn(feature_1, feature_2):
        # map to tft_layer inputs
        parsed_features = {'Feature_1': feature_1,  # unnecessary JSON mapping that could be inferred by TF Serving
                           'Feature_2': feature_2}

        transformed_features = model.tft_layer(parsed_features)

        # convert each transformed feature from (window,) to (None, window) to compensate for the batch size
        batched_features = {
            k: K.reshape(v, [-1, constants.INPUT_WINDOW_SIZE])  # add the None (batch) dimension
            for k, v in transformed_features.items()
        }

        pre_input_features = model.pre_input(batched_features)
        return model(pre_input_features)

    return serve_raw_fn
# in the signatures dict passed when saving the model
# (serving_raw_entry is the tf.function returned by _get_serve_raw_fn):
signatures = {
    'serving_raw': serving_raw_entry.get_concrete_function(
        tf.TensorSpec(shape=[None], dtype=tf.float32, name='feature_1'),  # unnecessary JSON mapping here as well -- just use named tft_layer inputs by default, even without tf.Example
        tf.TensorSpec(shape=[None], dtype=tf.float32, name='feature_2'),  # this could be WINDOW instead of None, but then the dimensions of everything else break
    ),
}

More on type safety and signatures: the shape can't be exposed on the tf.Example path either, but this is compensated for by the schema on the protocol buffers (although there is again the issue of the None dimension coming from TF Transform). This could be fixed by generating a specific schema with the model's input size after training, before saving the model for serving (still a workaround).

Sidenote: if you hit this specific problem, you can work around it by using a fake tf.function to expose some input or output metadata for serving, provided you have a model client that can easily produce features of arbitrary window size (and thus relies on the model to declare the specific window size it requires).

@tf.function
def _io_spec(x):
    # identity function whose signature just exposes the expected window size
    return x

# in the signatures dict
{
    'input_window_size': _io_spec.get_concrete_function(
        tf.TensorSpec(
            shape=[constants.INPUT_WINDOW_SIZE],
            dtype=tf.float32,
            name='input_window_size')
    )
}

Other micro-issues

ntakouris commented 4 years ago

Update: It seems that TF 2.3.0 added tf.keras.preprocessing.timeseries_dataset_from_array

It comes with other handy features like sampling, etc. It still doesn't cover input pipelines split across multiple TFRecord files: for a big time series stored in multiple TFRecord files you'd need to merge partN[window:] + partN+1[:window] to avoid data loss at the shard boundaries, if you are using the preprocessing pipeline I provided above (to make the window+1 index the label). The other problems stated still exist.
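
For reference, a minimal sketch of that API on an in-memory array (the shapes and numbers here are made up, just to show the call):

import numpy as np
import tensorflow as tf

# toy data: 1000 timesteps x 5 features
series = np.random.rand(1000, 5).astype(np.float32)
window = 30

# each input is a window of `window` timesteps; the target is the row right after it
dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
    data=series[:-window],
    targets=series[window:],   # target for the window starting at i is series[i + window]
    sequence_length=window,
    sequence_stride=1,
    shuffle=True,
    batch_size=64)

for x, y in dataset.take(1):
    print(x.shape, y.shape)  # (64, 30, 5) (64, 5)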

Mar-ai commented 3 years ago

Since this issue is entitled "Improving time series forecasting and similar tasks", I'd like to mention a problem we've encountered while adapting the TFX pipeline to a time series problem: as described here, there are no guarantees that the tf.Examples generated and used in the TFX pipeline are kept in a specific order. This is quite a fundamental problem, as we need the data to be ordered (by timestamp or similar) so that the windows generated from it are meaningful.

We're currently looking into feeding already-windowed data into ExampleGen as a suggested workaround, but of course it would be great if this could be solved within TFX.
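
To illustrate that workaround, a rough sketch of materializing pre-windowed tf.Examples before ExampleGen, assuming made-up feature names and window size; since every window is self-contained in one record, record order no longer matters:

import numpy as np
import tensorflow as tf

WINDOW = 30                                            # assumed window size
series = np.random.rand(1000, 5).astype(np.float32)    # toy (timesteps, features) array
feature_names = ['f1', 'f2', 'f3', 'f4', 'f5']         # made-up column names

with tf.io.TFRecordWriter('windowed_examples.tfrecord') as writer:
    for start in range(len(series) - WINDOW):
        window = series[start:start + WINDOW]          # model input
        label = series[start + WINDOW]                 # next timestep as the label
        feature = {
            name: tf.train.Feature(
                float_list=tf.train.FloatList(value=window[:, i].tolist()))
            for i, name in enumerate(feature_names)
        }
        feature.update({
            f'{name}_label': tf.train.Feature(
                float_list=tf.train.FloatList(value=[float(label[i])]))
            for i, name in enumerate(feature_names)
        })
        writer.write(tf.train.Example(
            features=tf.train.Features(feature=feature)).SerializeToString())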

ntakouris commented 3 years ago

I wrote a blog post the other day on how to efficiently use tf.data.Dataset to preserve data integrity across multiple TFRecord files. I won't go into details here, but you can read more about it at:

“Advanced Tensorflow Data Input Pipelines: Handling Time Series” https://link.medium.com/Y5SbNJF4vbb

A requirement for that is to split the inputs into multiple TFRecord files as an ordered series, e.g. file 1 holds rows 0 to N, file 2 holds rows N+1 to M, and so on. I'm not sure whether the TFX ExampleGen or preprocessing stage preserves order, and I haven't had time to test it (with multiple Beam workers) at this stage.
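
For illustration, a minimal sketch of reading such ordered shards sequentially with tf.data (the file pattern is an assumption; the key points are the sorted file list and the default sequential read):

import tensorflow as tf

# shard names must sort into the original row order, e.g. part-00000, part-00001, ...
filenames = sorted(tf.io.gfile.glob('data/part-*.tfrecord.gz'))

# with the default num_parallel_reads=None the files are read one after another,
# so records come out in the original row order
dataset = tf.data.TFRecordDataset(filenames, compression_type='GZIP')

# from here on, the windowing pipeline from the issue body can be applied safely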

Features like having the Evaluator use an input function are also still missing.

Performance note: I'm not sure whether partitioning such a dataset across multiple files can improve performance (so that loading from rotational drives on a distributed file system is on par with pushing everything onto a super fast NVMe drive). Currently I just use a single big ~20 GB TFRecord file, growing by about 0.5 GB per month.

These things can get much easier with some utility functions from the tfx ecosystem.

tomalbrecht commented 3 years ago

As far as I understand, combining this issue with SequenceExample should do the trick without the need to split into multiple TFRecord files. Am I right?

I am still looking for a working example of feeding a trainer with sliding windows (via datasets). I couldn't yet figure out how to use SequenceExample with datasets. Does anyone have a working example (not an NLP one, but time series)?
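
Not a full TFX example, but a rough sketch of parsing SequenceExample records in a tf.data pipeline, assuming made-up feature names and a fixed window length per record:

import tensorflow as tf

# per-example context (e.g. a label) and a sequence of feature vectors
context_spec = {
    'label': tf.io.FixedLenFeature([], tf.float32),
}
sequence_spec = {
    'values': tf.io.FixedLenSequenceFeature([5], tf.float32),  # 5 features per timestep (assumed)
}

def parse(serialized):
    context, sequences = tf.io.parse_single_sequence_example(
        serialized,
        context_features=context_spec,
        sequence_features=sequence_spec)
    return sequences['values'], context['label']  # (timesteps, 5), scalar label

dataset = (tf.data.TFRecordDataset('windows.tfrecord')  # assumed file of SequenceExamples
           .map(parse, num_parallel_calls=tf.data.experimental.AUTOTUNE)
           .batch(64))  # works because every record holds the same fixed window length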

ntakouris commented 3 years ago

I disagree about SequenceExample. I do not want to window my data beforehand, since it is easy to do in an input function if the files are read in order (even if the data points are sharded into small files, as long as they stay in order).

Suppose you fix ExampleGen to work deterministically with huge amounts of data and read everything in order (even with a single thread).

There are still some parts missing:

FernandoDorado commented 2 years ago

I share the concerns of @ntakouris, and I am having many problems using TFX for time series, especially around dimensionality. I would appreciate more detail in the documentation or a complete example.

ntakouris commented 2 years ago

so stale that I moved to pytorch