728x90
오토인코더(Autoencoder)란
입력 데이터를 압축했다가 다시 복원하는 신경망 구조입니다.
즉,
입력 → 잠재공간(압축) → 출력(복원)
그리고 복원오차(reconstruction error) 를 통해 이상 여부를 판단합니다
이상 탐지 흐름
- 정상 데이터만을 이용해 LSTM Autoencoder를 학습시킵니다.
- 모델은 정상 패턴만 학습하게 됩니다.
- 이후 어떤 입력을 넣고 복원했을 때,
- 오차가 작으면 정상
- 오차가 크면 이상 으로 판단합니다.
- Derivative (기울기, 변화량) 기반 탐지
- 개념: 값 자체가 아니라, 변화량에 이상 탐지 적용
- 활용 예시:
- 5초 내에 온도가 10도 이상 급등 → 이상
- 전류가 1초에 3A 이상 급격히 상승
import numpy as np
import matplotlib.pyplot as plt
# 예시 시계열: 100 이후 급변
data = np.concatenate([
np.linspace(0, 1, 100),
np.linspace(1, 10, 20),
np.linspace(10, 11, 80)
])
# 변화량(기울기) 계산
gradient = np.diff(data)
threshold = 0.1 # 기울기 임계값
# 이상 지점 감지
anomalies = np.where(np.abs(gradient) > threshold)[0]
# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.scatter(anomalies, data[anomalies], color='red', label="Anomalies")
plt.title("Derivative-based Anomaly Detection")
plt.legend()
plt.grid(True)
plt.show()
변화량(기울기)을 감시하면 정상 범위 안의 급격한 변화도 잡을 수 있음
- Moving Average + Change Point Detection
- 기법: 일정 구간의 평균/분산과 현재 값을 비교 → 급변 시점 감지
- 도구: ruptures, changefinder, Bayesian Change Point
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt
# 시계열 신호 생성
signal = np.concatenate([
np.random.normal(1, 0.1, 100),
np.random.normal(5, 0.1, 100),
np.random.normal(3, 0.1, 100)
])
# 모델 및 알고리즘 선택
model = "rbf" # 비선형 변화 감지
algo = rpt.Pelt(model=model).fit(signal)
result = algo.predict(pen=10) # pen 값으로 민감도 조절
# 시각화
rpt.display(signal, result)
plt.title("Ruptures - Change Point Detection")
plt.show()
이동 평균(Moving Average) 기반의 간단한 이상치 탐지(Anomaly Detection)
import numpy as np
import matplotlib.pyplot as plt
# 예시 시계열
data = np.concatenate([
np.random.normal(0, 0.1, 100),
np.random.normal(3, 0.1, 100)
])
# 이동 평균
window = 10
moving_avg = np.convolve(data, np.ones(window)/window, mode='same')
# 이동 평균 기준 변화량 계산
diff = np.abs(data - moving_avg)
threshold = 0.5 # 변화량 기준
# 이상 지점
anomalies = np.where(diff > threshold)[0]
# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(moving_avg, label="Moving Average", linestyle='--')
plt.scatter(anomalies, data[anomalies], color='red', label="Anomalies")
plt.title("Moving Average + Change Detection")
plt.legend()
plt.grid(True)
plt.show()
ChangeFinder 알고리즘을 사용한 이상 탐지 / 변화 탐지
ruptures나 moving average 방식과는 다르게, 온라인(실시간) 환경에 적합하고, 통계 기반으로 설계된 알고리즘
import changefinder
import numpy as np
import matplotlib.pyplot as plt
# 시계열 데이터 생성
data = np.concatenate([
np.random.normal(1, 0.1, 100),
np.random.normal(5, 0.1, 100),
np.random.normal(3, 0.1, 100)
])
# ChangeFinder 초기화
cf = changefinder.ChangeFinder(r=0.01, order=1, smooth=5)
# 점수 계산
scores = [cf.update(d) for d in data]
# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(scores, label="Anomaly Score")
plt.title("ChangeFinder - Anomaly Detection")
plt.legend()
plt.show()
Bayesian Online Change Point Detection (BOCPD) 알고리즘을 사용하여 시계열 데이터에서 변화점(분포가 바뀌는 지점) 을 실시간으로 탐지
BOCPD는 변화가 일어날 확률을 베이지안적으로 추론하며, 새로운 관측마다 변화 가능성을 업데이트하여 실시간으로 변화점을 탐지하는 강력한 알고리즘(ARIMA ⇒ AR기반으로 탐지한다.)
%matplotlib inline
import numpy as np
np.random.seed(555)
import matplotlib.pyplot as plt
import bocd
test_signal = np.concatenate(
[np.random.normal(0.7, 0.05, 300),
np.random.normal(1.5, 0.05, 300),
np.random.normal(0.6, 0.05, 300),
np.random.normal(1.3, 0.05, 300)])
plt.plot(test_signal)
bc = bocd.BayesianOnlineChangePointDetection(bocd.ConstantHazard(300), bocd.StudentT(mu=0, kappa=1, alpha=1, beta=1))
rt_mle = np.empty(test_signal.shape)
for i, d in enumerate(test_signal):
bc.update(d)
rt_mle[i] = bc.rt
plt.plot(test_signal, alpha=0.5, label="observation")
index_changes = np.where(np.diff(rt_mle)<0)[0]
plt.scatter(index_changes, test_signal[index_changes], c='green', label="change point")
Variance-based Rule
- Variance-based Rule은 일정한 기간(슬라이딩 윈도우 또는 구간) 동안의 분산(혹은 표준편차) 을 계산하고,
- 그 분산이 사전에 설정한 임계값(threshold)을 초과하면,“여기서 뭔가 변화가 생겼다(또는 이상하다)”
- 라고 판단하는 방식입니다.
왜 분산을 기준으로 삼는가?
- 정상 구간은 일반적으로 분산이 작고 일정합니다.
- 이상 구간은 평균이 바뀌거나, 급격한 값의 진동이 생기며 분산이 커집니다.
- 따라서 분산이 갑자기 튀는 지점을 이상 또는 변화점으로 탐지할 수 있습니다.
예시 알고리즘 흐름
python
복사편집
1. 일정 길이의 윈도우를 설정 (예: 20개 샘플)
2. 각 윈도우마다 분산(variance) 또는 표준편차(std)를 계산
3. 기준 분산 또는 임계값과 비교
4. 기준보다 높으면 이상 or 변화점으로 판단
- Guassian Tail Probability정규분포(Gaussian distribution) 를 따르는 데이터에서,
- 평균으로부터 특정 거리 이상 떨어진 값이 나올 확률
import numpy as np
import matplotlib.pyplot as plt
# 정상 데이터 + 분산 증가
data = np.concatenate([
np.random.normal(10, 0.2, 100),
np.random.normal(10, 2.0, 100) # 분산이 급격히 증가
])
# 이동 분산 계산 (rolling window)
window = 20
variances = [np.var(data[i-window:i]) if i >= window else 0 for i in range(len(data))]
# 임계값 기준 이상 감지
threshold = 1.0
anomalies = np.where(np.array(variances) > threshold)[0]
# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(variances, label="Rolling Variance", color='orange')
plt.scatter(anomalies, data[anomalies], color='red', label="Variance-based Anomaly")
plt.title("Variance-based Rule Detection")
plt.legend()
plt.grid(True)
plt.show()
이 확률은 곧 "이 값이 정상 구간 밖에 있을 가능성" 이며,
일종의 이상치(outlier) 점수로 활용할 수 있어요.
- 이상치 탐지: 꼬리 확률이 매우 작으면 "이 값은 정상 분포에서 거의 안 나오는 값 → 이상"
- p-value 계산: 통계 검정에서 유의확률(p-value)을 계산할 때도 tail probability 사용
- 확률적 스코어링: 확률 기반 이상 탐지에 사용
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
x = np.linspace(-5, 5, 1000)
y = norm.pdf(x, 0, 1)
plt.plot(x, y, label='Gaussian PDF')
plt.fill_between(x, y, where=(x > 2), color='red', alpha=0.3, label='Right Tail (Z > 2)')
plt.fill_between(x, y, where=(x < -2), color='blue', alpha=0.3, label='Left Tail (Z < -2)')
plt.title("Gaussian Tail Probability")
plt.legend()
plt.grid(True)
plt.show()
DBSCAN
- DBSCAN은 "점들이 얼마나 밀집되어 있는가"를 기준으로 클러스터를 자동으로 형성하고,
- 밀도가 낮은 고립된 점들은 이상치(noise)로 간주하는 알고리즘
- DBSCAN은 밀도 기반 클러스터링 알고리즘으로, 특히 이상치 탐지와 비선형 구조 데이터 분리에 강한 특징을 가진 알고리즘
동작 원리 요약
- 모든 점에 대해 eps 거리 안의 이웃 점들을 찾음
- 이웃이 min_samples 이상이면 → Core Point로 간주 → 클러스터 생성 시작
- 이웃의 이웃들을 재귀적으로 연결하며 클러스터 확장
- 어떤 클러스터에도 속하지 않는 점은 Noise (이상치) 처리
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
# 1. 데이터 생성
np.random.seed(0)
data = np.concatenate([
np.random.normal(0, 0.5, 50),
np.random.normal(5, 0.5, 50),
np.array([10, 12, 15]) # 이상치
]).reshape(-1, 1)
# 2. DBSCAN 적용
dbscan = DBSCAN(eps=0.8, min_samples=5)
labels = dbscan.fit_predict(data)
# 3. 이상치 위치 (label == -1)
outliers = np.where(labels == -1)[0]
# 4. 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, 'bo', label="Normal")
plt.plot(outliers, data[outliers], 'ro', label="Outliers")
plt.title("DBSCAN-based Anomaly Detection")
plt.legend()
plt.grid(True)
plt.show()