Previously, we implemented a basic testing framework to start handling data errors, corrupt data, and edge cases. In Part 4, we’re going to kick it up a notch. It’s time to implement robust error handling for each discrete task!
We left off having implemented a basic testing framework to drive code implementation for edge cases and errors. Now we’re going to build on that framework by developing tests for each discrete function and making sure we have solid error handling for anything that comes our way. We’re also going to implement log rotation while we’re at it, so that a reasonable amount of log history is captured without taking up too much drive space. Let’s get going!
To see this project in its current state, head over to my repo and check it out: IntelliSIEM on GitHub
Writing Better Tests To Drive Implementation
One of the unwritten goals of this project is to develop the application while adhering to Test-Driven Development (TDD) principles as much as possible. This means writing tests first and then modifying our code to meet the expectations of those tests. One area where TDD is particularly effective is in implementing robust error handling. In this stage, the aim is to capture as many error scenarios in the data collection process as possible, such as invalid data types, missing keys, corrupt data, and empty responses. With those goals in mind, we need to refactor our tests, breaking related tests into their own files and adding more discrete tests to better define how the code should work. This can be a somewhat tedious process, so in the interest of not putting you through it all, here is the newly refactored test suite.
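For orientation, it breaks down roughly like this (aside from tests/test_data_collection.py, the filenames here are my shorthand for the groupings; check the repo for the exact layout):
# tests/test_api_client.py       -> success, timeout, rate limiting, invalid JSON, connection, 401/403
# tests/test_config.py           -> missing file, invalid YAML, permission and disk-space errors
# tests/test_data_collection.py  -> missing keys, corrupt data, empty responses, invalid data types
# tests/test_error_handling.py   -> custom exceptions and log_error severity levels
# tests/test_log_rotation.py     -> RotatingFileHandler rotation behaviour
With that map in mind, here are the tests themselves: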
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import requests
from src.api_client import APIClient, APIError
def test_api_client_success(mocker):
"""
Test API client handling a successful request.
This ensures that valid data is returned when the API call is successful.
"""
mock_response = [{'id': 'test_data'}]
mocker.patch('requests.get',
return_value=mocker.Mock(status_code=200, json=mocker.Mock(return_value=mock_response)))
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
result = client.get_data("endpoint")
assert result == mock_response # Expecting mock response data
def test_api_client_timeout(mocker):
"""
Test API client handling of a timeout error.
This ensures that the appropriate exception and message are raised and logged.
"""
mocker.patch('requests.get', side_effect=requests.exceptions.Timeout)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="API request timed out."):
client.get_data("endpoint")
def test_api_client_rate_limit(mocker):
"""
Test API client handling of rate limiting.
This ensures that the APIError exception is raised when the rate limit is exceeded.
"""
mock_response = mocker.Mock(status_code=429, headers={"Retry-After": "5"})
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Rate limit exceeded."):
client.get_data("endpoint")
def test_api_client_invalid_json(mocker):
"""
Test API client handling of invalid JSON response.
This test simulates a ValueError when the JSON response is not valid.
"""
mock_response = mocker.Mock(status_code=200, json=mocker.Mock(side_effect=ValueError))
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Invalid response format."):
client.get_data("endpoint")
def test_api_client_connection_error(mocker):
"""
Test API client handling of a connection error.
This ensures that connection issues are properly caught and logged.
"""
mocker.patch('requests.get', side_effect=requests.ConnectionError)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Connection error occurred."):
client.get_data("endpoint")
def test_api_client_authentication_error(mocker):
"""
Test API client handling of an authentication error.
This simulates an authentication failure with a 401 status code.
"""
mock_response = mocker.Mock(status_code=401)
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Invalid token"})
with pytest.raises(APIError, match="Authentication failed."):
client.get_data("endpoint")
def test_api_client_permission_error(mocker):
"""
Test API client handling of a permission error.
This simulates a permission failure with a 403 status code.
"""
mock_response = mocker.Mock(status_code=403)
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Permission denied."):
client.get_data("endpoint")Python# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from src.config import load_config, ConfigError
def test_load_config_success(mocker):
"""
Test successful loading of the configuration file.
This ensures that a valid configuration is loaded and returned correctly.
"""
mock_config = """
api_keys:
alien_vault: "VALID_ALIEN_VAULT_API_KEY"
virus_total: "VALID_VIRUS_TOTAL_API_KEY"
"""
mocker.patch('builtins.open', mocker.mock_open(read_data=mock_config))
config = load_config()
assert config['api_keys']['alien_vault'] == "VALID_ALIEN_VAULT_API_KEY"
assert config['api_keys']['virus_total'] == "VALID_VIRUS_TOTAL_API_KEY"
def test_load_config_missing_file():
"""
Test configuration loading with a missing file.
This ensures that a FileNotFoundError is raised when the config file does not exist.
"""
with pytest.raises(FileNotFoundError):
load_config('config/non_existent_config.yaml')
def test_load_config_invalid_format(mocker):
"""
Test configuration loading with an invalid format.
This simulates an invalid YAML format and expects a ConfigError.
"""
invalid_yaml = """
api_keys:
alien_vault: !!invalid
"""
mocker.patch('builtins.open', mocker.mock_open(read_data=invalid_yaml))
with pytest.raises(ConfigError, match="Configuration file error."):
load_config()
def test_load_config_permission_error(mocker):
"""
Test configuration loading with a permission error.
This simulates a PermissionError when accessing the config file.
"""
mock_open = mocker.patch('builtins.open', mocker.mock_open())
mock_open.side_effect = PermissionError
with pytest.raises(ConfigError, match="Permission denied for config file."):
load_config()
def test_load_config_disk_space_error(mocker):
"""
Test configuration loading with a disk space error.
This simulates an OSError due to lack of disk space.
"""
mock_open = mocker.patch('builtins.open', mocker.mock_open())
mock_open.side_effect = OSError("No space left on device")
with pytest.raises(ConfigError, match="Disk space issue while loading configuration."):
load_config()
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# tests/test_data_collection.py
import pytest
from src.data_collection import fetch_threat_data, DataError
def test_fetch_threat_data_success(mocker):
"""
Test successful fetching of threat data.
This ensures that data is correctly collected from the APIs and returned.
"""
mock_data_alien = [{'id': 'test1'}]
mock_data_virus = [{'id': 'test2'}]
# Create separate mock instances for each APIClient
mock_alien_vault_client = mocker.Mock()
mock_alien_vault_client.get_data.return_value = mock_data_alien
mock_virus_total_client = mocker.Mock()
mock_virus_total_client.get_data.return_value = mock_data_virus
# Patch the APIClient constructor to return the respective mock instances
mocker.patch('src.data_collection.APIClient', side_effect=[mock_alien_vault_client, mock_virus_total_client])
threats = fetch_threat_data()
# Verify that the mock was called only once per client
mock_alien_vault_client.get_data.assert_called_once()
mock_virus_total_client.get_data.assert_called_once()
assert len(threats) == 2 # Expecting two entries: one from each mock data list
assert threats == [{'id': 'test1'}, {'id': 'test2'}]
def test_fetch_threat_data_missing_keys(mocker):
"""
Test data collection handling of missing required data keys.
This ensures that missing keys in the data trigger the appropriate exception.
"""
# Mock get_data to return an entry missing the required 'id' key
mocker.patch('src.api_client.APIClient.get_data', return_value=[{'missing_key': 'value'}])
with pytest.raises(DataError, match="Missing required data keys."):
fetch_threat_data()
def test_fetch_threat_data_corrupt_data(mocker):
"""
Test data collection handling of corrupt data.
This ensures that corrupt data is caught and triggers the DataError exception.
"""
# Mock get_data to return structurally corrupt data with an unexpected type for 'id'
mocker.patch('src.api_client.APIClient.get_data',
return_value=[{'id': {'unexpected': 'dict'}, 'nested_data': ['unexpected_list']}])
# Expect DataError to be raised for corrupt data
with pytest.raises(DataError, match="Corrupt data received."):
fetch_threat_data()
def test_fetch_threat_data_empty_response(mocker):
"""
Test fetching threat data with an empty response.
This ensures that the function handles empty responses gracefully.
"""
mocker.patch('src.api_client.APIClient.get_data', return_value=[])
threats = fetch_threat_data()
assert threats == [] # Expecting an empty list when no data is returned
def test_fetch_threat_data_invalid_data_type(mocker):
"""
Test data collection handling of invalid data type.
This simulates an unexpected data type from the API response.
"""
# Mock get_data to return a non-list data type, such as a string
mocker.patch('src.api_client.APIClient.get_data', return_value="invalid_data")
with pytest.raises(DataError, match="Unexpected data type received."):
fetch_threat_data()
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from src.error_handling import APIError, ConfigError, DataError, log_error
import logging
def test_api_error():
"""
Test that the APIError exception can be raised and caught correctly.
"""
with pytest.raises(APIError, match="API issue occurred"):
raise APIError("API issue occurred.")
def test_config_error():
"""
Test that the ConfigError exception can be raised and caught correctly.
"""
with pytest.raises(ConfigError, match="Configuration issue occurred"):
raise ConfigError("Configuration issue occurred.")
def test_data_error():
"""
Test that the DataError exception can be raised and caught correctly.
"""
with pytest.raises(DataError, match="Data issue occurred"):
raise DataError("Data issue occurred.")
def test_log_error(mocker):
"""
Test that the log_error function logs an error message correctly.
"""
mocker.patch('logging.error')
log_error("Test error message.")
logging.error.assert_called_once_with("Test error message.")
def test_log_warning(mocker):
"""
Test that the log_error function logs a warning message correctly.
"""
mocker.patch('logging.warning')
log_error("Test warning message.", level="WARNING")
logging.warning.assert_called_once_with("Test warning message.")
def test_log_info(mocker):
"""
Test that the log_error function logs an informational message correctly.
"""
mocker.patch('logging.info')
log_error("Test info message.", level="INFO")
logging.info.assert_called_once_with("Test info message.")
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from logging.handlers import RotatingFileHandler
import pytest
@pytest.fixture
def setup_rotating_log():
"""
Set up a rotating log handler for testing.
"""
# Define the log file path and cleanup if exists
log_file = 'data/test_error.log'
for i in range(6):
try:
os.remove(f"{log_file}.{i}")
except FileNotFoundError:
pass
try:
os.remove(log_file)
except FileNotFoundError:
pass
# Set up rotating file handler
handler = RotatingFileHandler(log_file, maxBytes=200, backupCount=5)
handler.setLevel(logging.ERROR)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
logger = logging.getLogger('test_logger')
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
return logger
def test_log_rotation(setup_rotating_log):
"""
Test log rotation by generating log entries to exceed maxBytes limit.
"""
logger = setup_rotating_log
log_message = "This is a test log message to fill up the log file and trigger rotation."
log_file = 'data/test_error.log'
# Write multiple log entries to exceed the maxBytes limit
for _ in range(50): # Increase number of entries if needed
logger.error(log_message)
# Ensure the main log file and at least one backup exist
assert os.path.exists(log_file), f"Log file {log_file} does not exist."
assert os.path.exists(f"{log_file}.1"), f"Backup log file {log_file}.1 does not exist."PythonI know there’s a lot there, and the TDD purists among you are probably having an apoplectic fit at the moment (I can hear it now: You can’t write all these tests in one shot and be doing TDD right!). Please don’t misunderstand me, the design and implementation of these tests took considerable time and didn’t happen in a vacuum. Each test was designed individually, with the satisfying function developed and modified after each test. Sometimes, the original tests were inadequate or poorly designed and needed reworking. Other tests made a previous implementation fail a prior test and required rethinking the overall approach to allow both tests to pass. This is the most fun and, honestly, the most challenging part for me. I love puzzles, and trying to figure out how to make all these tests simultaneously true is a giant puzzle. The hardest part was probably making the data collection tests pass, specifically the tests for corrupt data, empty responses, and invalid data types. Finding a way through that maze to ensure all three were covered took the most time.
Writing Better Functions Based On Tests
Remember, if you run pytest right now, there will be a lot of failures, because none of the new tests have complementary implementations yet. Now that you can see how the tests are designed and what we’re testing for, let’s take a look at how each one was solved.
Let’s start with error_handling.py, where I added some new custom exceptions and updated the logging to handle different severity levels:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from logging.handlers import RotatingFileHandler
# Set up rotating file handler
handler = RotatingFileHandler('data/error.log', maxBytes=1000000, backupCount=5)
handler.setLevel(logging.ERROR)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
# Add the handler to the root logger
logging.basicConfig(level=logging.ERROR, handlers=[handler])
class APIError(Exception):
"""Custom exception for API-related errors."""
pass
class ConfigError(Exception):
"""Custom exception for configuration-related errors."""
pass
class DataError(Exception):
"""Custom exception for data processing errors."""
pass
def log_error(message, level="ERROR"):
"""
Log error messages to a file with severity level.
:param message: (str) The error message to log.
:param level: (str) The severity level (default is "ERROR").
"""
if level == "WARNING":
logging.warning(message)
elif level == "INFO":
logging.info(message)
else:
logging.error(message)
Along with the new exceptions and better logging, I also added the rotating file handler to keep log files under control. While the test handler was built with a tiny 200-byte limit (so rotation is easy to trigger), the actual implementation uses a 1 MB cutoff and keeps five backups.
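As a sanity check on the disk-space angle: the worst case for a RotatingFileHandler is the active file plus backupCount rotated copies, each capped at roughly maxBytes. A quick sketch of the arithmetic (not part of the application code):
# Worst-case footprint of the rotation scheme used above.
max_bytes = 1_000_000   # maxBytes from the handler in error_handling.py
backup_count = 5        # backupCount from the same handler
print(f"Worst case: ~{max_bytes * (backup_count + 1) / 1_000_000:.0f} MB on disk")  # ~6 MB
So even in the worst case, the logs stay in the single-digit megabytes.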
The next area of focus was the config.py implementation. Here, the work was predominantly on config file issues such as a missing file, insufficient permissions, or parsing errors.
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from src.error_handling import log_error, ConfigError
def load_config(config_file='config/config.yaml'):
"""
Load configuration from the specified YAML file.
:param config_file: (str) The path to the YAML configuration file.
:return: dict: Configuration data.
:raises ConfigError: If there is an error with the configuration file.
"""
try:
with open(config_file, 'r') as file:
return yaml.safe_load(file)
except yaml.YAMLError as e:
log_error(f"YAML parsing error while loading configuration: {e}.")
raise ConfigError(f"Configuration file error: {e}.")
except FileNotFoundError:
log_error(f"Configuration file not found: {config_file}.")
raise FileNotFoundError(f"Configuration file not found: {config_file}.")
except PermissionError:
log_error(f"Permission denied for config file: {config_file}.")
raise ConfigError("Permission denied for config file.")
except OSError as e:
log_error(f"Disk space issue while loading configuration: {e}.")
raise ConfigError("Disk space issue while loading configuration.")PythonI anticipate quite a bit more work in this implementation when I circle back to implement stronger security controls around the API keys.
The API client itself took quite a bit of reworking, especially when working through the different exceptions and figuring out how to isolate the tests correctly to ensure I was getting valid results:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from src.error_handling import APIError, log_error
class APIClient:
"""
A client for interacting with various threat intelligence APIs.
"""
def __init__(self, base_url, headers):
"""
Initialize the API client with the base URL and headers.
:param base_url: (str) The base URL of the API.
:param headers: (dict) Headers required for API requests.
"""
self.base_url = base_url
self.headers = headers
def get_data(self, endpoint, params=None):
"""
Fetch data from the specified endpoint.
:param endpoint: (str) The API endpoint to fetch data from.
:param params: (dict) Optional query parameters.
:return: (dict) JSON response from the API.
:raises APIError: If there is an error with the API request.
"""
url = f"{self.base_url}/{endpoint}"
try:
response = requests.get(url, headers=self.headers, params=params, timeout=10)
if response.status_code == 429:
retry_after = response.headers.get("Retry-After", "unknown time")
log_error(f"Rate limit exceeded. Retry after: {retry_after}.")
raise APIError("Rate limit exceeded.")
if response.status_code == 401:
log_error("Authentication failed: Invalid API key.")
raise APIError("Authentication failed.")
if response.status_code == 403:
log_error("Permission denied: Access to resource is forbidden.")
raise APIError("Permission denied.")
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
log_error("API request timed out.")
raise APIError("API request timed out.")
except requests.exceptions.ConnectionError:
log_error("Connection error occurred.")
raise APIError("Connection error occurred.")
except requests.exceptions.RequestException as e:
log_error(f"API request failed: {e}.")
raise APIError(f"API request failed: {e}.")
except ValueError:
log_error("Invalid JSON response format from API.")
raise APIError("Invalid response format.")PythonThis particular piece of the implementation should be pretty close to wrapped up now, but I won’t feel 100% about it until after some more “live-fire” testing.
The final piece that took a lot of reworking is the data collection function, especially the data validation. Getting the checks in the proper order and distinguishing between missing and corrupt data was particularly challenging and effectively required a near-complete rewrite of the function. The great thing is that, now that it’s at this point, it looks like it’ll be a fairly straightforward endeavour to refactor again and make it relatively easy to add and remove threat feeds based on user preference.
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from src.api_client import APIClient
from src.error_handling import APIError, DataError, log_error
from src.config import load_config
config = load_config()
def fetch_threat_data():
"""
Fetch threat data from various sources and return the combined result.
:return: (list) A list of threat intelligence data.
:raises DataError: If there is an error with data integrity or structure.
"""
try:
alien_vault = APIClient("https://otx.alienvault.com/api/v1",
{"X-OTX-API-KEY": config['api_keys']['alien_vault']})
vt = APIClient("https://www.virustotal.com/api/v3", {"x-apikey": config['api_keys']['virus_total']})
alien_vault_data = alien_vault.get_data("indicators/export")
vt_data = vt.get_data("files", params={"limit": 10})
# Validate that the data is a list
if not isinstance(alien_vault_data, list) or not isinstance(vt_data, list):
log_error(f"Invalid data type received: {type(alien_vault_data)} or {type(vt_data)}.")
raise DataError("Unexpected data type received.")
# Define the required keys for validation
required_keys = ['id']
# Validate data and raise DataError if required keys are missing or if data is corrupt
valid_data = []
for entry in alien_vault_data + vt_data:
if not isinstance(entry, dict):
log_error(f"Unexpected data type in entry: {entry}")
raise DataError("Unexpected data type received.")
# Check if the 'id' key is present and its value is of expected type
if 'id' in entry and not isinstance(entry['id'], (str, int)):
log_error(f"Corrupt data in entry: {entry}. 'id' key has invalid type.")
raise DataError("Corrupt data received.")
if any(key not in entry for key in required_keys):
log_error(f"Missing required data keys in entry: {entry}.")
raise DataError("Missing required data keys.")
valid_data.append(entry)
return valid_data
except APIError as e:
log_error(f"Failed to fetch threat data: {e}")
return []
except (KeyError, TypeError, ValueError) as e:
log_error(f"Data error occurred: {e}")
raise DataError("Corrupt data received.")PythonI find this particular implementation rather clunky at the moment, but it works. I think implementing a better structure for feed sources will make a significant improvement, but this is going on the TODO list for later.
Implementing robust error handling and log rotation features has significantly improved IntelliSIEM’s resilience. The system can now gracefully handle unexpected data scenarios, and we can track issues effectively through logs without worrying about file sizes. Next, we will turn our attention to beginning the basic UI implementation. Stay tuned as we continue to build out this powerful tool!