Утечки данных и скомпрометированные учетные данные стали постоянной угрозой в сфере кибербезопасности, раскрывая конфиденциальную информацию, такую как имена пользователей, пароли и электронные адреса. Когда злоумышленники получают доступ к этим данным, они могут использовать их для несанкционированного доступа, фишинговых атак или кражи личных данных. Риск для бизнеса и отдельных пользователей является значительным, независимо от того, идет ли речь об учетных данных, скомпрометированных через сторонние сервисы, или о внутренней конфиденциальной информации. Быстрое выявление и реагирование на скомпрометированные учетные записи является критически важным для минимизации потенциальных убытков, предотвращения несанкционированного доступа и защиты конфиденциальной информации.
Wazuh, платформа безопасности с открытым исходным кодом, предлагает возможности для мониторинга и обнаружения скомпрометированных аккаунтов в режиме реального времени. Интегрируясь с внешними источниками разведки угроз, такими как базы данных о нарушениях и инструменты мониторинга даркнета, Wazuh может проактивно оповещать команды безопасности, когда возникает подозрение, что учетная запись скомпрометирована.
В этой статье рассматривается, как Wazuh обнаруживает скомпрометированные аккаунты с помощью платформы Have I Been Pwned (HIBP). HIBP – это онлайн-платформа, которая позволяет частным лицам и организациям проверять, не подвергались ли их адреса электронной почты или пароли известным утечкам данных. Здесь речь идет о том, как настроить автоматическое обнаружение нарушений и лучшие практики реагирования на такие инциденты.
Инфраструктура
Для демонстрации обнаружения скомпрометированных аккаунтов с помощью Wazuh используется следующая инфраструктура:
- Предустановленная, готовая к использованию последняя доступная версия Wazuh, которая включает центральные компоненты Wazuh (сервер Wazuh, индексатор и дешборд). Чтобы загрузить и настроить Wazuh в виртуальной среде, стоит следовать этому руководству.
- Ключ API Have I Been Pwned. Для этого нужна подписка на один из доступных сервисов.
Конфигурация
Зависимости Python
Примечание: Используется Python 3.8, поскольку он считается одной из самых стабильных версий.
1. Установить зависимости Python на сервер Wazuh с помощью команд ниже:
# sudo amazon-linux-extras enable python3.8
# sudo yum install python3.8
2. Убедиться, что установлена версия Python выше 3.8.
# python3.8 --version
Результат:
Python 3.8.20
3. Установить Pip:
# curl -O https://bootstrap.pypa.io/get-pip.py
# sudo python3.8 get-pip.py
4. Установить requests и понизить версию urllib3:
# python3.8 -m pip install requests
# python3.8 -m pip uninstall urllib3
# python3.8 -m pip install urllib3==1.26.15
Нужно понизить версию urllib3, поскольку текущая версия требует OpenSSL 1.1.1+, а на сервере Wazuh установлена старая версия OpenSSL (1.0.2). Обновление OpenSSL может вызвать некоторые проблемы совместимости для Python-скрипта.
Скрипт Python
1. Создать скрипт hibp.py в папке /home/wazuh-user/hibp.py сервера Wazuh:
import requests
import time
import json
import os
from datetime import datetime, timedelta
# Configuration
API_KEY = "<YOUR_HIBP_API_KEY>" # Replace with your HIBP API key
EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt" # File containing email addresses (one per line)
OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log" # Log file path
CACHE_FILE = "hibp_cache.json" # To store recently checked emails and avoid redundant checks
BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json" # Cache for breach descriptions
RATE_LIMIT_DELAY = 60 # Seconds to wait between API calls to respect rate limits
CACHE_EXPIRATION_DAYS = 6 # How long to consider cached results valid
# Ensure the output log file directory exists
os.makedirs(os.path.dirname(OUTPUT_LOG_FILE), exist_ok=True)
# Load cache (if exists and valid)
if os.path.exists(CACHE_FILE):
try:
with open(CACHE_FILE, "r") as f:
cache = json.load(f)
except (json.JSONDecodeError, ValueError):
print("Cache file is empty or corrupted. Initializing empty cache.")
cache = {}
else:
cache = {}
# Load breach details cache (if exists and valid)
if os.path.exists(BREACH_DETAILS_CACHE_FILE):
try:
with open(BREACH_DETAILS_CACHE_FILE, "r") as f:
breach_details_cache = json.load(f)
except (json.JSONDecodeError, ValueError):
print("Breach details cache file is empty or corrupted. Initializing empty cache.")
breach_details_cache = {}
else:
breach_details_cache = {}
def save_cache():
"""Save the email check cache to a file."""
with open(CACHE_FILE, "w") as f:
json.dump(cache, f)
def save_breach_details_cache():
"""Save the breach details cache to a file."""
with open(BREACH_DETAILS_CACHE_FILE, "w") as f:
json.dump(breach_details_cache, f)
def is_recently_checked(email):
"""Check if an email has been checked recently."""
if email in cache:
last_checked = datetime.strptime(cache[email]["last_checked"], "%Y-%m-%dT%H:%M:%S")
if datetime.now() - last_checked < timedelta(days=CACHE_EXPIRATION_DAYS):
return True
return False
def get_breach_details(breach_name):
"""Fetch detailed information about a breach, using cache if available."""
if breach_name in breach_details_cache:
return breach_details_cache[breach_name]
url = f"https://haveibeenpwned.com/api/v3/breach/{breach_name}"
headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
breach_data = response.json()
description = breach_data.get("Description", "No description available")
breach_details_cache[breach_name] = description
save_breach_details_cache()
return description
else:
return "No description available"
def log_breach_info(email, breach, description):
"""Log each breach separately with its description to a file."""
log_entry = {
"email": email,
"breaches": {breach: description},
"last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"Source": "hibpwned"
}
with open(OUTPUT_LOG_FILE, "a") as log_file:
log_file.write(json.dumps(log_entry) + "\n")
def check_email_breaches(email):
"""Check if an email has been breached using the HIBP API."""
url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}"
headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
breaches = response.json()
breach_names = [breach['Name'] for breach in breaches] # Extract breach names
cache[email] = {"breaches": breach_names, "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
for breach in breach_names:
description = get_breach_details(breach)
log_breach_info(email, breach, description)
print(f"{email} found in breaches: {breach_names}")
elif response.status_code == 404:
print(f"{email} not found in any breaches.")
cache[email] = {"breaches": [], "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
else:
print(f"Error checking {email}: {response.status_code} - {response.text}")
def main():
"""Main function to read emails and check breaches."""
with open(EMAIL_LIST_FILE, "r") as f:
emails = [line.strip() for line in f.readlines()]
for email in emails:
if is_recently_checked(email):
print(f"Skipping recently checked email: {email}")
continue
check_email_breaches(email)
save_cache()
time.sleep(RATE_LIMIT_DELAY)
if __name__ == "__main__":
main()
Можно настроить следующие значения в соответствии с нужными требованиями:
- API_KEY = “<YOUR_HIBP_API_KEY>”: Заменить <YOUR_HIBP_API_KEY> на необходимый ключ HIBP API.
- EMAIL_LIST_FILE = “/home/wazuh-user/email_list.txt”: Этот файл содержит адреса электронной почты, на которые есть ссылки (по одному в строке).
- OUTPUT_LOG_FILE = “/var/log/hibp_breach_checks.log”: Это путь к файлу журнала. Он будет включен в конфигурационный файл Wazuh /var/ossec/etc/ossec.conf для мониторинга.
- CACHE_FILE = “hibp_cache.json”: Этот файл сохраняет недавно проверенные письма, чтобы избежать лишних проверок.
- BREACH_DETAILS_CACHE_FILE = “breach_details_cache.json”: Этот кэш хранит описания нарушений.
- RATE_LIMIT_DELAY = 60: Можно заменить это значение, чтобы настроить количество секунд ожидания между вызовами API. Стоит обратить внимание на то, что HIPB имеет ограничение на 10 вызовов API в минуту.
- CACHE_EXPIRATION_DAYS = 7: это значение показывает, сколько времени требуется, чтобы считать кэшированные результаты действительными.
2. Установить правильные разрешения:
# sudo chown root:wazuh-user /home/wazuh-user/hibp.py
# sudo chmod 750 /home/wazuh-user/hibp.py
3. Создать список адресов электронной почты для скрипта в каталоге /home/wazuh-user/ и сохранить название файла как email_list.txt. В этом файле будут храниться адреса электронной почты пользователей, для которых нужно осуществлять регулярные проверки.
Задания cron
Создать задание cron, чтобы выполнять этот скрипт каждые 7 дней (каждую неделю).
1. Запустить следующую команду, чтобы открыть таблицу вкладок cron для текущего пользователя:
# export VISUAL=nano
# export EDITOR=nano
# crontab -e
2. Добавить следующую строку в таблицу crontab:
0 0 */7 * * /usr/bin/python3.8 /home/wazuh-user/hibp.py
Приведенная выше команда задания cron запускает скрипт /home/wazuh-user/hibp.py в полночь (00:00) каждые 7 дней.
Сохранить и выйти.
Правила
1. Добавить следующие правила в файл /var/ossec/etc/rules/local_rules.xml:
<group name="hibp, compromised_accounts, security">
<rule id="100100" level="10">
<decoded_as>json</decoded_as>
<match>hibpwned</match>
<description>Potential data breach detected for monitored email</description>
</rule>
</group>
Сбор логов
1. Добавить следующую команду в блок <ossec_config> конфигурационного файла сервера Wazuh /var/ossec/etc/ossec.conf:
<localfile>
<log_format>json</log_format>
<location>/var/log/hibp_breach_checks.log</location>
</localfile>
Где:
- <log_format>: представляет формат полученного лога.
- <location>: представляет путь к исходному журналу для скрипта Python hibp.py.
2. Перезапустить службу менеджера Wazuh:
# systemctl restart wazuh-manager
Визуализация уведомлений на дешборде Wazuh
Созданный ранее cronjob будет автоматически выполнять скрипт hibp.py каждую неделю, используя список адресов электронной почты email_list.txt в качестве основы. Всегда можно изменить частоту выполнения и добавить больше адресов в существующий список, чтобы охватить как можно большее количество адресов.
1. После выполнения скрипта нужно перейти на страницу Threat Hunting на дешборде Wazuh.
2. Перейти на вкладку Events и нажать + Add filter. В поле Field выбрать rule.id, в поле Operator выбрать is, а в поле Value ввести 100100.
3. Нажать Save, чтобы включить фильтр.

4. Нажать Inspect document details, чтобы просмотреть детали журналов.

Вывод
Обнаружение скомпрометированных аккаунтов является важным элементом стратегии кибербезопасности любой организации. Интегрируя внешние данные разведки о нарушениях с Wazuh, компании могут активно отслеживать утечку учетных данных и быстро реагировать на потенциальные угрозы.
Wazuh – это open-source платформа безопасности, которая предоставляет возможности для обнаружения и защиты цифровых активов от атак. Можно интегрировать развертывание Wazuh с внешними платформами для расширенного покрытия.







