Closed shuvayan closed 3 weeks ago
With sklearn, you can always make a custom regression. You will just have to hide the MMM class in the fit, predict of the child class
Hello @wd60622 ,
I think I understand. It will be something like the below
from typing import Any, Dict, List, Tuple, Callable
import pandas as pd
import pymc as pm
from pymc_marketing.mmm import MMM
import arviz as az
class PyMCPipeline:
def __init__(
self,
steps: List[Tuple[str, Callable]],
mmm: MMM,
) -> None:
"""
Initialize the PyMCPipeline with preprocessing steps and a PyMC Marketing MMM model.
Parameters
----------
steps : List[Tuple[str, Callable]]
List of (name, transform) tuples that are applied sequentially.
mmm : MMM
An instance of the MMM model from pymc_marketing.
"""
self.steps = steps
self.mmm = mmm
self.model = None
self.trace = None
def fit(self, X: pd.DataFrame, y: pd.Series) -> 'PyMCPipeline':
"""
Fit the pipeline, including all preprocessing steps and the PyMC model.
Parameters
----------
X : pd.DataFrame
The input features, including channel and control variables.
y : pd.Series
The target variable.
Returns
-------
self : PyMCPipeline
The fitted pipeline.
"""
# Apply each step sequentially
for name, transform in self.steps:
X = transform.fit_transform(X)
# Build the PyMC model
self.mmm.build_model(X, y)
# Sample from the posterior distribution
self.trace = self.mmm.sample()
return self
def predict(self, X: pd.DataFrame) -> az.InferenceData:
"""
Generate predictions using the fitted PyMC model.
Parameters
----------
X : pd.DataFrame
The input features, including channel and control variables.
Returns
-------
az.InferenceData
Posterior predictive samples.
"""
if self.trace is None:
raise RuntimeError("The model must be fitted before making predictions.")
# Apply preprocessing steps
for name, transform in self.steps:
X = transform.transform(X)
# Generate predictions using posterior predictive sampling
return self.mmm.sample_posterior_predictive(X_pred=X)
def fit_predict(self, X: pd.DataFrame, y: pd.Series) -> az.InferenceData:
"""
Fit the pipeline and then generate predictions.
Parameters
----------
X : pd.DataFrame
The input features, including channel and control variables.
y : pd.Series
The target variable.
Returns
-------
az.InferenceData
Posterior predictive samples.
"""
return self.fit(X, y).predict(X)
Closing this!
Hi @shuvayan,
Think that might work. I was thinking a bit more like their recommended workflow of creating RegressorMixin
subclass which then works in the pipeline.
class MMMRegressor(RegressorMixin, BaseEstimator):
"""Wrapper to allow the MMM to work in a scikit-learn pipeline."""
pipeline = Pipeline([
("preprocessor", preprocessor),
("mmm", MMMRegressor())
])
Ref: https://scikit-learn.org/stable/modules/generated/sklearn.base.RegressorMixin.html
Also, a tip: you can use back ticks and specify the language to get syntax highlighting in GitHub. Makes it a lot easier to read the code snippets!
Hello @wd60622 ,
I think I get it. It will be something like below:
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
from sklearn.pipeline import Pipeline
import pandas as pd
import numpy as np
import pymc as pm
import arviz as az
import matplotlib.pyplot as plt
import warnings
class MMM(BaseEstimator, RegressorMixin):
def __init__(
self,
date_column: str,
channel_columns: list[str],
adstock,
saturation,
control_columns: list[str] | None = None,
yearly_seasonality: int | None = None,
adstock_first: bool = True
) -> None:
self.date_column = date_column
self.channel_columns = channel_columns
self.adstock = adstock
self.saturation = saturation
self.control_columns = control_columns or []
self.yearly_seasonality = yearly_seasonality
self.adstock_first = adstock_first
def fit(self, X: pd.DataFrame, y: pd.Series) -> "MMM":
"""Fit the MMM model to the data."""
# Initialize and build the model here
self.X = X
self.y = y
self.build_model(X, y)
return self
def predict(self, X: pd.DataFrame) -> np.ndarray:
"""Predict using the MMM model."""
# Ensure the model has been built and perform prediction
if not hasattr(self, "model"):
raise RuntimeError("The model has not been built yet.")
return self.sample_posterior_predictive(X).mean(dim="sample").values
def add_lift_test_measurements(
self,
df_lift_test: pd.DataFrame,
dist: type[pm.Distribution] = pm.Gamma,
name: str = "lift_measurements",
) -> None:
"""Add lift tests to the model."""
if not hasattr(self, "model"):
raise RuntimeError("The model has not been built yet.")
if "channel" not in df_lift_test.columns:
raise KeyError("The 'channel' column is required to map the lift measurements to the model.")
df_lift_test_scaled = scale_lift_measurements(
df_lift_test=df_lift_test,
channel_col="channel",
channel_columns=self.channel_columns, # type: ignore
channel_transform=self.channel_transformer.transform,
target_transform=self.target_transformer.transform,
)
time_varying_var_name = (
"media_temporal_latent_multiplier" if self.time_varying_media else None
)
add_lift_measurements_to_likelihood_from_saturation(
df_lift_test=df_lift_test_scaled,
saturation=self.saturation,
time_varying_var_name=time_varying_var_name,
model=self.model,
dist=dist,
name=name,
)
def _create_synth_dataset(
self,
df: pd.DataFrame,
date_column: str,
allocation_strategy: dict[str, float],
channels: list[str] | tuple[str],
controls: list[str] | None,
target_col: str,
time_granularity: str,
time_length: int,
lag: int,
noise_level: float = 0.01,
) -> pd.DataFrame:
"""Create a synthetic dataset based on the given allocation strategy (Budget) and time granularity."""
time_offsets = {
"daily": {"days": 1},
"weekly": {"weeks": 1},
"monthly": {"months": 1},
"quarterly": {"months": 3},
"yearly": {"years": 1},
}
if time_granularity not in time_offsets:
raise ValueError("Unsupported time granularity. Choose from 'daily', 'weekly', 'monthly', 'quarterly', 'yearly'.")
if controls is not None:
_controls: list[str] = controls
else:
controls = []
last_date = pd.to_datetime(df[date_column]).max()
new_dates = []
for i in range(1, time_length + 1):
if time_granularity == "daily":
new_date = last_date + pd.DateOffset(days=i)
elif time_granularity == "weekly":
new_date = last_date + pd.DateOffset(weeks=i)
elif time_granularity == "monthly":
new_date = last_date + pd.DateOffset(months=i)
elif time_granularity == "quarterly":
new_date = last_date + pd.DateOffset(months=3 * i)
elif time_granularity == "yearly":
new_date = last_date + pd.DateOffset(years=i)
new_dates.append(new_date)
new_rows = [
{
self.date_column: pd.to_datetime(new_date),
**{
channel: allocation_strategy.get(channel, 0)
+ np.random.normal(
0, noise_level * allocation_strategy.get(channel, 0)
)
for channel in channels
},
**{control: 0 for control in _controls},
target_col: 0,
}
for new_date in new_dates
]
return pd.DataFrame(new_rows)
def allocate_budget_to_maximize_response(
self,
budget: float | int,
time_granularity: str,
num_periods: int,
budget_bounds: dict[str, tuple[float, float]] | None = None,
custom_constraints: dict[str, float] | None = None,
quantile: float = 0.5,
noise_level: float = 0.01,
) -> az.InferenceData:
"""Allocate the given budget to maximize the response over a specified time period."""
parameters_mid = self.format_recovered_transformation_parameters(quantile=quantile)
allocator = BudgetOptimizer(
adstock=self.adstock,
saturation=self.saturation,
parameters=parameters_mid,
adstock_first=self.adstock_first,
num_periods=num_periods,
scales=self.channel_transformer["scaler"].scale_,
)
self.optimal_allocation_dict, _ = allocator.allocate_budget(
total_budget=budget,
budget_bounds=budget_bounds,
custom_constraints=custom_constraints,
)
synth_dataset = self._create_synth_dataset(
df=self.X,
date_column=self.date_column,
allocation_strategy=self.optimal_allocation_dict,
channels=self.channel_columns,
controls=self.control_columns,
target_col=self.output_var,
time_granularity=time_granularity,
time_length=num_periods,
lag=self.adstock.l_max,
noise_level=noise_level,
)
return self.sample_posterior_predictive(
X_pred=synth_dataset,
extend_idata=False,
include_last_observations=True,
original_scale=False,
var_names=["y", "channel_contributions"],
progressbar=False,
)
def plot_budget_allocation(
self,
samples: Dataset,
figsize: tuple[float, float] = (12, 6),
ax: plt.Axes | None = None,
original_scale: bool = True,
) -> tuple[plt.Figure, plt.Axes]:
"""Plot the budget allocation and channel contributions."""
if original_scale:
channel_contributions = (
samples["channel_contributions"]
.mean(dim=["sample"])
.mean(dim=["date"])
.values
* self.get_target_transformer()["scaler"].scale_
)
allocate_spend = (
np.array(list(self.optimal_allocation_dict.values()))
* self.channel_transformer["scaler"].scale_
)
else:
channel_contributions = (
samples["channel_contributions"]
.mean(dim=["sample"])
.mean(dim=["date"])
.values
)
allocate_spend = np.array(list(self.optimal_allocation_dict.values()))
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.get_figure()
bar_width = 0.35
opacity = 0.7
index = np.arange(len(self.channel_columns))
bars1 = ax.bar(
index,
allocate_spend,
bar_width,
color="b",
alpha=opacity,
label="Allocate Spend",
)
ax2 = ax.twinx()
bars2 = ax2.bar(
index + bar_width,
channel_contributions,
bar_width,
color="r",
alpha=opacity,
label="Channel Contributions",
)
ax.set_xlabel("Channels")
ax.set_ylabel("Allocate Spend", color="b")
ax.tick_params(axis="x", rotation=90)
ax.set_xticks(index + bar_width / 2)
ax.set_xticklabels(self.channel_columns)
ax.legend(loc="upper left")
ax2.set_ylabel("Channel Contributions", color="r")
ax2.legend(loc="upper right")
plt.tight_layout()
return fig, ax
def plot_allocated_contribution_by_channel(
self,
samples: Dataset,
figsize: tuple[float, float] = (12, 6),
ax: plt.Axes | None = None,
original_scale: bool = True,
) -> tuple[plt.Figure, plt.Axes]:
"""Plot the allocated contribution by channel."""
if original_scale:
channel_contributions = (
samples["channel_contributions"]
.mean(dim=["sample"])
.mean(dim=["date"])
.values
* self.get_target_transformer()["scaler"].scale_
)
else:
channel_contributions = (
samples["channel_contributions"]
.mean(dim=["sample"])
.mean(dim=["date"])
.values
)
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.get_figure()
bar_width = 0.35
opacity = 0.7
index = np.arange(len(self.channel_columns))
bars = ax.bar(
index,
channel_contributions,
bar_width,
color="b",
alpha=opacity,
)
ax.set_xlabel("Channels")
ax.set_ylabel("Channel Contributions", color="b")
ax.tick_params(axis="x", rotation=90)
ax.set_xticks(index)
ax.set_xticklabels(self.channel_columns)
ax.legend(["Channel Contributions"], loc="upper left")
plt.tight_layout()
return fig, ax
def build_model(self, X: pd.DataFrame, y: pd.Series) -> None:
"""Build the PyMC model."""
# Implement the model building here
pass
def sample_posterior_predictive(self, X_pred: pd.DataFrame, **kwargs) -> az.InferenceData:
"""Sample from the posterior predictive distribution."""
# Implement the sampling here
pass
Kindly let me know if this is the right way to proceed?
Hello,
I was trying to implement a pipeline using the sklearn pipeline which would do the transformations if needed and then do the model fitting and posterior predictive sampling/predict with the MMM class. But it says that it is not compatible with the sklearn pipeline.
I understand why this is so but wanted to know if there is a way to do it?