Open ValterNobrega opened 7 months ago
Dear Valter,
Thank you for raising your issue. Could you send a minimal code snippet that I can run that gets you the error? Say two blocks that you aggregate and compute a nonlinear transition for? And then maybe also show the full error message?
Hopefully we can figure it out then!
Ludwig
Dear Ludwig
Thank you for your reply.
The model that I am trying to do is a neoclassical model with heterogeneous labor supply and permanent heterogeneity across discount factors and permanent ability.
To do so, I imported the household block with labor choice:
hh = hetblocks.hh_labor.hh
and then added the corresponding hetoutputs and hetinputs:
def utility(c, eis, vphi, n, frisch):
u = (c ** (1 - 1/eis) / (1 - 1/eis) ) - vphi * ((n ** (1 + frisch)) / (1 + frisch))
return u
def make_grids(rho_s, sigma_s, nS, amax, amin, nA):
e_grid, Pi, pi_e = utils.tauchen_hans(sigma_s, rho_s, nS)
a_grid = utils.a_grid_fortran(nx=nA, convex_param=4.0, ubound=amax, lbound=amin) # Function that mimics the grids from the Fortran Code
return e_grid, pi_e, Pi, a_grid
def wages(w, e_grid, tau_l, gamma0, ability):
we = (1 - tau_l) * w * np.exp(gamma0 + ability + e_grid)
return we
def labor_supply(n, e_grid, gamma0, w, ability):
ne = (np.exp(gamma0 + ability + e_grid))[:, np.newaxis] * n
av_e = (w * np.exp(gamma0 + ability + e_grid))[:, np.newaxis] * n
return ne, av_e
def transfers(Tax, e_grid):
T = -Tax / np.ones(len(e_grid))
return T
household = hh.add_hetinputs([transfers, wages, make_grids])
household = household.add_hetoutputs([labor_supply, utility])
Then, I add firms and government, and the corresponding market clearing conditions:
@simple
def firm(K, L, alpha, delta):
r = alpha * (K(-1) / L) ** (alpha-1) - delta
w = (1 - alpha) * (K(-1) / L) ** alpha
Y = K(-1) ** alpha * L ** (1 - alpha)
return r, w, Y
@simple
def mkt_clearing(A, NE, G, C, L, Y, B, delta, K):
asset_mkt = A - B - K
labor_mkt = NE - L
I = K - (1 - delta) * K(-1)
goods_mkt = Y - C - G - I
return asset_mkt, labor_mkt, goods_mkt
@simple
def fiscal(r, B, G, tau_l, w, L):
Tax = (1 + r) * B(-1) + G - B - tau_l * w * L
return Tax
Since TFP is normalized to 1, and we need to calibrate after K/Y, I created a new block for those ratios:
@simple
def macro_ratios(Y, G, B, Tax, K):
G_Y = G / Y
B_Y = B / Y
T_Y = Tax / Y
K_Y = K / Y
return G_Y, B_Y, T_Y, K_Y
After all this, I added the ingredients for the heterogeneity. Since I am trying to replicate a model with 3 betas and 2 different permanenent abilities, what I did was to aggregate them in the same way as in the KS example.
ability_grid = utils.tauchen_hans(0.712, 0, 2)[0]
hh_list = []
to_map = ['beta', 'ability', *household.outputs]
for i in range(1, 4): # Loop for beta variations
for j in range(1, len(ability_grid) + 1): # Loop for ability variations
beta_suffix = f'_beta{i}'
ability_suffix = f'_ability{j}'
remapped_keys = {k: k + beta_suffix + ability_suffix for k in to_map}
renamed_key = f'hh_beta{i}_ability{j}'
new_hh = household.remap(remapped_keys).rename(renamed_key)
hh_list.append(new_hh)
@simple
def aggregate(A_beta1_ability1, A_beta1_ability2,
A_beta2_ability1, A_beta2_ability2,
A_beta3_ability1, A_beta3_ability2,
C_beta1_ability1, C_beta1_ability2,
C_beta2_ability1, C_beta2_ability2,
C_beta3_ability1, C_beta3_ability2,
NE_beta1_ability1, NE_beta1_ability2,
NE_beta2_ability1, NE_beta2_ability2,
NE_beta3_ability1, NE_beta3_ability2,
N_beta1_ability1, N_beta1_ability2,
N_beta2_ability1, N_beta2_ability2,
N_beta3_ability1, N_beta3_ability2,
sigma_a, nAB):
# This has to be done by hand
mass_ability = utils.tauchen_hans(sigma_a, 0, nAB)[2]
# For A
A_beta1 = A_beta1_ability1 * mass_ability[0] + \
A_beta1_ability2 * mass_ability[1]
A_beta2 = A_beta2_ability1 * mass_ability[0] + \
A_beta2_ability2 * mass_ability[1]
A_beta3 = A_beta3_ability1 * mass_ability[0] + \
A_beta3_ability2 * mass_ability[1]
# For C
C_beta1 = C_beta1_ability1 * mass_ability[0] + \
C_beta1_ability2 * mass_ability[1]
C_beta2 = C_beta2_ability1 * mass_ability[0] + \
C_beta2_ability2 * mass_ability[1]
C_beta3 = C_beta3_ability1 * mass_ability[0] + \
C_beta3_ability2 * mass_ability[1]
# For NE
NE_beta1 = NE_beta1_ability1 * mass_ability[0] + \
NE_beta1_ability2 * mass_ability[1]
NE_beta2 = NE_beta2_ability1 * mass_ability[0] + \
NE_beta2_ability2 * mass_ability[1]
NE_beta3 = NE_beta3_ability1 * mass_ability[0] + \
NE_beta3_ability2 * mass_ability[1]
# For N
N_beta1 = N_beta1_ability1 * mass_ability[0] + \
N_beta1_ability2 * mass_ability[1]
N_beta2 = N_beta2_ability1 * mass_ability[0] + \
N_beta2_ability2 * mass_ability[1]
N_beta3 = N_beta3_ability1 * mass_ability[0] + \
N_beta3_ability2 * mass_ability[1]
A = (A_beta1 + A_beta2 + A_beta3) / 3
C = (C_beta1 + C_beta2 + C_beta3) / 3
NE = (NE_beta1 + NE_beta2 + NE_beta3) / 3
N = (N_beta1 + N_beta2 + N_beta3) / 3
return C, A, NE, N
Then, since the betas parameters that I use for calibration, in order to avoid having to write "beta_beta1_ability_1", etc..., I added a simple block that maps the values from the calibration dictionary to the household block:
@simple
def betas(beta1, beta2, beta3):
# Beta 1
beta_beta1_ability1 = beta1
beta_beta1_ability2 = beta1
# Beta 2
beta_beta2_ability1 = beta2
beta_beta2_ability2 = beta2
# Beta 3
beta_beta3_ability1 = beta3
beta_beta3_ability2 = beta3
return beta_beta1_ability1, beta_beta1_ability2, beta_beta2_ability1, beta_beta2_ability2, beta_beta3_ability1, beta_beta3_ability2
and to help to build the calibration dictionary, I created a function to add each of the new values abilities and betas (flexible to the number of points in the ability grid):
def parameters(beta_values, sigma_a, nAb):
ability_grid = utils.tauchen_hans(sigma_a, 0, nAb)[0]
parameter_values = {}
# Loop over beta indices
for i, beta in enumerate(beta_values, start=1):
# Loop over ability indices
for j in range(1, nAb + 1):
# Construct variable names
beta_variable_name = f'beta_beta{i}_ability{j}'
ability_variable_name = f'ability_beta{i}_ability{j}'
# Assign value to variables
parameter_values[beta_variable_name] = beta
parameter_values[ability_variable_name] = ability_grid[j - 1]
# Return variable names and values as a dictionary
return parameter_values
# Steady state
calibration.update(parameters([calibration['beta1'], calibration['beta2'], calibration['beta3']],calibration['sigma_a'], calibration['nAB']))
So the model is created as: neoclassic = create_model([*hh_list, firm, fiscal, aggregate, betas, mkt_clearing, macro_ratios], name="Neoclassical")
For this calibration, we get a steady state:
# Steady state
calibration = {'eis': 1.2**(-1),
'frisch': 1.0,
'delta': 0.015,
'alpha': 0.33,
'rho_s': 0.761,
'sigma_s': 0.211,
'nS': 5,
'nA': 200,
'amax': 400,
'amin': -1.7,
'gamma0': 0.,
'phi_tau': 0.1,
'beta1': 0.9858655,
'beta2': 0.9872655,
'beta3': 0.9882,
'sigma_a': 0.712,
'nAB': 2}
calibration.update(parameters([calibration['beta1'], calibration['beta2'], calibration['beta3']],calibration['sigma_a'], calibration['nAB']))
unknowns_ss = {'K': 18.34,
'L': 0.4337,
'vphi': 10,
'G': 0.22,
'B': 2.56,
'tau_l': 0.35,
'beta3': 0.9882}
targets_ss = {'asset_mkt': 0.,
'labor_mkt': 0.,
'N': 0.248,
'T_Y': -0.07,
'B_Y': 1.714,
'G_Y': 0.15,
'K_Y': 12.292}
ss_cal = neoclassic.solve_steady_state(calibration, unknowns_ss, targets_ss, solver='broyden_custom',
ttol=1e-5, solver_kwargs={'maxcount': 10000})
Then, if I apply a nonlinear MIT shock on G, the model also converges:
# setup
T = 300
exogenous = ['G']
unknowns = ['K', 'L']
targets = ['asset_mkt', 'labor_mkt']
rho = 0.975
dG = 0.01 * ss_cal['Y'] * rho ** (np.arange(T))
td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG})
However, if I try to add the line "ss_initial", it does not run.
If I give the model the same steady state:
td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG}, ss_initial=ss_cal)
I get the error message: ValueError: max() arg is an empty sequence
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[20], line 11
8 dG = 0.01 * ss_cal['Y'] * rho ** (np.arange(T))
10 # general equilibrium jacobians
---> 11 td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG}, ss_initial=ss_cal)
File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:197, in Block.solve_impulse_nonlinear(self, ss, unknowns, targets, inputs, outputs, internals, Js, options, H_U_factored, ss_initial, **kwargs)
195 print(f'Solving {self.name} for {unknowns} to hit {targets}')
196 for it in range(opts['maxit']):
--> 197 results = self.impulse_nonlinear(ss, inputs | U, actual_outputs | targets, internals, Js, options, ss_initial, **kwargs)
198 errors = {k: np.max(np.abs(results[k])) for k in targets}
199 if opts['verbose']:
File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:66, in Block.impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial, **kwargs)
61 actual_outputs, inputs_as_outputs = self.process_outputs(ss,
62 self.make_ordered_set(inputs), self.make_ordered_set(outputs))
64 if isinstance(self, Parent):
65 # SolvedBlocks may use Js and may be nested in a CombinedBlock, so we need to pass them down to any parent
---> 66 out = self.M @ self._impulse_nonlinear(self.M.inv @ ss, self.M.inv @ inputs, self.M.inv @ actual_outputs, internals, Js, options, self.M.inv @ ss_initial, **own_options)
67 elif hasattr(self, 'internals'):
68 out = self.M @ self._impulse_nonlinear(self.M.inv @ ss, self.M.inv @ inputs, self.M.inv @ actual_outputs, self.internals_to_report(internals), self.M.inv @ ss_initial, **own_options)
File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\combined_block.py:75, in CombinedBlock._impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial)
70 input_args = {k: v for k, v in impulses.items() if k in block.inputs}
72 if input_args or ss_initial is not None:
73 # If this block is actually perturbed, or we start from different initial ss
74 # TODO: be more selective about ss_initial here - did any inputs change that matter for this one block?
---> 75 impulses.update(block.impulse_nonlinear(ss, input_args, outputs & block.outputs, internals, Js, options, ss_initial))
77 return ImpulseDict({k: impulses.toplevel[k] for k in original_outputs if k in impulses.toplevel}, impulses.internals, impulses.T)
File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:60, in Block.impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial, **kwargs)
57 """Calculate a partial equilibrium, non-linear impulse response of `outputs` to a set of shocks in `inputs`
58 around a steady state `ss`."""
59 own_options = self.get_options(options, kwargs, 'impulse_nonlinear')
---> 60 inputs = ImpulseDict(inputs)
61 actual_outputs, inputs_as_outputs = self.process_outputs(ss,
62 self.make_ordered_set(inputs), self.make_ordered_set(outputs))
64 if isinstance(self, Parent):
65 # SolvedBlocks may use Js and may be nested in a CombinedBlock, so we need to pass them down to any parent
File D:\Projects\venv\lib\site-packages\sequence_jacobian\classes\impulse_dict.py:22, in ImpulseDict.__init__(self, data, internals, T)
20 raise ValueError('ImpulseDicts are initialized with a `dict` of top-level impulse responses.')
21 super().__init__(data, internals)
---> 22 self.T = (T if T is not None else self.infer_length())
File D:\Projects\venv\lib\site-packages\sequence_jacobian\classes\impulse_dict.py:100, in ImpulseDict.infer_length(self)
98 def infer_length(self):
99 lengths = [len(v) for v in self.toplevel.values()]
--> 100 length = max(lengths)
101 if length != min(lengths):
102 raise ValueError(f'Building ImpulseDict with inconsistent lengths {max(lengths)} and {min(lengths)}')
ValueError: max() arg is an empty sequence
I believe the problem is precisely in one of the simple blocks I added, for example the "betas" block. What I did to overcome this problem was to create a "steady state model" with the "betas" block, and then run the "neoclassical" model without it, similar to what is on the "two asset" notebook, and it worked.
neoclassic_ss = create_model([*hh_list, firm, fiscal, aggregate, betas, mkt_clearing, macro_ratios], name="Neoclassical SS")
neoclassic = create_model([*hh_list, firm, fiscal, aggregate, mkt_clearing, macro_ratios], name="Neoclassical")
ss_1 = neoclassic_ss.solve_steady_state(calibration, unknowns_ss, targets_ss, solver='broyden_custom',
ttol=1e-5, solver_kwargs={'maxcount': 10000})
ss_cal = neoclassic.steady_state(ss_1)
However, for the purpose of my learning process, I would like to know and understand why the nonlinear impulse responses work without the 'ss_initial' argument when I run the code above, and with that, it doesn't.
Hi everyone.
I'm attempting to apply a permanent shock in a Neoclassical Model with heterogeneous labor supply and variation across discount factors and permanent abilities.
The model functions perfectly for a transitory fiscal shock. However, when it comes to the permanent fiscal shock, I encounter an issue. After calculating the new steady state following the shock, which I've termed "_ssterminal," I use the following code:
I consistently receive the following error: ValueError: max() arg is an empty sequence. It seems that the code fails to recognize any inputs when applying the _ssinitial option in the _CombinedBlock.impulsenonlinear function:
To investigate whether this issue is related to changes in ss_terminal, I attempted to use the same code but with the same steady state:
However, the error persists.
I verified this code with the option '_ssinitial' in a model lacking heterogeneity across discount factors and permanent abilities, and it worked flawlessly. To incorporate these factors, I created nb (number of betas) * na (number of abilities) het_blocks, then aggregated them all with respect to their weights (that I know).
What puzzles me is that everything has functioned smoothly thus far for transitory shocks. When I apply the code without the option 'ss_initial', it works fine. The problem only arises when I use this option, which is necessary for the permanent shock.
Do you have an idea where the problem might be?
Thank you Valter Nóbrega