Виявлення скомпрометованих облікових записів з HIBP та Wazuh

Витоки даних та скомпрометовані облікові дані стали постійною загрозою у сфері кібербезпеки, розкриваючи конфіденційну інформацію, таку як імена користувачів, паролі та електронні адреси. Коли зловмисники отримують доступ до цих даних, вони можуть використовувати їх для несанкціонованого доступу, фішингових атак або крадіжки особистих даних. Ризик для бізнесу та окремих користувачів є значним, незалежно від того, чи йдеться про облікові дані, скомпрометовані через сторонні сервіси, чи про внутрішню конфіденційну інформацію. Швидке виявлення та реагування на скомпрометовані облікові записи є критично важливим для мінімізації потенційних збитків, запобігання несанкціонованому доступу та захисту конфіденційної інформації.

Wazuh, платформа безпеки з відкритим вихідним кодом, пропонує можливості для моніторингу та виявлення скомпрометованих акаунтів у режимі реального часу. Інтегруючись із зовнішніми джерелами розвідки загроз, такими як бази даних про порушення та інструменти моніторингу даркнету, Wazuh може проактивно сповіщати команди безпеки, коли виникає підозра, що обліковий запис скомпрометовано.

У цій статті розглядається, як Wazuh виявляє скомпрометовані акаунти за допомогою платформи Have I Been Pwned (HIBP). HIBP – це онлайн-платформа, яка дозволяє приватним особам і організаціям перевіряти, чи не піддавалися їхні адреси електронної пошти або паролі відомим витокам даних. Тут йдеться про те, як налаштувати автоматичне виявлення порушень та найкращі практики реагування на такі інциденти.

Інфраструктура

Для демонстрації виявлення скомпрометованих акаунтів за допомогою Wazuh використовується наступна інфраструктура:

  • Попередньо встановлена, готова до використання остання доступна версія Wazuh, яка включає центральні компоненти Wazuh (сервер Wazuh, індексатор та дешборд). Щоб завантажити та налаштувати Wazuh у віртуальному середовищі, варто дотримуватися цього посібника.
  • Ключ API Have I Been Pwned. Для цього потрібна підписка на один з доступних сервісів.

Конфігурація

Залежності Python

Примітка: Використовується Python 3.8, оскільки він вважається однією з найстабільніших версій.

1. Встановити залежності Python на сервер Wazuh за допомогою команд нижче:

# sudo amazon-linux-extras enable python3.8
# sudo yum install python3.8

2. Переконатися, що встановлена версія Python вище 3.8.

# python3.8 --version

Результат:

Python 3.8.20

3. Встановити Pip:

# curl -O https://bootstrap.pypa.io/get-pip.py
# sudo python3.8 get-pip.py

4. Встановити requests та понизити версію urllib3:

# python3.8 -m pip install requests
# python3.8 -m pip uninstall urllib3
# python3.8 -m pip install urllib3==1.26.15

Потрібно понизити версію urllib3, оскільки поточна версія вимагає OpenSSL 1.1.1+, а на сервері Wazuh встановлено стару версію OpenSSL (1.0.2). Оновлення OpenSSL може спричинити деякі проблеми сумісності для Python-скрипту.

Скрипт Python

1. Створити скрипт hibp.py у папці /home/wazuh-user/hibp.py сервера Wazuh:

import requests
import time
import json
import os
from datetime import datetime, timedelta

# Configuration
API_KEY = "<YOUR_HIBP_API_KEY>"  # Replace with your HIBP API key
EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt"  # File containing email addresses (one per line)
OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log"  # Log file path
CACHE_FILE = "hibp_cache.json"  # To store recently checked emails and avoid redundant checks
BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json"  # Cache for breach descriptions
RATE_LIMIT_DELAY = 60  # Seconds to wait between API calls to respect rate limits
CACHE_EXPIRATION_DAYS = 6  # How long to consider cached results valid

# Ensure the output log file directory exists
os.makedirs(os.path.dirname(OUTPUT_LOG_FILE), exist_ok=True)

# Load cache (if exists and valid)
if os.path.exists(CACHE_FILE):
    try:
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Cache file is empty or corrupted. Initializing empty cache.")
        cache = {}
else:
    cache = {}

# Load breach details cache (if exists and valid)
if os.path.exists(BREACH_DETAILS_CACHE_FILE):
    try:
        with open(BREACH_DETAILS_CACHE_FILE, "r") as f:
            breach_details_cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Breach details cache file is empty or corrupted. Initializing empty cache.")
        breach_details_cache = {}
else:
    breach_details_cache = {}

def save_cache():
    """Save the email check cache to a file."""
    with open(CACHE_FILE, "w") as f:
        json.dump(cache, f)

def save_breach_details_cache():
    """Save the breach details cache to a file."""
    with open(BREACH_DETAILS_CACHE_FILE, "w") as f:
        json.dump(breach_details_cache, f)

def is_recently_checked(email):
    """Check if an email has been checked recently."""
    if email in cache:
        last_checked = datetime.strptime(cache[email]["last_checked"], "%Y-%m-%dT%H:%M:%S")
        if datetime.now() - last_checked < timedelta(days=CACHE_EXPIRATION_DAYS):
            return True
    return False

def get_breach_details(breach_name):
    """Fetch detailed information about a breach, using cache if available."""
    if breach_name in breach_details_cache:
        return breach_details_cache[breach_name]

    url = f"https://haveibeenpwned.com/api/v3/breach/{breach_name}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breach_data = response.json()
        description = breach_data.get("Description", "No description available")
        breach_details_cache[breach_name] = description
        save_breach_details_cache()
        return description
    else:
        return "No description available"

def log_breach_info(email, breach, description):
    """Log each breach separately with its description to a file."""
    log_entry = {
        "email": email,
        "breaches": {breach: description},
        "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned"
    }
    with open(OUTPUT_LOG_FILE, "a") as log_file:
        log_file.write(json.dumps(log_entry) + "\n")

def check_email_breaches(email):
    """Check if an email has been breached using the HIBP API."""
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breaches = response.json()
        breach_names = [breach['Name'] for breach in breaches]  # Extract breach names
        cache[email] = {"breaches": breach_names, "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}

        for breach in breach_names:
            description = get_breach_details(breach)
            log_breach_info(email, breach, description)

        print(f"{email} found in breaches: {breach_names}")
    elif response.status_code == 404:
        print(f"{email} not found in any breaches.")
        cache[email] = {"breaches": [], "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
    else:
        print(f"Error checking {email}: {response.status_code} - {response.text}")

def main():
    """Main function to read emails and check breaches."""
    with open(EMAIL_LIST_FILE, "r") as f:
        emails = [line.strip() for line in f.readlines()]

    for email in emails:
        if is_recently_checked(email):
            print(f"Skipping recently checked email: {email}")
            continue

        check_email_breaches(email)
        save_cache()
        time.sleep(RATE_LIMIT_DELAY)

if __name__ == "__main__":
    main()

Можна налаштувати наступні значення відповідно до потрібних вимог:

  • API_KEY = “<YOUR_HIBP_API_KEY>”: Замінити <YOUR_HIBP_API_KEY> на необхідний ключ HIBP API.
  • EMAIL_LIST_FILE = “/home/wazuh-user/email_list.txt”: Цей файл містить адреси електронної пошти, на які є посилання (по одній у рядку).
  • OUTPUT_LOG_FILE = “/var/log/hibp_breach_checks.log”: Це шлях до файлу журналу. Він буде включений до конфігураційного файлу Wazuh /var/ossec/etc/ossec.conf для моніторингу.
  • CACHE_FILE = “hibp_cache.json”: Цей файл зберігає нещодавно перевірені листи, щоб уникнути зайвих перевірок.
  • BREACH_DETAILS_CACHE_FILE = “breach_details_cache.json”: Цей кеш зберігає описи порушень.
  • RATE_LIMIT_DELAY = 60: Можна замінити це значення, щоб налаштувати кількість секунд очікування між викликами API. Варто звернути увагу на те, що HIPB має обмеження на 10 викликів API на хвилину.
  • CACHE_EXPIRATION_DAYS = 7: Це значення показує, скільки часу потрібно, щоб вважати кешовані результати дійсними.

2. Встановити правильні дозволи:

# sudo chown root:wazuh-user /home/wazuh-user/hibp.py
# sudo chmod 750 /home/wazuh-user/hibp.py

3. Створити список адрес електронної пошти для скрипту в каталозі /home/wazuh-user/ і зберегти назву файлу як email_list.txt. У цьому файлі будуть зберігатися адреси електронної пошти користувачів, для яких потрібно здійснювати регулярні перевірки.

Завдання cron

Створити завдання cron, щоб виконувати цей скрипт кожні 7 днів (щотижня).

1. Запустити наступну команду, щоб відкрити таблицю вкладок cron для поточного користувача:

# export VISUAL=nano
# export EDITOR=nano
# crontab -e

2. Додати наступний рядок до таблиці crontab:

0 0 */7 * * /usr/bin/python3.8 /home/wazuh-user/hibp.py

Наведена вище команда завдання cron запускає скрипт /home/wazuh-user/hibp.py опівночі (00:00) кожні 7 днів.

Зберегти й вийти.

Правила

1. Додати наступні правила до файлу /var/ossec/etc/rules/local_rules.xml:

<group name="hibp, compromised_accounts, security">
  <rule id="100100" level="10">
      <decoded_as>json</decoded_as>
      <match>hibpwned</match>
      <description>Potential data breach detected for monitored email</description>
  </rule>
</group>

Збирання логів

1. Додати наступну команду до блоку <ossec_config> конфігураційного файлу сервера Wazuh /var/ossec/etc/ossec.conf:

  <localfile>
    <log_format>json</log_format>
    <location>/var/log/hibp_breach_checks.log</location>
  </localfile>

Де:

  • <log_format>: представляє формат отриманого логу.
  • <location>: представляє шлях до вихідного журналу для скрипту Python hibp.py.

2. Перезапустити службу менеджера Wazuh:

# systemctl restart wazuh-manager

Візуалізація сповіщень на дешборді Wazuh

Створений раніше cronjob автоматично виконуватиме скрипт hibp.py щотижня, використовуючи список адрес електронної пошти email_list.txt як основу. Завжди можна змінити частоту виконання і додати більше адрес до наявного списку, щоб охопити якомога більшу кількість адрес.

1. Після виконання скрипту потрібно перейти на сторінку Threat Hunting на дешборді Wazuh.

2. Перейти на вкладку Events й натиснути + Add filter. У полі Field обрати rule.id, у полі Operator обрати is, а в полі Value ввести 100100.

3. Натиснути Save, щоб увімкнути фільтр.

compromised-accounts-alerts

4. Натиснути Inspect document details, щоб переглянути деталі журналів.

compromised-accounts-alerts-wazuh-dashboard

Висновок

Виявлення скомпрометованих акаунтів є важливим елементом стратегії кібербезпеки будь-якої організації. Інтегруючи зовнішні дані розвідки про порушення з Wazuh, компанії можуть активно відстежувати витік облікових даних і швидко реагувати на потенційні загрози.

Wazuh – це open-source платформа безпеки, яка надає можливості для виявлення та захисту цифрових активів від атак. Можна інтегрувати розгортання Wazuh із зовнішніми платформами для розширеного покриття.

Підписатися на новини