카테고리 없음

62. 센서 데이터와 이상 탐지

kaseomi 2025. 3. 27. 23:58
728x90

 

오토인코더(Autoencoder)란

입력 데이터를 압축했다가 다시 복원하는 신경망 구조입니다.

즉,

입력 → 잠재공간(압축) → 출력(복원)

그리고 복원오차(reconstruction error) 를 통해 이상 여부를 판단합니다

이상 탐지 흐름

  1. 정상 데이터만을 이용해 LSTM Autoencoder를 학습시킵니다.
  2. 모델은 정상 패턴만 학습하게 됩니다.
  3. 이후 어떤 입력을 넣고 복원했을 때,
    • 오차가 작으면 정상
    • 오차가 크면 이상 으로 판단합니다.
  • Derivative (기울기, 변화량) 기반 탐지
    • 개념: 값 자체가 아니라, 변화량에 이상 탐지 적용
    • 활용 예시:
      • 5초 내에 온도가 10도 이상 급등 → 이상
      • 전류가 1초에 3A 이상 급격히 상승
import numpy as np
import matplotlib.pyplot as plt

# 예시 시계열: 100 이후 급변
data = np.concatenate([
    np.linspace(0, 1, 100),
    np.linspace(1, 10, 20),
    np.linspace(10, 11, 80)
])

# 변화량(기울기) 계산
gradient = np.diff(data)
threshold = 0.1  # 기울기 임계값

# 이상 지점 감지
anomalies = np.where(np.abs(gradient) > threshold)[0]

# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.scatter(anomalies, data[anomalies], color='red', label="Anomalies")
plt.title("Derivative-based Anomaly Detection")
plt.legend()
plt.grid(True)
plt.show()

변화량(기울기)을 감시하면 정상 범위 안의 급격한 변화도 잡을 수 있음

  • Moving Average + Change Point Detection
    • 기법: 일정 구간의 평균/분산과 현재 값을 비교 → 급변 시점 감지
    • 도구: ruptures, changefinder, Bayesian Change Point
    ruptures는 시계열 데이터에서 변화점(change point) 을 감지하는 데 사용되는 Python 라이브러리, 변화점 감지(change point detection)는 데이터의 통계적 성질(평균, 분산 등)이 변하는 시점을 찾아내는 것이 목적
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt

# 시계열 신호 생성
signal = np.concatenate([
    np.random.normal(1, 0.1, 100),
    np.random.normal(5, 0.1, 100),
    np.random.normal(3, 0.1, 100)
])

# 모델 및 알고리즘 선택
model = "rbf"  # 비선형 변화 감지
algo = rpt.Pelt(model=model).fit(signal)
result = algo.predict(pen=10)  # pen 값으로 민감도 조절

# 시각화
rpt.display(signal, result)
plt.title("Ruptures - Change Point Detection")
plt.show()

이동 평균(Moving Average) 기반의 간단한 이상치 탐지(Anomaly Detection)

import numpy as np
import matplotlib.pyplot as plt

# 예시 시계열
data = np.concatenate([
    np.random.normal(0, 0.1, 100),
    np.random.normal(3, 0.1, 100)
])

# 이동 평균
window = 10
moving_avg = np.convolve(data, np.ones(window)/window, mode='same')

# 이동 평균 기준 변화량 계산
diff = np.abs(data - moving_avg)
threshold = 0.5  # 변화량 기준

# 이상 지점
anomalies = np.where(diff > threshold)[0]

# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(moving_avg, label="Moving Average", linestyle='--')
plt.scatter(anomalies, data[anomalies], color='red', label="Anomalies")
plt.title("Moving Average + Change Detection")
plt.legend()
plt.grid(True)
plt.show()

ChangeFinder 알고리즘을 사용한 이상 탐지 / 변화 탐지

ruptures moving average 방식과는 다르게, 온라인(실시간) 환경에 적합하고, 통계 기반으로 설계된 알고리즘

import changefinder
import numpy as np
import matplotlib.pyplot as plt

# 시계열 데이터 생성
data = np.concatenate([
    np.random.normal(1, 0.1, 100),
    np.random.normal(5, 0.1, 100),
    np.random.normal(3, 0.1, 100)
])

# ChangeFinder 초기화
cf = changefinder.ChangeFinder(r=0.01, order=1, smooth=5)

# 점수 계산
scores = [cf.update(d) for d in data]

# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(scores, label="Anomaly Score")
plt.title("ChangeFinder - Anomaly Detection")
plt.legend()
plt.show()

Bayesian Online Change Point Detection (BOCPD) 알고리즘을 사용하여 시계열 데이터에서 변화점(분포가 바뀌는 지점) 을 실시간으로 탐지

BOCPD는 변화가 일어날 확률을 베이지안적으로 추론하며, 새로운 관측마다 변화 가능성을 업데이트하여 실시간으로 변화점을 탐지하는 강력한 알고리즘(ARIMA ⇒ AR기반으로 탐지한다.)

%matplotlib inline
import numpy as np
np.random.seed(555)
import matplotlib.pyplot as plt
import bocd

test_signal = np.concatenate(
    [np.random.normal(0.7, 0.05, 300), 
     np.random.normal(1.5, 0.05, 300),
     np.random.normal(0.6, 0.05, 300),
     np.random.normal(1.3, 0.05, 300)])
plt.plot(test_signal)


bc = bocd.BayesianOnlineChangePointDetection(bocd.ConstantHazard(300), bocd.StudentT(mu=0, kappa=1, alpha=1, beta=1))


rt_mle = np.empty(test_signal.shape)
for i, d in enumerate(test_signal):
    bc.update(d)
    rt_mle[i] = bc.rt 

plt.plot(test_signal, alpha=0.5, label="observation")
index_changes = np.where(np.diff(rt_mle)<0)[0]
plt.scatter(index_changes, test_signal[index_changes], c='green', label="change point")

Variance-based Rule

  • Variance-based Rule은 일정한 기간(슬라이딩 윈도우 또는 구간) 동안의 분산(혹은 표준편차) 을 계산하고,
  • 그 분산이 사전에 설정한 임계값(threshold)을 초과하면,“여기서 뭔가 변화가 생겼다(또는 이상하다)”
  • 라고 판단하는 방식입니다.

왜 분산을 기준으로 삼는가?

  • 정상 구간은 일반적으로 분산이 작고 일정합니다.
  • 이상 구간은 평균이 바뀌거나, 급격한 값의 진동이 생기며 분산이 커집니다.
  • 따라서 분산이 갑자기 튀는 지점을 이상 또는 변화점으로 탐지할 수 있습니다.

예시 알고리즘 흐름

python
복사편집
1. 일정 길이의 윈도우를 설정 (예: 20개 샘플)
2. 각 윈도우마다 분산(variance) 또는 표준편차(std)를 계산
3. 기준 분산 또는 임계값과 비교
4. 기준보다 높으면 이상 or 변화점으로 판단
  • Guassian Tail Probability정규분포(Gaussian distribution) 를 따르는 데이터에서,
  • 평균으로부터 특정 거리 이상 떨어진 값이 나올 확률
import numpy as np
import matplotlib.pyplot as plt

# 정상 데이터 + 분산 증가
data = np.concatenate([
    np.random.normal(10, 0.2, 100),
    np.random.normal(10, 2.0, 100)  # 분산이 급격히 증가
])

# 이동 분산 계산 (rolling window)
window = 20
variances = [np.var(data[i-window:i]) if i >= window else 0 for i in range(len(data))]

# 임계값 기준 이상 감지
threshold = 1.0
anomalies = np.where(np.array(variances) > threshold)[0]

# 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, label="Signal")
plt.plot(variances, label="Rolling Variance", color='orange')
plt.scatter(anomalies, data[anomalies], color='red', label="Variance-based Anomaly")
plt.title("Variance-based Rule Detection")
plt.legend()
plt.grid(True)
plt.show()

이 확률은 곧 "이 값이 정상 구간 밖에 있을 가능성" 이며,

일종의 이상치(outlier) 점수로 활용할 수 있어요.

  • 이상치 탐지: 꼬리 확률이 매우 작으면 "이 값은 정상 분포에서 거의 안 나오는 값 → 이상"
  • p-value 계산: 통계 검정에서 유의확률(p-value)을 계산할 때도 tail probability 사용
  • 확률적 스코어링: 확률 기반 이상 탐지에 사용
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

x = np.linspace(-5, 5, 1000)
y = norm.pdf(x, 0, 1)

plt.plot(x, y, label='Gaussian PDF')
plt.fill_between(x, y, where=(x > 2), color='red', alpha=0.3, label='Right Tail (Z > 2)')
plt.fill_between(x, y, where=(x < -2), color='blue', alpha=0.3, label='Left Tail (Z < -2)')
plt.title("Gaussian Tail Probability")
plt.legend()
plt.grid(True)
plt.show()

DBSCAN

  • DBSCAN은 "점들이 얼마나 밀집되어 있는가"를 기준으로 클러스터를 자동으로 형성하고,
  • 밀도가 낮은 고립된 점들은 이상치(noise)로 간주하는 알고리즘
  • DBSCAN은 밀도 기반 클러스터링 알고리즘으로, 특히 이상치 탐지 비선형 구조 데이터 분리에 강한 특징을 가진 알고리즘

동작 원리 요약

  1. 모든 점에 대해 eps 거리 안의 이웃 점들을 찾음
  2. 이웃이 min_samples 이상이면 → Core Point로 간주 → 클러스터 생성 시작
  3. 이웃의 이웃들을 재귀적으로 연결하며 클러스터 확장
  4. 어떤 클러스터에도 속하지 않는 점은 Noise (이상치) 처리
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN

# 1. 데이터 생성
np.random.seed(0)
data = np.concatenate([
    np.random.normal(0, 0.5, 50),
    np.random.normal(5, 0.5, 50),
    np.array([10, 12, 15])  # 이상치
]).reshape(-1, 1)

# 2. DBSCAN 적용
dbscan = DBSCAN(eps=0.8, min_samples=5)
labels = dbscan.fit_predict(data)

# 3. 이상치 위치 (label == -1)
outliers = np.where(labels == -1)[0]

# 4. 시각화
plt.figure(figsize=(10, 4))
plt.plot(data, 'bo', label="Normal")
plt.plot(outliers, data[outliers], 'ro', label="Outliers")
plt.title("DBSCAN-based Anomaly Detection")
plt.legend()
plt.grid(True)
plt.show()