Open dustinvtran opened 6 years ago
This implementation of the graded response model seems to be operational - feel free to adapt this if it is helpful: https://colab.research.google.com/drive/1ZAmkuQF2XoV0Jy9_EJtGTJrD3Tn4UlAJ
"""
Probabilities for a single person
discriminations I (item)
difficulties I x K-1
abilities N
returns
prob N x I x K
"""
@tf.function
def grm_model_prob(abilities, discriminations, difficulties, expanded=False):
offsets = difficulties[tf.newaxis,:,:] - abilities[:, tf.newaxis, tf.newaxis]
scaled = offsets*discriminations[tf.newaxis, :, tf.newaxis]
logits = 1.0/(1+tf.exp(scaled))
logits = tf.pad(logits, paddings=(
(0, 0), (0, 0), (1, 0)), mode='constant', constant_values=1.)
logits = tf.pad(logits, paddings=(
(0, 0), (0, 0), (0, 1)), mode='constant', constant_values=0.)
probs = logits[:, :, :-1] - logits[:, :, 1:]
return probs
"""
Probabilities for single items
discriminations I (item)
difficulties I x K - 1
abilities N
mu (difficulty local) I
tau (difficulty) I x K-2
"""
@tf.function
def joint_log_prob(responses, discriminations, difficulties0, ddifficulties, abilities, mu):
# assemble the difficulties
d0 = tf.concat([difficulties0[:,tf.newaxis],ddifficulties],axis=1)
difficulties = tf.cumsum(d0,axis=1)
return tf.reduce_sum(log_likelihood(responses, discriminations, difficulties, abilities)) + \
joint_log_prior(discriminations, difficulties0, ddifficulties, abilities, mu)
@tf.function
def log_likelihood(responses, discriminations, difficulties, abilities):
    """Per-response log-likelihood under the graded response model."""
    category_probs = grm_model_prob(abilities, discriminations, difficulties)
    response_dist = tfd.Categorical(category_probs)
    return response_dist.log_prob(responses)
@tf.function
def joint_log_prior(discriminations, difficulties0, ddifficulties, abilities, mu):
    """Log-prior over all GRM parameters.

    Priors:
        discriminations ~ HalfNormal(1)       (positive)
        difficulties0   ~ Normal(mu, 1)
        ddifficulties   ~ HalfNormal(1)       (positive gaps keep thresholds ordered)
        abilities       ~ Normal(0, 1)
        mu              ~ Normal(0, 1)

    Returns:
        Scalar tensor: sum of all prior log-densities.
    """
    rv_discriminations = tfd.HalfNormal(scale=tf.ones_like(discriminations))
    rv_difficulties0 = tfd.Normal(loc=mu, scale=1. * tf.ones_like(difficulties0))
    rv_ddifficulties = tfd.HalfNormal(scale=tf.ones_like(ddifficulties))
    rv_abilities = tfd.Normal(loc=tf.zeros_like(abilities), scale=1.)
    rv_mu = tfd.Normal(loc=tf.zeros_like(mu), scale=1.)
    # BUG FIX: rv_mu was constructed but its log_prob was never added, so the
    # hyperprior on mu was silently dropped from the joint density.
    return tf.reduce_sum(rv_discriminations.log_prob(discriminations)) + \
        tf.reduce_sum(rv_difficulties0.log_prob(difficulties0)) + \
        tf.reduce_sum(rv_ddifficulties.log_prob(ddifficulties)) + \
        tf.reduce_sum(rv_abilities.log_prob(abilities)) + \
        tf.reduce_sum(rv_mu.log_prob(mu))
# Random starting values. NOTE(review): I (items), K (categories), and
# N (persons) are assumed to be defined earlier in the file — confirm.
# Sorting each row gives ordered thresholds, matching the GRM's requirement.
difficulties0 = np.sort(np.random.normal(size=(I, K-1)), axis=1)
abilities0 = np.random.normal(size=N)
# Set the chain's start state.
initial_chain_state = [
tf.ones((I), name='init_discriminations'),
# First threshold per item (the model samples increments separately).
tf.cast(difficulties0[:,0],tf.float32, name='init_difficulties0'), # may be causing problems
# Gaps between successive thresholds; positive because rows were sorted.
tf.cast(difficulties0[:,1:]-difficulties0[:,:-1],tf.float32, name='init_ddifficulties'),
tf.cast(abilities0,tf.float32, name='init_abilities'),
tf.zeros((I),name='init_mu')
]
# MCMC proposes in unconstrained space; each bijector's forward map sends the
# unconstrained sample to the parameter's constrained support.
# Order matches initial_chain_state:
# [discriminations, difficulties0, ddifficulties, abilities, mu].
unconstraining_bijectors = [
tfp.bijectors.Softplus(), # forward maps R to R^+ (discriminations > 0)
tfp.bijectors.Identity(), # Maps R to R.
tfp.bijectors.Softplus(), # R to R^+ (positive gaps keep thresholds ordered)
tfp.bijectors.Identity(), # Maps R to R.
tfp.bijectors.Identity() # Maps R to R.
]
# Define a closure over our joint_log_prob: bind the observed responses so the
# sampler sees a function of the parameters only.
unnormalized_posterior_log_prob = tf.function(lambda *args: joint_log_prob(data, *args))
# Sampler settings. (Step size is adapted automatically during burn-in.)
number_of_steps=10000
burnin=6000
num_leapfrog_steps=4
# HMC in unconstrained space, mapped back through the bijectors above.
hmc=tfp.mcmc.TransformedTransitionKernel(
inner_kernel=tfp.mcmc.HamiltonianMonteCarlo(
target_log_prob_fn=unnormalized_posterior_log_prob,
num_leapfrog_steps=num_leapfrog_steps,
step_size=0.1,
state_gradients_are_stopped=True),
bijector=unconstraining_bijectors)
# NOTE(review): this random-walk Metropolis kernel is built but never used by
# the visible code (the chain below wraps `hmc`) — likely left in for
# experimentation; confirm before removing.
mh = tfp.mcmc.TransformedTransitionKernel(
inner_kernel=tfp.mcmc.RandomWalkMetropolis(
target_log_prob_fn=unnormalized_posterior_log_prob),
bijector=unconstraining_bijectors)
# Adapt HMC's step size during the first 80% of burn-in.
kernel = tfp.mcmc.SimpleStepSizeAdaptation(
inner_kernel=hmc, num_adaptation_steps=int(burnin * 0.8))
def trace_everything(states, previous_kernel_results):
    """trace_fn for sample_chain: keep the full kernel results at every step."""
    del states  # chain states are returned separately by sample_chain
    return previous_kernel_results
# Run the chain: returns one tensor of samples per state part, in the same
# order as initial_chain_state, plus the traced kernel results.
[discriminations,difficulties0,ddifficulties,abilities,mu], kernel_results = tfp.mcmc.sample_chain(
num_results=number_of_steps,
num_burnin_steps=burnin,
current_state=initial_chain_state,
trace_fn=trace_everything,
kernel=kernel)
# Materialize the sampled tensors as numpy arrays on the GPU.
# NOTE(review): `evaluate` is not defined in this snippet — presumably the
# TFP-tutorial helper that runs tensors eagerly / in a session; confirm.
with tf.device('/device:GPU:0'):
[
discriminations_,
difficulties0_,
ddifficulties_,
abilities_,mu_,
kernel_results_
] = evaluate(
[discriminations,
difficulties0,
ddifficulties,
abilities,
mu,
kernel_results
])
TODO: variational inference — Stan's black-box variational inference (ADVI) works well for this model.
See also