У вас классическая задача слепого выравнивания (blind alignment) в условиях жестких помех. Стандартные методы (обычная корреляция или разность квадратов) здесь не работают, потому что они опираются на энергию сигнала (амплитуду), а у вас энергия искажена зашкалом и разбросом масштабов.
Вам нужно опираться на структуру (фазу). Для сигналов типа

самой надежной характеристикой, которая сохраняется при любом усилении и клиппинге, являются пересечения нуля (Zero Crossings) и фронты.
Вот надежное решение, которое работает за линейное время

и решает ваши проблемы.
Стратегия решения
Вместо сравнения «всех со всеми» (

), мы используем итеративный подход Master-Slave:
Берем первый сигнал как черновой «Эталон».
Каждый новый сигнал выравниваем относительно Эталона.
Находим его масштабный коэффициент (нормируем).
Вливаем его в Эталон (накапливаем среднее/медиану), очищая его от шума.
Шаг 1. Грубое выравнивание (Sign Correlation)
Как найти сдвиг, если амплитуды отличаются в 100 раз, а верхушки срезаны? Используйте знаковую корреляцию. Превратите оба сигнала в последовательность +1 и -1:
![$$ B[i] = \text{sign}(Signal[i]) $$ $$ B[i] = \text{sign}(Signal[i]) $$](https://dxdy-04.korotkov.co.uk/f/f/c/4/fc4e72a8120885fe4ad279ee2be66b6982.png)
Почему это работает: Функция sign полностью игнорирует амплитуду и клиппинг (если сигнал был положительным, он останется +1 хоть при зашкале, хоть без).
Корреляция знаковых масок correlate(sign(Ref), sign(New)) даст четкий треугольный пик в месте совпадения фаз (нулей). Это работает даже на очень зашумленных данных.
Шаг 2. Точная подгонка и Нормировка (Masked Least Squares)
После того как сдвиг найден, нужно найти коэффициент k, чтобы New≈k⋅Ref. Обычное деление средних здесь даст ошибку. Используйте Маскированный МНК. Мы ищем k, минимизируя ошибку только на валидных точках (где нет зашкала ни у эталона, ни у нового сигнала).
Формула (аналитическая):
![$$ k = \frac{\sum (Ref[i] \cdot New[i] \cdot Mask[i])}{\sum (New[i]^2 \cdot Mask[i])} $$ $$ k = \frac{\sum (Ref[i] \cdot New[i] \cdot Mask[i])}{\sum (New[i]^2 \cdot Mask[i])} $$](https://dxdy-03.korotkov.co.uk/f/a/1/9/a19a27e7fada1abbcfab62cae8e3f67b82.png)
Где Mask[i]=1, только если обе точки «живые», и 0, если хоть одна в зашкале.
Шаг 3. Финальная очистка (SVD)
Вы хотите построить «линейную комбинацию». Математически идеальный способ выделить общую форму из кучи сигналов с разной амплитудой — это SVD (Сингулярное разложение).
Соберите все выровненные сигналы в матрицу.
Сделайте SVD.
Первая компонента (первый собственный вектор) будет самой чистой формой вашего сигнала, очищенной от шума статистически оптимальным способом.
Пример реализации (Python)
Этот код реализует описанную логику. Он устойчив к зашкалам и разбросу амплитуд.
import numpy as np
from scipy.signal import correlate
from sklearn.decomposition import TruncatedSVD
def robust_align_and_clean(signals, masks):
"""
signals: список numpy массивов (N,)
masks: список булевых масок (True, если точка В ЗАШКАЛЕ/битая)
"""
# 1. Инициализация: выбираем лучший первый эталон (где меньше всего зашкалов)
# Оцениваем "здоровье" сигналов в начале (первые 400 точек)
roi_len = 400
health_scores = [np.sum(~m[:roi_len]) for m in masks]
best_start_idx = np.argmax(health_scores)
# Master - наш накапливаемый эталон
master_sig = signals[best_start_idx].copy()
aligned_stack = [master_sig]
# Чтобы не накапливать ошибки, следующий поиск делаем по 'master_sig',
# который будет обновляться (становиться чище).
for i in range(len(signals)):
if i == best_start_idx: continue
curr_sig = signals[i]
curr_bad_mask = masks[i] # True если bad
master_bad_mask = masks[best_start_idx] # (Упрощение: маска эталона статична, или обновлять её тоже)
# --- ЭТАП 1: ПОИСК СДВИГА (SIGN CORRELATION) ---
# Работаем только с "головой" сигнала, хвост с шумом только мешает
roi = slice(0, roi_len)
# Знаковая корреляция - устойчива к амплитуде 100x и клиппингу
# sign возвращает -1, 0, 1.
ref_sign = np.sign(master_sig[roi])
curr_sign = np.sign(curr_sig[roi])
# Кросс-корреляция
xcorr = correlate(ref_sign, curr_sign, mode='full')
shift_idx = np.argmax(xcorr)
lag = shift_idx - (len(curr_sign) - 1)
# --- ЭТАП 2: ВЫРАВНИВАНИЕ ---
# Сдвигаем сигнал и маску
# (Используем roll для простоты, в продакшене лучше паддинг нулями)
aligned_sig = np.roll(curr_sig, lag)
aligned_bad_mask = np.roll(curr_bad_mask, lag)
# Обработка краев после сдвига (забиваем мусором/маской)
if lag > 0:
aligned_bad_mask[:lag] = True
aligned_sig[:lag] = 0
elif lag < 0:
aligned_bad_mask[lag:] = True
aligned_sig[lag:] = 0
# --- ЭТАП 3: ПОДГОНКА АМПЛИТУДЫ (MASKED LS) ---
# Считаем коэффициент k только по ВАЛИДНЫМ пересечениям
# Точка валидна, если она не в зашкале НИ У КОГО
# Маска "где данные хорошие у обоих"
# master_bad_mask можно накапливать, но для простоты берем от стартового
valid_idx = (~master_bad_mask) & (~aligned_bad_mask)
# Ограничимся ROI для подсчета амплитуды (хвост слишком шумит)
valid_idx[roi_len:] = False
if np.sum(valid_idx) < 10:
# Слишком мало общих точек, пропускаем сигнал
continue
y_ref = master_sig[valid_idx]
x_new = aligned_sig[valid_idx]
# k = dot(ref, new) / dot(new, new)
denom = np.dot(x_new, x_new)
if denom < 1e-9: k = 1.0
else: k = np.dot(y_ref, x_new) / denom
# Нормируем сигнал к эталону
final_sig = aligned_sig * k
aligned_stack.append(final_sig)
# --- ЭТАП 4: ОБНОВЛЕНИЕ ЭТАЛОНА ---
# Простое скользящее среднее: master становится чуть чище
# (Можно делать weighted average, но mean тоже ок)
master_sig = (master_sig * i + final_sig) / (i + 1)
# --- ФИНАЛ: SVD для идеальной формы ---
data_matrix = np.array(aligned_stack)
# Обрежем матрицу до ROI или чуть больше, если хвост не нужен
# Или используем SVD на всей длине
# TruncatedSVD найдет главную компоненту (форму)
svd = TruncatedSVD(n_components=1)
svd.fit(data_matrix)
clean_signal = svd.components_[0]
# SVD может перевернуть знак, проверим
if np.sign(clean_signal[np.argmax(np.abs(clean_signal))]) != np.sign(master_sig[np.argmax(np.abs(master_sig))]):
clean_signal *= -1
return clean_signal
Почему это сработает в вашем случае?
np.sign() вместо Фурье: Вы переходите в пространство фазы. Это убивает разницу амплитуд и делает зашкал невидимым для алгоритма поиска сдвига.
Маскированный МНК: Вычисляя коэффициент масштаба k, вы математически исключаете «плохие» точки из уравнения. Это дает точную амплитуду даже если 30% пика срезано.
SVD: Это лучший математический способ найти «общий знаменатель» для набора зашумленных векторов.
Совет: Если сигналы приходят потоком и памяти мало, вместо SVD в конце можно просто накапливать медиану (np.nanmedian) по столбцам стека. Медиана работает хуже SVD по шуму, но она абсолютно неубиваема одиночными грубыми выбросами (если один сигнал выровнялся неправильно, медиана его проигнорирует, а среднее/SVD немного смажется).
Я про эту область вообще ничего не знаю, поэтому оценить осмысленность не могу.
может ли такая железка, при правильном использовании обгонять по производительности обычного математика-программиста
Смотря в чём. В численном интегрировании - легко
В генерации кода - очень вряд ли.
Тут еще вопрос, что у Вас за странная железка с памятью 10 GPU и ядрами одного.