from tqdm import tqdm
from mosek.fusion import Domain, Expr
import numpy as np
import gc
from scipy.stats import qmc
from apem.unit_based_model.error import Error
from power_flow_relaxations.models import Jabr
from power_flow_relaxations.utils.network import partition_graph
[docs]
class QC(Jabr):
"""
Implementation of QC relaxation for ACOPF using CVXPY.
"""
[docs]
def global_qmc_envelope_bounds(self):
"""
Compute bounds on theta_v - theta_w, sin(theta_v - theta_w), cos(theta_v - theta_w)
by sampling voltage magnitudes and angles, then filtering based on thermal limits.
Returns
-------
delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max : arrays [N, N]
"""
if self.degree == 0:
raise ValueError("Degree must be at least 1 to perform sampling.")
N = len(self.nodes)
T = len(self.periods)
# Initialize output arrays
delta_theta_min = np.full((N, N, T), -np.pi / 2)
delta_theta_max = np.full((N, N, T), np.pi / 2)
sin_min = np.full((N, N, T), -1.0)
sin_max = np.full((N, N, T), 1.0)
cos_min = np.full((N, N, T), -1.0)
cos_max = np.full((N, N, T), 1.0)
# Partition the graph
components = partition_graph(self.network, 12)
for comp_id, component in enumerate(components):
print(f"[QC] Processing component {comp_id + 1}/{len(components)} with {len(component)} nodes")
nodes = [(i_v, v) for i_v, v in self.nodes if v in component]
component_index = {v: i for i, v in enumerate(component)}
neighbours = {
node: [(self.node_indices[w], w) for w in self.network[node] if w in component[node]] for node in self.network if node in component
}
size = len(component)
sobol = qmc.Sobol(d=2 * size, scramble=True, seed=self.qmc_seed + comp_id).random_base2(m=self.degree)
V_samples = np.full((2 ** self.degree, size, T), 1.0)
theta_samples = np.full((2 ** self.degree, size, T), 0.0)
for i_v, v in nodes:
V_samples[:, component_index[v], :] = self.V_min[i_v] + (self.V_max[i_v] - self.V_min[i_v]) * sobol[:, 2 * component_index[v], None]
if i_v == self.reference_bus[0]:
theta_samples[:, component_index[v], :] = 0.0
else:
theta_samples[:, component_index[v], :] = np.pi * (2 * sobol[:, 2 * component_index[v] + 1, None] - 1)
del sobol
for i_v, v in nodes:
for i_w, w in neighbours[v]:
feasible = np.zeros((2 ** self.degree, T), dtype=bool)
G_vw = self.G[i_v, i_w]
B_vw = self.B[i_v, i_w]
S_max_vw = self.S_max[i_v, i_w]
Vv_samples = V_samples[:, component_index[v], :]
Vw_samples = V_samples[:, component_index[w], :]
thetav_samples = theta_samples[:, component_index[v], :]
thetaw_samples = theta_samples[:, component_index[w], :]
delta_theta = thetav_samples - thetaw_samples
delta_theta = np.arctan(np.sin(delta_theta) / np.cos(delta_theta))
del thetav_samples, thetaw_samples
cos_dt = np.cos(delta_theta)
sin_dt = np.sin(delta_theta)
p_vw = G_vw * Vv_samples**2 - Vv_samples * Vw_samples * (G_vw * cos_dt + B_vw * sin_dt)
q_vw = -B_vw * Vv_samples**2 - Vv_samples * Vw_samples * (-B_vw * cos_dt + G_vw * sin_dt)
S_vw = np.sqrt(p_vw**2 + q_vw**2)
del p_vw, q_vw, Vv_samples, Vw_samples
feasible |= (S_vw <= S_max_vw)
del S_vw
delta_theta_f = delta_theta[feasible]
sin_f = sin_dt[feasible]
cos_f = cos_dt[feasible]
del sin_dt, cos_dt, delta_theta
delta_theta_min[i_v, i_w, :], delta_theta_max[i_v, i_w, :] = np.min(delta_theta_f), np.max(delta_theta_f)
sin_min[i_v, i_w, :], sin_max[i_v, i_w, :] = np.min(sin_f), np.max(sin_f)
cos_min[i_v, i_w, :], cos_max[i_v, i_w, :] = np.min(cos_f), np.max(cos_f)
del V_samples, theta_samples, nodes, component_index, neighbours
gc.collect()
gc.collect()
return delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max
[docs]
def local_qmc_envelope_bounds(self, jabr_allocation, eps_V, eps_theta):
"""
Compute per-edge bounds on theta_v - theta_w, sin(theta_v - theta_w), cos(theta_v - theta_w)
by sampling around the Jabr solution with Sobol sequences, filtering based on thermal limits.
Adaptive theta perturbation is applied per edge.
"""
if self.degree == 0:
raise ValueError("Degree must be at least 1 to perform sampling.")
V_vt_jabr = jabr_allocation.V_vt # Dict[(node, period), (Vd, Vq)]
N = len(self.nodes)
T = len(self.periods)
# Extract voltage magnitudes and angles from Jabr solution
V_jabr = np.zeros((N, T))
theta_jabr = np.zeros((N, T))
for t_idx, period in self.periods:
for i_v, v in self.nodes:
Vd_vt, Vq_vt = V_vt_jabr[v, period]
V_jabr[i_v, t_idx] = np.sqrt(Vd_vt**2 + Vq_vt**2)
theta_jabr[i_v, t_idx] = np.arctan2(Vq_vt, Vd_vt)
print(f"[QC] Performing local sampling around Jabr solution (eps_V={eps_V}, eps_θ={eps_theta})")
# Initialize output arrays
delta_theta_min = np.full((N, N, T), -np.pi / 2)
delta_theta_max = np.full((N, N, T), np.pi / 2)
sin_min = np.full((N, N, T), -1.0)
sin_max = np.full((N, N, T), 1.0)
cos_min = np.full((N, N, T), -1.0)
cos_max = np.full((N, N, T), 1.0)
for t, period in self.periods:
components = partition_graph(self.network, 10, min_size=8, max_size=16)
for comp_id, component in enumerate(components):
nodes = [(i_v, v) for i_v, v in self.nodes if v in component]
component_index = {v: i for i, v in enumerate(component)}
neighbours = {
node: [(self.node_indices[w], w) for w in self.network[node] if w in component[node]] for node in self.network if node in component
}
size = len(component)
sobol = qmc.Sobol(d=2 * size, scramble=True, seed=self.qmc_seed + comp_id).random_base2(m=self.degree)
V_samples = np.full((2 ** self.degree, size, T), 1.0)
theta_samples = np.full((2 ** self.degree, size, T), 0.0)
for i_v, v in nodes:
if i_v == self.reference_bus[0]:
V_samples[:, component_index[v], t] = 1.0
theta_samples[:, component_index[v], t] = 0.0
else:
V_samples[:, component_index[v], t] = V_jabr[i_v, t] + eps_V * (self.V_max[i_v] - self.V_min[i_v]) / 2 * sobol[:, 2 * component_index[v]]
theta_samples[:, component_index[v], t] = theta_jabr[i_v, t] + eps_theta * np.pi * (2 * sobol[:, 2 * component_index[v] + 1] - 1)
del sobol
for i_v, v in nodes:
for i_w, w in neighbours[v]:
feasible = np.zeros((2 ** self.degree,), dtype=bool)
G_vw = self.G[i_v, i_w]
B_vw = self.B[i_v, i_w]
S_max_vw = self.S_max[i_v, i_w]
Vv_samples = V_samples[:, component_index[v], t]
Vw_samples = V_samples[:, component_index[w], t]
thetav_samples = theta_samples[:, component_index[v], t]
thetaw_samples = theta_samples[:, component_index[w], t]
delta_theta = thetav_samples - thetaw_samples
delta_theta = np.arctan(np.sin(delta_theta) / np.cos(delta_theta))
del thetav_samples, thetaw_samples
cos_dt = np.cos(delta_theta)
sin_dt = np.sin(delta_theta)
p_vw = G_vw * Vv_samples**2 - Vv_samples * Vw_samples * (G_vw * cos_dt + B_vw * sin_dt)
q_vw = -B_vw * Vv_samples**2 - Vv_samples * Vw_samples * (-B_vw * cos_dt + G_vw * sin_dt)
S_vw = np.sqrt(p_vw**2 + q_vw**2)
del p_vw, q_vw, Vv_samples, Vw_samples
feasible |= (S_vw <= S_max_vw)
del S_vw
if not np.any(feasible):
print(f"[QC] Warning: No feasible samples found for edge ({v}, {w}) at period {period} during local sampling.")
continue
delta_theta_f = delta_theta[feasible]
sin_f = sin_dt[feasible]
cos_f = cos_dt[feasible]
del delta_theta, sin_dt, cos_dt
delta_theta_min[i_v, i_w, t], delta_theta_max[i_v, i_w, t] = np.min(delta_theta_f), np.max(delta_theta_f)
sin_min[i_v, i_w, t], sin_max[i_v, i_w, t] = np.min(sin_f), np.max(sin_f)
cos_min[i_v, i_w, t], cos_max[i_v, i_w, t] = np.min(cos_f), np.max(cos_f)
del V_samples, theta_samples, nodes, component_index, neighbours
gc.collect()
gc.collect()
return delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max
[docs]
def mccormick_envelope(self, t, x, y, xL, xU, yL, yU):
"""
Add McCormick envelope constraints for the bilinear term `t = x * y`.
Uses variable bounds `(xL, xU)` and `(yL, yU)` to add the four linear
inequalities that define the convex hull relaxation.
"""
self.model.constraint(t >= xL * y + yL * x - xL * yL)
self.model.constraint(t >= xU * y + yU * x - xU * yU)
self.model.constraint(t <= xL * y + yU * x - xL * yU)
self.model.constraint(t <= xU * y + yL * x - xU * yL)
[docs]
def square_envelope(self, t, x, xL, xU):
"""
Add convex-envelope constraints for the quadratic term `t = x^2`.
Uses an upper linear secant bound on `[xL, xU]` and a rotated-cone lower
bound to keep `t` as a convex relaxation of `x^2`.
"""
self.model.constraint(t <= (xL + xU) * x - xL * xU)
self.model.constraint(Expr.vstack([t, Expr.constTerm(0.5), x]) == Domain.inRotatedQCone())
[docs]
def sin_envelope(self, t, x, xL, xU):
"""
Add convex relaxation bounds for the nonlinear term `t = sin(x)`.
Builds tangent/chord-based bounds over `[xL, xU]` to approximate the sine
graph with linear constraints.
"""
xM = max(abs(xL), abs(xU))
self.model.constraint(t <= np.cos(xM / 2) * (x - xM / 2) + np.sin(xM / 2))
self.model.constraint(t >= np.cos(xM / 2) * (x + xM / 2) - np.sin(xM / 2))
if xL < xU and xL >= 0:
self.model.constraint(
t >= (np.sin(xL) - np.sin(xU)) / (xL - xU) * (x - xL) + np.sin(xL)
)
if xL < xU and xU <= 0:
self.model.constraint(
t <= (np.sin(xL) - np.sin(xU)) / (xL - xU) * (x - xL) + np.sin(xL)
)
[docs]
def cos_envelope(self, t, x, xL, xU):
"""
Add convex relaxation bounds for the nonlinear term `t = cos(x)`.
Uses a quadratic-cone-supported upper bound and a chord lower bound over
`[xL, xU]` to approximate the cosine graph.
"""
xM = max(abs(xL), abs(xU))
if xM != 0:
alpha = (1 - np.cos(xM)) / (xM**2)
y = self.model.variable(1, Domain.unbounded())
self.model.constraint(Expr.vstack([y, Expr.constTerm(0.5), x]) == Domain.inRotatedQCone())
self.model.constraint(t + alpha * y <= 1)
if xL < xU:
self.model.constraint(t >= (np.cos(xL) - np.cos(xU)) / (xL - xU) * (x - xL) + np.cos(xL))
def __init__(self, scenario, configuration, degree=8, seed=42, local_sampling=True, eps_V=0.1, eps_theta=0.15, custom_envelope_bounds=None, **kwargs):
super().__init__(scenario, configuration, **kwargs)
if degree < 1:
raise ValueError("Degree must be at least 1 to perform sampling.")
self.degree = degree if not hasattr(self, "degree") else self.degree
self.V_abs_vt = self.model.variable("V_abs_vt", [len(self.nodes), len(self.periods)], Domain.greaterThan(0))
self.theta_vt = self.model.variable("theta_vt", [len(self.nodes), len(self.periods)])
self.qmc_seed = seed if not hasattr(self, "qmc_seed") else self.qmc_seed
# Sampling parameters
self.local_sampling = local_sampling if not hasattr(self, "local_sampling") else self.local_sampling
self.eps_V = eps_V if not hasattr(self, "eps_V") else self.eps_V
self.eps_theta = eps_theta if not hasattr(self, "eps_theta") else self.eps_theta
self.custom_envelope_bounds = custom_envelope_bounds if not hasattr(self, "custom_envelope_bounds") else self.custom_envelope_bounds
self.jabr_kwargs = kwargs if not hasattr(self, "jabr_kwargs") else self.jabr_kwargs
self.jabr_allocation = None if not hasattr(self, "jabr_allocation") else self.jabr_allocation
[docs]
def solve(self, **kwargs):
allocation = super().solve(**kwargs)
if not self.local_sampling:
return allocation
if isinstance(allocation, Error):
if isinstance(self.jabr_allocation, (Error, None)):
return allocation
else:
return self.jabr_allocation
jabr_violations = self.jabr_allocation.compute_feasibility_violations(violations=["line"], print_summary=False)
actual_violations = allocation.compute_feasibility_violations(violations=["line"], print_summary=False)
jabr_line = jabr_violations.get("line (A)")
actual_line = actual_violations.get("line (A)")
if jabr_line is None or actual_line is None:
return allocation
if actual_line > jabr_line:
print(f"[QC] Warning: QC solution has higher line violations ({actual_violations['line (A)']:.4f} A) than Jabr ({jabr_violations['line (A)']:.4f} A). Reverting to Jabr solution.")
return self.jabr_allocation
else:
return allocation
[docs]
def power_constraints(self):
super().power_constraints()
if self.custom_envelope_bounds is not None:
delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max = self.custom_envelope_bounds
else:
if self.local_sampling:
print("[QC] Solving Jabr relaxation for local sampling...")
jabr_model = Jabr(self.scenario, self.configuration, **self.jabr_kwargs)
self.jabr_allocation = jabr_model.solve(force_integrality=True)
delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max = self.local_qmc_envelope_bounds(self.jabr_allocation, self.eps_V, self.eps_theta)
self.custom_envelope_bounds = (delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max)
else:
print("[QC] Performing global sampling across full voltage/angle ranges")
delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max = self.global_qmc_envelope_bounds()
self.custom_envelope_bounds = (delta_theta_min, delta_theta_max, sin_min, sin_max, cos_min, cos_max)
for t, _ in self.periods:
for i_v, v in self.nodes:
self.square_envelope(self.c_vwt[t][i_v, i_v],
self.V_abs_vt[i_v, t],
self.V_min[i_v],
self.V_max[i_v],
)
for i_w, _ in self.neighbours[v]:
m_vwt = self.model.variable(f"m_[{i_v}, {i_w}, {t}]", 1, Domain.greaterThan(0)) # V_abs_vt[i_v, t] * V_abs_vt[i_w, t]
self.mccormick_envelope(
m_vwt,
self.V_abs_vt[i_v, t],
self.V_abs_vt[i_w, t],
self.V_min[i_v],
self.V_max[i_v],
self.V_min[i_w],
self.V_max[i_w]
)
cos_vwt = self.model.variable(f"cos_[{i_v}, {i_w}, {t}]", 1) # cos(theta_vt[i_v, t] - theta_vt[i_w, t])
self.cos_envelope(
cos_vwt,
self.theta_vt[i_v, t] - self.theta_vt[i_w, t],
delta_theta_min[i_v, i_w, t],
delta_theta_max[i_v, i_w, t]
)
sin_vwt = self.model.variable(f"sin_[{i_v}, {i_w}, {t}]", 1) # sin(theta_vt[i_v, t] - theta_vt[i_w, t])
self.sin_envelope(
sin_vwt,
self.theta_vt[i_v, t] - self.theta_vt[i_w, t],
delta_theta_min[i_v, i_w, t],
delta_theta_max[i_v, i_w, t]
)
self.mccormick_envelope(
self.c_vwt[t][i_v, i_w],
m_vwt,
cos_vwt,
self.V_min[i_v] * self.V_min[i_w],
self.V_max[i_v] * self.V_max[i_w],
cos_min[i_v, i_w, t],
cos_max[i_v, i_w, t]
)
self.mccormick_envelope(
self.s_vwt[t][i_v, i_w],
m_vwt,
sin_vwt,
self.V_min[i_v] * self.V_min[i_w],
self.V_max[i_v] * self.V_max[i_w],
sin_min[i_v, i_w, t],
sin_max[i_v, i_w, t]
)
def __str__(self): # type: ignore
return "QC"