Обнаружение скомпрометированных учетных записей с HIBP и Wazuh

Утечки данных и скомпрометированные учетные данные стали постоянной угрозой в сфере кибербезопасности, раскрывая конфиденциальную информацию, такую как имена пользователей, пароли и электронные адреса. Когда злоумышленники получают доступ к этим данным, они могут использовать их для несанкционированного доступа, фишинговых атак или кражи личных данных. Риск для бизнеса и отдельных пользователей является значительным, независимо от того, идет ли речь об учетных данных, скомпрометированных через сторонние сервисы, или о внутренней конфиденциальной информации. Быстрое выявление и реагирование на скомпрометированные учетные записи является критически важным для минимизации потенциальных убытков, предотвращения несанкционированного доступа и защиты конфиденциальной информации.

Wazuh, платформа безопасности с открытым исходным кодом, предлагает возможности для мониторинга и обнаружения скомпрометированных аккаунтов в режиме реального времени. Интегрируясь с внешними источниками разведки угроз, такими как базы данных о нарушениях и инструменты мониторинга даркнета, Wazuh может проактивно оповещать команды безопасности, когда возникает подозрение, что учетная запись скомпрометирована.

В этой статье рассматривается, как Wazuh обнаруживает скомпрометированные аккаунты с помощью платформы Have I Been Pwned (HIBP). HIBP – это онлайн-платформа, которая позволяет частным лицам и организациям проверять, не подвергались ли их адреса электронной почты или пароли известным утечкам данных. Здесь речь идет о том, как настроить автоматическое обнаружение нарушений и лучшие практики реагирования на такие инциденты.

Инфраструктура

Для демонстрации обнаружения скомпрометированных аккаунтов с помощью Wazuh используется следующая инфраструктура:

  • Предустановленная, готовая к использованию последняя доступная версия Wazuh, которая включает центральные компоненты Wazuh (сервер Wazuh, индексатор и дешборд). Чтобы загрузить и настроить Wazuh в виртуальной среде, стоит следовать этому руководству.
  • Ключ API Have I Been Pwned. Для этого нужна подписка на один из доступных сервисов.

Конфигурация

Зависимости Python

Примечание: Используется Python 3.8, поскольку он считается одной из самых стабильных версий.

1. Установить зависимости Python на сервер Wazuh с помощью команд ниже:

# sudo amazon-linux-extras enable python3.8
# sudo yum install python3.8

2. Убедиться, что установлена версия Python выше 3.8.

# python3.8 --version

Результат:

Python 3.8.20

3. Установить Pip:

# curl -O https://bootstrap.pypa.io/get-pip.py
# sudo python3.8 get-pip.py

4. Установить requests и понизить версию urllib3:

# python3.8 -m pip install requests
# python3.8 -m pip uninstall urllib3
# python3.8 -m pip install urllib3==1.26.15

Нужно понизить версию urllib3, поскольку текущая версия требует OpenSSL 1.1.1+, а на сервере Wazuh установлена старая версия OpenSSL (1.0.2). Обновление OpenSSL может вызвать некоторые проблемы совместимости для Python-скрипта.

Скрипт Python

1. Создать скрипт hibp.py в папке /home/wazuh-user/hibp.py сервера Wazuh:

import requests
import time
import json
import os
from datetime import datetime, timedelta

# Configuration
API_KEY = "<YOUR_HIBP_API_KEY>"  # Replace with your HIBP API key
EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt"  # File containing email addresses (one per line)
OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log"  # Log file path
CACHE_FILE = "hibp_cache.json"  # To store recently checked emails and avoid redundant checks
BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json"  # Cache for breach descriptions
RATE_LIMIT_DELAY = 60  # Seconds to wait between API calls to respect rate limits
CACHE_EXPIRATION_DAYS = 6  # How long to consider cached results valid

# Ensure the output log file directory exists
os.makedirs(os.path.dirname(OUTPUT_LOG_FILE), exist_ok=True)

# Load cache (if exists and valid)
if os.path.exists(CACHE_FILE):
    try:
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Cache file is empty or corrupted. Initializing empty cache.")
        cache = {}
else:
    cache = {}

# Load breach details cache (if exists and valid)
if os.path.exists(BREACH_DETAILS_CACHE_FILE):
    try:
        with open(BREACH_DETAILS_CACHE_FILE, "r") as f:
            breach_details_cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Breach details cache file is empty or corrupted. Initializing empty cache.")
        breach_details_cache = {}
else:
    breach_details_cache = {}

def save_cache():
    """Save the email check cache to a file."""
    with open(CACHE_FILE, "w") as f:
        json.dump(cache, f)

def save_breach_details_cache():
    """Save the breach details cache to a file."""
    with open(BREACH_DETAILS_CACHE_FILE, "w") as f:
        json.dump(breach_details_cache, f)

def is_recently_checked(email):
    """Check if an email has been checked recently."""
    if email in cache:
        last_checked = datetime.strptime(cache[email]["last_checked"], "%Y-%m-%dT%H:%M:%S")
        if datetime.now() - last_checked < timedelta(days=CACHE_EXPIRATION_DAYS):
            return True
    return False

def get_breach_details(breach_name):
    """Fetch detailed information about a breach, using cache if available."""
    if breach_name in breach_details_cache:
        return breach_details_cache[breach_name]

    url = f"https://haveibeenpwned.com/api/v3/breach/{breach_name}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breach_data = response.json()
        description = breach_data.get("Description", "No description available")
        breach_details_cache[breach_name] = description
        save_breach_details_cache()
        return description
    else:
        return "No description available"

def log_breach_info(email, breach, description):
    """Log each breach separately with its description to a file."""
    log_entry = {
        "email": email,
        "breaches": {breach: description},
        "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned"
    }
    with open(OUTPUT_LOG_FILE, "a") as log_file:
        log_file.write(json.dumps(log_entry) + "\n")

def check_email_breaches(email):
    """Check if an email has been breached using the HIBP API."""
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breaches = response.json()
        breach_names = [breach['Name'] for breach in breaches]  # Extract breach names
        cache[email] = {"breaches": breach_names, "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}

        for breach in breach_names:
            description = get_breach_details(breach)
            log_breach_info(email, breach, description)

        print(f"{email} found in breaches: {breach_names}")
    elif response.status_code == 404:
        print(f"{email} not found in any breaches.")
        cache[email] = {"breaches": [], "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
    else:
        print(f"Error checking {email}: {response.status_code} - {response.text}")

def main():
    """Main function to read emails and check breaches."""
    with open(EMAIL_LIST_FILE, "r") as f:
        emails = [line.strip() for line in f.readlines()]

    for email in emails:
        if is_recently_checked(email):
            print(f"Skipping recently checked email: {email}")
            continue

        check_email_breaches(email)
        save_cache()
        time.sleep(RATE_LIMIT_DELAY)

if __name__ == "__main__":
    main()

Можно настроить следующие значения в соответствии с нужными требованиями:

  • API_KEY = “<YOUR_HIBP_API_KEY>”: Заменить <YOUR_HIBP_API_KEY> на необходимый ключ HIBP API.
  • EMAIL_LIST_FILE = “/home/wazuh-user/email_list.txt”: Этот файл содержит адреса электронной почты, на которые есть ссылки (по одному в строке).
  • OUTPUT_LOG_FILE = “/var/log/hibp_breach_checks.log”: Это путь к файлу журнала. Он будет включен в конфигурационный файл Wazuh /var/ossec/etc/ossec.conf для мониторинга.
  • CACHE_FILE = “hibp_cache.json”: Этот файл сохраняет недавно проверенные письма, чтобы избежать лишних проверок.
  • BREACH_DETAILS_CACHE_FILE = “breach_details_cache.json”: Этот кэш хранит описания нарушений.
  • RATE_LIMIT_DELAY = 60: Можно заменить это значение, чтобы настроить количество секунд ожидания между вызовами API. Стоит обратить внимание на то, что HIPB имеет ограничение на 10 вызовов API в минуту.
  • CACHE_EXPIRATION_DAYS = 7: это значение показывает, сколько времени требуется, чтобы считать кэшированные результаты действительными.

2. Установить правильные разрешения:

# sudo chown root:wazuh-user /home/wazuh-user/hibp.py
# sudo chmod 750 /home/wazuh-user/hibp.py

3. Создать список адресов электронной почты для скрипта в каталоге /home/wazuh-user/ и сохранить название файла как email_list.txt. В этом файле будут храниться адреса электронной почты пользователей, для которых нужно осуществлять регулярные проверки.

Задания cron

Создать задание cron, чтобы выполнять этот скрипт каждые 7 дней (каждую неделю).

1. Запустить следующую команду, чтобы открыть таблицу вкладок cron для текущего пользователя:

# export VISUAL=nano
# export EDITOR=nano
# crontab -e

2. Добавить следующую строку в таблицу crontab:

0 0 */7 * * /usr/bin/python3.8 /home/wazuh-user/hibp.py

Приведенная выше команда задания cron запускает скрипт /home/wazuh-user/hibp.py в полночь (00:00) каждые 7 дней.

Сохранить и выйти.

Правила

1. Добавить следующие правила в файл /var/ossec/etc/rules/local_rules.xml:

<group name="hibp, compromised_accounts, security">
  <rule id="100100" level="10">
      <decoded_as>json</decoded_as>
      <match>hibpwned</match>
      <description>Potential data breach detected for monitored email</description>
  </rule>
</group>

Сбор логов

1. Добавить следующую команду в блок <ossec_config> конфигурационного файла сервера Wazuh /var/ossec/etc/ossec.conf:

  <localfile>
    <log_format>json</log_format>
    <location>/var/log/hibp_breach_checks.log</location>
  </localfile>

Где:

  • <log_format>: представляет формат полученного лога.
  • <location>: представляет путь к исходному журналу для скрипта Python hibp.py.

2. Перезапустить службу менеджера Wazuh:

# systemctl restart wazuh-manager

Визуализация уведомлений на дешборде Wazuh

Созданный ранее cronjob будет автоматически выполнять скрипт hibp.py каждую неделю, используя список адресов электронной почты email_list.txt в качестве основы. Всегда можно изменить частоту выполнения и добавить больше адресов в существующий список, чтобы охватить как можно большее количество адресов.

1. После выполнения скрипта нужно перейти на страницу Threat Hunting на дешборде Wazuh.

2. Перейти на вкладку Events и нажать + Add filter. В поле Field выбрать rule.id, в поле Operator выбрать is, а в поле Value ввести 100100.

3. Нажать Save, чтобы включить фильтр.

compromised-accounts-alerts

4. Нажать Inspect document details, чтобы просмотреть детали журналов.

compromised-accounts-alerts-wazuh-dashboard

Вывод

Обнаружение скомпрометированных аккаунтов является важным элементом стратегии кибербезопасности любой организации. Интегрируя внешние данные разведки о нарушениях с Wazuh, компании могут активно отслеживать утечку учетных данных и быстро реагировать на потенциальные угрозы.

Wazuh – это open-source платформа безопасности, которая предоставляет возможности для обнаружения и защиты цифровых активов от атак. Можно интегрировать развертывание Wazuh с внешними платформами для расширенного покрытия.

Подписаться на новости