Closed weinbe58 closed 1 year ago
should we store all the task or just store the assigned parameters? storing task seems unnecessary to me here? And how do you return this object from optimizer? I still don't see a concrete example on hybrid optimization since the last time we discussed it...
My worry is thing gets over-abstract here - you can just complete the same thing with a simple for loop with assign
, it's explicit and compatible with any external optimizer package
should we store all the task or just store the assigned parameters? storing task seems unnecessary to me here?
Yes, I think you're correct that storing the entire task is a bit overkill but storing the assigments is a necessary.
And how do you return this object from optimizer? I still don't see a concrete example on hybrid optimization since the last time we discussed it...
Perhaps @shsack can post his code in this issue?
My worry is thing gets over-abstract here - you can just complete the same thing with a simple for loop with
assign
, it's explicit and compatible with any external optimizer package
I am pretty sure that it is useful to have some object that is keeping a history of the parameters scanned by every kind of hybrid algorithm. I think this object would not be the hybrid algorithm but would be useful for organizing and executing a hybrid algorithm
Sure, here is a code example. After talking with @jon-wurtz yesterday we concluded to shift the tracking of parameters and bitstrings to the Optimizer class. That is a work in progress.
I do think it would be useful to also keep track of job ids, just to have all possible information. That can be very helpful in case anything stops the optimization loop and one wants to find out what happened and wants to restart from the last point.
from bloqade import piecewise_linear, start
from bloqade.ir.location import Square
import numpy as np
from scipy.optimize import minimize
from bloqade.builder.optimizer import Optimization
from bloqade.submission.capabilities import get_capabilities
import matplotlib.pyplot as plt
import networkx as nx
import datetime
import json
import warnings
warnings.filterwarnings('ignore')
def parameter_transform(duration_values, detuning_values):
# Transform parameter values to correct range
duration_values = np.abs(duration_values)
duration_values = (duration_values / np.sum(duration_values)) * total_time
detuning_values = det_min + (det_max - det_min) / (1 + np.exp(-detuning_values))
return duration_values, detuning_values
def are_neighbors(pos1, pos2):
dx = abs(float(pos1[0].value) - float(pos2[0].value))
dy = abs(float(pos1[1].value) - float(pos2[1].value))
return dx <= 5 and dy <= 5 and (dx + dy) > 0
def generate_and_plot_graph(mis_udg_program, bitstring=None, mis=False):
# Create a new graph
G = nx.Graph()
locations = mis_udg_program.program.register.location_list
# Maintain an ordered list of nodes
nodes = []
# Create dictionaries for colors and positions
colors = {}
pos = {}
# If bitstring is not provided, create a bitstring with all ones
if bitstring is None:
bitstring = np.ones(len(locations))
# Add nodes for all locations
for loc, bit in zip(locations, bitstring):
node = (int(loc.position[0].value), int(loc.position[1].value))
nodes.append(node)
G.add_node(node)
# Set the node color and position based on the filling
if loc.filling.value == 1:
colors[node] = 'red' if bit == 0 else 'black'
else:
colors[node] = 'gray'
pos[node] = node
# Add edges for neighboring filled locations
filled_locations = [loc for loc in locations if loc.filling.value == 1]
for loc1 in filled_locations:
for loc2 in filled_locations:
if loc1 != loc2 and are_neighbors(loc1.position, loc2.position):
G.add_edge((int(loc1.position[0].value), int(loc1.position[1].value)),
(int(loc2.position[0].value), int(loc2.position[1].value)))
# If MIS should be computed and displayed, compute it
if mis:
filled_nodes = [node for node, color in colors.items() if color != 'gray']
G_filled = G.subgraph(filled_nodes)
mis_set = nx.maximal_independent_set(G_filled)
for node in mis_set:
colors[node] = 'red'
# Get the color list in the same order as the nodes
color_list = [colors[node] for node in G.nodes]
# Draw the graph
nx.draw(G, pos, node_color=color_list, with_labels=False)
plt.show()
return nodes, G
def get_bitstrings(duration_values, detuning_values, q_hardware=False):
# Assign parameters for MIS program
kwargs_duration = {"duration_" + str(i+1): val for i, val in enumerate(duration_values)}
kwargs_detuning = {"detuning_" + str(i+1): val for i, val in enumerate(detuning_values)}
kwargs = {**kwargs_duration, **kwargs_detuning}
mis_udg_job = mis_udg_program.assign(**kwargs)
# Run on hardware or local simulator
if q_hardware == True:
hw_job = mis_udg_job.braket(num_shots).submit()
else:
hw_job = mis_udg_job.braket_local_simulator(num_shots).submit()
# TODO: Add some storage of the bitstrings
# Perhaps the best solution would be if we get a task id back
# which we can store and use later to get the result back
return np.array(hw_job.report().bitstrings[0])
def post_process_MIS(bitstrings):
post_processed_bitstrings = []
for bitstring in bitstrings:
inds = np.nonzero(bitstring==0)[0] # Find indices of IS vertices
# TODO: this should also check if the site is even occupied or not
nodes_to_check = [nodes[i] for i in inds] # Map indices to nodes
subgraph = nx.subgraph(G, nodes_to_check) # Generate a subgraph from those vertices.
inds2 = nx.maximal_independent_set(subgraph, seed=0)
post_processed_bitstring = np.array([0 if node in inds2 else 1 for node in nodes])
post_processed_bitstrings.append(post_processed_bitstring)
if len(post_processed_bitstrings) == 0:
raise ValueError("no independent sets found! increase number of shots.")
return np.asarray(post_processed_bitstrings)
def avg_MIS_size(bitstrings):
return (1-bitstrings).sum(axis=1).mean()
def cost_function(x, q_hardware=False):
duration_values = x[0:num_time_points]
detuning_values = x[num_time_points:]
duration_values, detuning_values = parameter_transform(duration_values, detuning_values)
bitstrings = get_bitstrings(duration_values, detuning_values, q_hardware)
post_processed_bitstrings = post_process_MIS(bitstrings)
# It is not clear to me how tracking of the bitstrings can be done within
# the Optimization class since the cost function will only ever return a scalar
# It could perhaps optionally return the bitstrings which are then picked up by the
# Optimizer class and stored
return -avg_MIS_size(post_processed_bitstrings)
if __name__ == "__main__":
# Probably some environment settings for harware runs
# Get parameter bounds
capabilities = get_capabilities()
det_max = capabilities.capabilities.rydberg.global_.detuning_max / 1E6
det_min = capabilities.capabilities.rydberg.global_.detuning_min / 1E6
amp_min = capabilities.capabilities.rydberg.global_.rabi_frequency_min / 1E6
amp_max = capabilities.capabilities.rydberg.global_.rabi_frequency_max / 1E6
total_time = capabilities.capabilities.rydberg.global_.time_max * 1E6
# Set hyperparameters
num_shots = 50
num_time_points = 3
# Initialize MIS program
mis_udg_program = (
Square(3, 5)
.apply_defect_density(0.4, np.random.default_rng(10))
.rydberg.rabi
.amplitude.uniform.piecewise_linear(["duration_" + str(i+1) for i in range(num_time_points)], [0.] + [amp_max] * (num_time_points-1) + [0.])
.detuning.uniform.piecewise_linear(["duration_" + str(i+1) for i in range(num_time_points)], ["detuning_" + str(i+1) for i in range(num_time_points+1)])
)
# FIXME: This is blocking until the figure is closed
nodes, G = generate_and_plot_graph(mis_udg_program)
# Initialize the parameters and optimizer
x0 = 2 * np.random.rand(2*num_time_points+1) - 1
opt_cobyla = Optimization(callback_step=1, cost_function=cost_function, method="COBYLA")
# Run the optimization
opt_cobyla.optimize(x0=x0, maxiter=50)
res = opt_cobyla.get_res()
cost_history = opt_cobyla.get_cost_history()
parameter_history = opt_cobyla.get_parameter_history()
# Generate a timestamp
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Save the results to a json file
#with open(f'test_run_{timestamp}.json', 'w') as f:
# json.dump({"res": res, "cost_history": cost_history, "parameter_history": parameter_history}, f)
My concern is having a class is overkill. You store the report anyways, why do we have to abstract this away? How is it better comparing to just declare a list? this seems killing the flexibility of using good external optimization packages without learning the deep internals. To me, if you just work on assign
it is already simple, and the following works for all known optimization packages
my_history = []
def func(xs) -> Report:
report = my_pulse.assign(x=xs[0], y=xs[1], z=xs[2]).braket().submit()
my_history.append(report) # you can always do this
return my_history
the problem here is we need to support a wide range of optimization interfaces, which I don't think there is a good abstraction without rewriting them on your own.
I do think it would be useful to also keep track of job ids, just to have all possible information. That can be very helpful in case anything stops the optimization loop and one wants to find out what happened and wants to restart from the last point.
that's just storing the report objects, which you will have to do anyways.
also, it is worth mentioning this is not the first time we are trying to provide an optimizer interface, we had a lot infras in Julia before for the exactly same task, in order to not rewrite all the optimizers from scratch we always end up in just ask for a black box function returns a cost. We can wrap optimizers in a black-box interface but that only requires you to write a separate module/package to do that or just use an existing one like the one from qiskit.
I am pretty sure that it is useful to have some object that is keeping a history of the parameters scanned by every kind of hybrid algorithm. I think this object would not be the hybrid algorithm but would be useful for organizing and executing a hybrid algorithm
many optimizer packages do this for you, or the optimization algorithm requires storing the history. Because a) you may not want to store the entire history, b) you may want to store the optimal point as history instead of everything, especially for algo like CMAES. c) packages like scipy allows you to do this with a callback already, and such interface is algo dependent. e.g if you want to use Adam with finite difference, the interface is likely a loop based one.
Yes, I think this is a general philosophical question perhaps @jon-wurtz can comment :)
from triage discussion
# durations, values
my_pulse_program.assign(d1=1,d2=...,...)
my_pulse_program.flatten(['d2', 'd1', 'd3']) -> # callable
# durations, values
my_pulse_program.assign(d1=1,d2=...,...)
my_pulse_program.flatten() -> # callable
From @jon-wurtz:
The idea for problem is to have it carry the description of the problem (such as udg vertices) and a problem.cost(solution) method. The ansatz class should be initiated by ansatz(problem, parameters) with a method anzatz.get_solution() or similar that returns valid solutions, so it includes the full hybrid algorithm. Then, optimizer gets optimizer(problem, ansatz) and has some method optimizer.optimize(), and should have a record of each step.
`class MIS_problem(Problem):
def __init__(self, graph) -> None:
self.graph = graph
def cost_function(self, ansatz, x):
duration_values = x[0:ansatz.num_time_points]
detuning_values = x[ansatz.num_time_points:]
duration_values, detuning_values = ansatz.parameter_transform(duration_values, detuning_values)
bitstrings = ansatz.get_bitstrings(duration_values, detuning_values)
post_processed_bitstrings = ansatz.post_process_MIS(bitstrings)
return -ansatz.avg_MIS_size(post_processed_bitstrings), post_processed_bitstrings`
`class MIS_ansatz(Ansatz):
def __init__(self, problem, q_hardware, num_shots, num_time_points) -> None:
self.num_time_points = num_time_points
self.num_shots = num_shots
self.graph = problem.graph
self.q_hardware = q_hardware
# Get parameter bounds
caps = get_capabilities()
self.det_max = caps.capabilities.rydberg.global_.detuning_max / 1E6
self.det_min = caps.capabilities.rydberg.global_.detuning_min / 1E6
self.amp_min = caps.capabilities.rydberg.global_.rabi_frequency_min / 1E6
self.amp_max = caps.capabilities.rydberg.global_.rabi_frequency_max / 1E6
self.total_time = caps.capabilities.rydberg.global_.time_max * 1E6
self._ansatz = self.ansatz()
def ansatz(self):
# Initialize MIS program
mis_udg_program = (
start.add_positions(self.graph)
.rydberg.rabi
.amplitude.uniform.piecewise_linear(["duration_" + str(i+1) for i in range(self.num_time_points)], [0.] + [self.amp_max] * (self.num_time_points-1) + [0.])
.detuning.uniform.piecewise_linear(["duration_" + str(i+1) for i in range(self.num_time_points)], ["detuning_" + str(i+1) for i in range(self.num_time_points+1)])
)
return mis_udg_program
def get_bitstrings(self, duration_values, detuning_values):
# Assign parameters for MIS program
kwargs_duration = {"duration_" + str(i+1): val for i, val in enumerate(duration_values)}
kwargs_detuning = {"detuning_" + str(i+1): val for i, val in enumerate(detuning_values)}
kwargs = {**kwargs_duration, **kwargs_detuning}
mis_udg_job = self._ansatz.assign(**kwargs)
# Run on hardware or local simulator
if self.q_hardware == True:
hw_job = mis_udg_job.braket(self.num_shots).submit()
else:
hw_job = mis_udg_job.braket_local_simulator(self.num_shots).submit()
return np.array(hw_job.report().bitstrings[0])
def parameter_transform(self, duration_values, detuning_values):
# Transform parameter values to correct range
duration_values = np.abs(duration_values)
duration_values = (duration_values / np.sum(duration_values)) * self.total_time
detuning_values = self.det_min + (self.det_max - self.det_min) / (1 + np.exp(-detuning_values))
return duration_values, detuning_values
def post_process_MIS(self, bitstrings):
post_processed_bitstrings = []
for bitstring in bitstrings:
inds = np.nonzero(bitstring==0)[0] # Find indices of IS vertices
# TODO: this should also check if the site is even occupied or not
# FIXME: This currently does not work
subgraph = nx.subgraph(self.graph, inds)
# nodes_to_check = [self.graph.nodes[i] for i in inds] # Map indices to nodes
subgraph = nx.subgraph(self.graph, self.graph.nodes) # Generate a subgraph from those vertices.
inds2 = nx.maximal_independent_set(subgraph, seed=0)
post_processed_bitstring = np.array([0 if node in inds2 else 1 for node in self.graph.nodes])
post_processed_bitstrings.append(post_processed_bitstring)
if len(post_processed_bitstrings) == 0:
raise ValueError("no independent sets found! increase number of shots.")
return np.asarray(post_processed_bitstrings)
def avg_MIS_size(self, bitstrings):
return (1-bitstrings).sum(axis=1).mean()
`
we can have a Callable
object generated by flatten
my_pulse_program.flatten(args_orders) -> Callable[list, Builder]
it returns the builder object if being called at this stage.
and the Callable
can be further configured lazily, so that eventually it can be used as the "cost function" directly,
my_pulse_program.flatten(args_orders) -> Callable[list, Builder]
my_pulse_program.flatten(args_orders).braket().submit().report() -> Callable[list, Builder]
and all it does is just append whatever method is being called after to the generated program.
note: I think we need another (maybe separate) triage meeting for this
my_pulse_program.flatten(args_orders).braket().submit().report()
does calling this multiple times with the same arguments create multiple jobs, or does it look up the previous call?
it submits a new task on a different parameter every time you call this callable
Let us move this discussion to #355
@jon-wurtz Proposed a structure like the following:
The general idea here is that you have an object that will execute a task store history of the parameters as well as the results for all previously executed tasks for the lifetime of the object.
I don't think we shuold restrict ourselves to just storting the bit-strings. This is similar to what I had proposed for the Optimization interface as an intermediate object that the optimization object can use to do the hybrid loop.