"""
Block-Level Knowledge Transfer DE (BLKT-DE)
This module implements BLKT-DE for multi-task optimization using block-level
decision variable decomposition with k-means clustering for cross-task
knowledge transfer.
References
----------
[1] Jiang, Yi, et al. "Block-Level Knowledge Transfer for Evolutionary
Multitask Optimization." IEEE Transactions on Cybernetics, 1-14, 2023.
Notes
-----
Author: Jiangtao Shen (DDMTOLab adaptation)
Date: 2026.02.22
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from sklearn.cluster import KMeans
from ddmtolab.Methods.Algo_Methods.algo_utils import *
[docs]
class BLKT_DE:
"""
Block-Level Knowledge Transfer DE.
Decomposes decision variables into fixed-size blocks, clusters them
across all tasks and individuals using k-means, then performs DE/rand/1
within each cluster for cross-task knowledge transfer. Combined with
standard DE/rand/1/bin offspring per task.
Adaptive block size (divD) and cluster count (divK) based on per-task
improvement: full reset if all tasks stagnate, slight perturbation if
some tasks stagnate, no change if all improve.
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'False',
'knowledge_transfer': 'True',
'n': 'equal',
'max_nfes': 'equal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n=None, max_nfes=None, F=0.5, CR=0.7,
save_data=True, save_path='./Data', name='BLKT-DE',
disable_tqdm=True):
"""
Initialize BLKT-DE algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n : int, optional
Population size per task (default: 100)
max_nfes : int, optional
Maximum number of function evaluations per task (default: 10000)
F : float, optional
DE mutation scale factor (default: 0.5)
CR : float, optional
DE crossover rate (default: 0.7)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'BLKT-DE')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n = n if n is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 10000
self.F = F
self.CR = CR
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the BLKT-DE algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n = self.n
max_nfes_per_task = par_list(self.max_nfes, nt)
max_nfes = self.max_nfes * nt
# Initialize population and evaluate
decs = initialization(problem, n)
objs, cons = evaluation(problem, decs)
nfes = n * nt
all_decs, all_objs, all_cons = init_history(decs, objs, cons)
# Block parameters
maxD = min(dims)
divD = np.random.randint(1, maxD + 1)
minK = 2
maxK = n // 2
divK = np.random.randint(minK, maxK + 1)
pbar = tqdm(total=max_nfes, initial=nfes, desc=f"{self.name}",
disable=self.disable_tqdm)
while nfes < max_nfes:
# === Build correspondence vectors ===
# Each entry: (task, individual, dim_start, dim_end)
corre = []
for t in range(nt):
n_blocks = int(np.ceil(dims[t] / divD))
for i in range(n):
for j in range(n_blocks):
d_start = j * divD
d_end = min((j + 1) * divD, dims[t])
corre.append((t, i, d_start, d_end))
n_total_blocks = len(corre)
# Extract block values (padded to divD)
dim_val = np.zeros((n_total_blocks, divD))
for idx in range(n_total_blocks):
t, i, ds, de = corre[idx]
dim_val[idx, :de - ds] = decs[t][i, ds:de]
# === K-means clustering ===
actual_k = max(2, min(divK, n_total_blocks))
km = KMeans(n_clusters=actual_k, n_init=1, max_iter=100)
labels = km.fit_predict(dim_val)
# Group block indices by cluster
subpops = [[] for _ in range(actual_k)]
for idx in range(n_total_blocks):
subpops[labels[idx]].append(idx)
# === Block-level DE within clusters ===
off_block_vals = {} # block index -> new block values
for k in range(actual_k):
cluster = subpops[k]
cs = len(cluster)
if cs < 4:
continue
for i_loc in range(cs):
# Select 3 distinct others from the cluster
candidates = list(range(cs))
candidates.remove(i_loc)
chosen = np.random.choice(candidates, 3, replace=False)
r1, r2, r3 = chosen
dp1 = dim_val[cluster[r1]]
dp2 = dim_val[cluster[r2]]
dp3 = dim_val[cluster[r3]]
# DE/rand/1 mutation
v = dp1 + self.F * (dp2 - dp3)
v = np.clip(v, 0, 1)
# Binomial crossover with original block
u = dim_val[cluster[i_loc]].copy()
j_rand = np.random.randint(divD)
mask = np.random.rand(divD) < self.CR
mask[j_rand] = True
u[mask] = v[mask]
off_block_vals[cluster[i_loc]] = u
# === Reassemble offspring1 (block-transferred copy) ===
offspring1_decs = [d.copy() for d in decs]
for idx, new_val in off_block_vals.items():
t, i, ds, de = corre[idx]
block_len = de - ds
offspring1_decs[t][i, ds:de] = new_val[:block_len]
# === Per-task: standard DE + combine + evaluate + select ===
succ_flags = [False] * nt
for t in range(nt):
# offspring2: standard DE/rand/1/bin
off2_decs = de_generation(decs[t], self.F, self.CR)
# Combine offspring1[t] and offspring2, randomly sample n
combined_off = np.vstack([off2_decs, offspring1_decs[t]])
perm = np.random.permutation(combined_off.shape[0])[:n]
off_decs_t = np.clip(combined_off[perm], 0, 1)
# Evaluate offspring
off_objs_t = np.full_like(objs[t], np.inf)
off_cons_t = np.zeros_like(cons[t])
for i in range(n):
off_objs_t[i:i + 1], off_cons_t[i:i + 1] = \
evaluation_single(problem, off_decs_t[i], t)
nfes += n
pbar.update(n)
# Track best before selection
best_before = np.min(objs[t])
# Elitist selection (parents + offspring)
merged_decs = np.vstack([decs[t], off_decs_t])
merged_objs = np.vstack([objs[t], off_objs_t])
merged_cons = np.vstack([cons[t], off_cons_t])
sel = selection_elit(objs=merged_objs, n=n, cons=merged_cons)
decs[t] = merged_decs[sel]
objs[t] = merged_objs[sel]
cons[t] = merged_cons[sel]
# Check improvement
if np.min(objs[t]) < best_before:
succ_flags[t] = True
# Record history
append_history(all_decs, decs, all_objs, objs, all_cons, cons)
# === Adaptive block parameters ===
if all(not f for f in succ_flags):
# All tasks stagnated: full random reset
divD = np.random.randint(1, maxD + 1)
divK = np.random.randint(minK, maxK + 1)
elif any(not f for f in succ_flags):
# Some tasks stagnated: slight perturbation (±1)
divD = min(maxD, max(1, np.random.randint(
max(1, divD - 1), min(maxD, divD + 1) + 1)))
divK = min(maxK, max(minK, np.random.randint(
max(minK, divK - 1), min(maxK, divK + 1) + 1)))
pbar.close()
runtime = time.time() - start_time
results = build_save_results(
all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=max_nfes_per_task, all_cons=all_cons,
bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results