Previous entries in this series covered the initial tool setup and testing of the environment, establishing the structure of the project, initial tests, and connecting to APIs. In this third entry, we’re going to enhance our testing approach through a Test-Driven Development (TDD) methodology to build a more robust and reliable system.
Initially, our tests covered basic scenarios but didn’t fully account for real-world challenges like invalid configurations, API rate limits, and malformed data. We will expand our test cases significantly to address this, focusing on edge cases and error handling.
To see this project in its current state, head over to my repo and check it out: IntelliSIEM on GitHub
Enhancing Our Test Scenarios
In our initial design, we tested for success and failure of threat data collection from the APIs, but what about other potential issues? Let’s expand out our testing to ensure that we’re handling other potential data issues such as an empty or corrupt response. Here’s the new tests/test_data_collection.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# tests/test_data_collection.py
import pytest
from src.data_collection import fetch_threat_data, APIError
def test_fetch_threat_data_success(mocker):
"""
Test fetching threat data with valid API keys.
"""
mocker.patch('src.api_client.APIClient.get_data', return_value=[{'id': 'test'}])
threats = fetch_threat_data()
assert len(threats) > 0
def test_fetch_threat_data_failure(mocker):
"""
Test fetching threat data with invalid API keys.
"""
# Mock API client responses to raise an error
mocker.patch('src.api_client.APIClient.get_data', side_effect=APIError("API Error"))
# Expect the APIError exception to be raised and handle it in the function
threats = fetch_threat_data()
assert threats == [] # Expecting an empty list as a fallback
def test_fetch_threat_data_empty_response(mocker):
"""
Test fetching threat data with an empty response.
"""
mocker.patch('src.api_client.APIClient.get_data', return_value=[])
threats = fetch_threat_data()
assert len(threats) == 0 # Expecting empty list when no data is returned
def test_fetch_threat_data_corrupt_data(mocker):
"""
Test fetching threat data with corrupt response data.
"""
mocker.patch('src.api_client.APIClient.get_data', return_value=[{'malformed': 'data'}])
threats = fetch_threat_data()
assert threats == [] # Expecting empty list on data parsing failure
In the same vein, we also need to make sure we can handle things like a missing or invalid config file, so we’ll add a new test file tests/test_config.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from src.config import load_config
from src.error_handling import APIError
def test_load_config_missing_file():
"""
Test loading configuration with a missing file.
"""
with pytest.raises(FileNotFoundError):
load_config('config/non_existent_config.yaml')
def test_load_config_invalid_format(mocker):
"""
Test loading configuration file with an invalid format.
"""
mocker.patch('builtins.open', mocker.mock_open(read_data="invalid_yaml"))
with pytest.raises(APIError):
load_config()
APIs can be unpredictable sometimes. We should also make sure we can handle other potential API issues that come up, like timeouts, rate limiting, and invalid responses, so we’ll also add a new tests/test_api_client.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import requests
from src.api_client import APIClient, APIError
def test_api_client_timeout(mocker):
"""
Test API client handling of a timeout error.
"""
mocker.patch('requests.get', side_effect=requests.exceptions.Timeout)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
# Match a more general pattern, ignoring the specific exception message.
with pytest.raises(APIError, match="API request timed out"):
client.get_data("endpoint")
def test_api_client_invalid_response(mocker):
"""
Test API client handling of invalid JSON response.
"""
mocker.patch('requests.get', return_value=mocker.Mock(status_code=200, json=mocker.Mock(side_effect=ValueError)))
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Invalid response format."):
client.get_data("endpoint")
def test_api_client_rate_limit(mocker):
"""
Test API client handling of rate limiting.
"""
mock_response = mocker.Mock(status_code=429, headers={"Retry-After": "5"})
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Rate limit exceeded."):
client.get_data("endpoint")Time To Retest Our Code
Now that we’ve got our shiny new tests in place, it’s time to see how well we did.
pytest /testsI hope you like the color red because there’s quite a bit of it. The good news is that because we’re letting our tests determine how we implement, all these error messages will point us in exactly the right direction to help ensure our error handling and function implementations are as correct as we can make them.
Some of you, depending on which version of the project you’re on, may notice an error relating to the failure to load APIError from the config.py file. Don’t worry; we’ll fix that below as we work through the other issues.
Note: the snipped output below does not reflect the actual color you may see in your terminal. My preferred dev environment generally uses the various JetBrains tools on Fedora, so my error output is bright red. The plugin I use for this site uses the default VSCode color scheme.
=========================================================================================================================== short test summary info ============================================================================================================================
FAILED tests/test_api_client.py::test_api_client_timeout - AssertionError: Regex pattern did not match.
FAILED tests/test_api_client.py::test_api_client_invalid_response - ValueError
FAILED tests/test_api_client.py::test_api_client_rate_limit - Failed: DID NOT RAISE <class 'src.error_handling.APIError'>
FAILED tests/test_config.py::test_load_config_invalid_format - Failed: DID NOT RAISE <class 'src.error_handling.APIError'>
FAILED tests/test_data_collection.py::test_fetch_threat_data_corrupt_data - AssertionError: assert [{'malformed'...med': 'data'}] == []
========================================================================================================================= 5 failed, 4 passed in 0.33s ==========================================================================================================================
Remember, we implemented the tests before we actually implemented the code to handle those situations! Let’s walk through each of these in order and fix them. Note that I will only provide the changes to implement for each file until all changes have been implemented, then I will show the full final version.
Failure 1: test_api_client_timeout – AssertionError
The regex pattern for the raised APIError did not match the actual error message. The simple fact is, we haven’t implemented a specific error handler for this yet. Here’s the specific test we’re looking at:
def test_api_client_timeout(mocker):
"""
Test API client handling of a timeout error.
"""
mocker.patch('requests.get', side_effect=requests.exceptions.Timeout)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
# Match a more general pattern, ignoring the specific exception message.
with pytest.raises(APIError, match="API request timed out"):
client.get_data("endpoint")We need to modify the src/api_client.py get_data method to raise a specific error message for a timeout error:
except requests.exceptions.Timeout:
log_error("API request timed out")
raise APIError("API request timed out")If we re-run our tests again, there should be one less to worry about.
Failure 2: test_api_client_invalid_response – ValueError
This test looks for JSON parsing errors, but the current implementation doesn’t specifically catch them:
def test_api_client_invalid_response(mocker):
"""
Test API client handling of invalid JSON response.
"""
mocker.patch('requests.get', return_value=mocker.Mock(status_code=200, json=mocker.Mock(side_effect=ValueError)))
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Invalid response format."):
client.get_data("endpoint")Still working in api_client.py, we should catch this exception and raise an appropriate APIError.
except ValueError:
log_error("Invalid JSON response format from API")
raise APIError("Invalid response format.") # New error for invalid JSON responses
Failure 3: test_api_client_rate_limit – Failed: DID NOT RAISE
This message is a bit different from the previous two. When our account is rate-limited by the remote service, we should get a specific status code that triggers an error for us to handle. This test case expects an APIError to be raised, but the current implementation doesn’t specifically handle the 429 status code.
def test_api_client_rate_limit(mocker):
"""
Test API client handling of rate limiting.
"""
mock_response = mocker.Mock(status_code=429, headers={"Retry-After": "5"})
mocker.patch('requests.get', return_value=mock_response)
client = APIClient("https://example.com/api", {"Authorization": "Bearer token"})
with pytest.raises(APIError, match="Rate limit exceeded."):
client.get_data("endpoint")Let’s go ahead and make sure we handle this case. For now, we want to raise the exception. Later, we may wish to consider refactoring this code to address the situation differently. This is the last error specific to the api_client.py file, so here’s the final code to date:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from src.error_handling import APIError, log_error
class APIClient:
"""
A client for interacting with various threat intelligence APIs.
"""
def __init__(self, base_url, headers):
"""
Initialize the API client with the base URL and headers.
:param base_url: (str) The base URL of the API.
:param headers: (dict) Headers required for API requests.
"""
self.base_url = base_url
self.headers = headers
def get_data(self, endpoint, params=None):
"""
Fetch data from the specified endpoint.
:param endpoint: (str) The API endpoint to fetch data from.
:param params: (dict) Optional query parameters.
:return: (dict) JSON response from the API.
:raises APIError: If there is an error with the API request.
"""
url = f"{self.base_url}/{endpoint}"
try:
response = requests.get(url, headers=self.headers, params=params, timeout=10)
if response.status_code == 429:
retry_after = response.headers.get("Retry-After", "unknown time")
log_error(f"Rate limit exceeded. Retry after: {retry_after}")
raise APIError("Rate limit exceeded.")
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
log_error("API request timed out")
raise APIError("API request timed out")
except requests.exceptions.RequestException as e:
log_error(f"API request failed: {e}")
raise APIError(f"API request failed: {e}")
except ValueError:
log_error("Invalid JSON response format from API")
raise APIError("Invalid response format.") # New error for invalid JSON responses
Failure 4: test_load_config_invalid_format – Failed: DID NOT RAISE
The test_load_config_invalid_format test case expects an APIError, but the current load_config function doesn’t raise it.
def test_load_config_invalid_format(mocker):
"""
Test loading configuration file with an invalid format.
"""
mocker.patch('builtins.open', mocker.mock_open(read_data="invalid_yaml"))
with pytest.raises(APIError):
load_config()Let’s go ahead and update load_config() to raise APIError for parsing issues. Now, is this truly an API error? Nope. Should there be a separate class of errors relating to the application configuration? Probably. We’ll add that to our TODO list, and circle back on the next post when we take a careful look at our error handling strategy. For now, this is the only failure related to config.py, so here’s the current file:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from src.error_handling import log_error, APIError
def load_config(config_file='config/config.yaml'):
"""
Load configuration from the specified YAML file.
:param config_file: (str) The path to the YAML configuration file.
:return: dict: Configuration data.
:raises APIError: If there is an error with the configuration file.
"""
try:
with open(config_file, 'r') as file:
return yaml.safe_load(file)
except yaml.YAMLError as e:
log_error(f"YAML parsing error while loading configuration: {e}")
raise APIError(f"Configuration file error: {e}")
except FileNotFoundError:
log_error(f"Configuration file not found: {config_file}")
raise FileNotFoundError(f"Configuration file not found: {config_file}")
Now, interestingly, with this particular case there is also an issue with the test itself where mocker isn’t providing test data quite correctly. To resolve this, we’re also going to update our test_config.py file to be more specific with our test:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from src.config import load_config, APIError
def test_load_config_missing_file():
"""
Test loading configuration with a missing file.
"""
with pytest.raises(FileNotFoundError):
load_config('config/non_existent_config.yaml')
def test_load_config_invalid_format(mocker):
"""
Test loading configuration file with an invalid format.
"""
invalid_yaml = """
api_keys:
alien_vault: !!invalid
"""
mocker.patch('builtins.open', mocker.mock_open(read_data=invalid_yaml))
with pytest.raises(APIError, match="Configuration file error"):
load_config()Failure 5: test_fetch_threat_data_corrupt_data – AssertionError
OK. Last test failure. In this case, another Assertion Error. The test_fetch_threat_data_corrupt_data test case expects an empty list for malformed data, but currently, the function does not filter it:
def test_fetch_threat_data_corrupt_data(mocker):
"""
Test fetching threat data with corrupt response data.
"""
mocker.patch('src.api_client.APIClient.get_data', return_value=[{'malformed': 'data'}])
threats = fetch_threat_data()
assert threats == [] # Expecting empty list on data parsing failureWe need to update the fetch_threat_data function to handle malformed data. This is the only edit to data_collection.py, so here’s the full code:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from src.api_client import APIClient
from src.error_handling import APIError, log_error
from src.config import load_config
config = load_config()
def fetch_threat_data():
"""
Fetch threat data from various sources and return the combined result.
:return: (list) A list of threat intelligence data.
"""
try:
alien_vault = APIClient("https://otx.alienvault.com/api/v1",
{"X-OTX-API-KEY": config['api_keys']['alien_vault']})
vt = APIClient("https://www.virustotal.com/api/v3", {"x-apikey": config['api_keys']['virus_total']})
alien_vault_data = alien_vault.get_data("indicators/export")
vt_data = vt.get_data("files", params={"limit": 10})
# Filter out malformed data
valid_data = [entry for entry in alien_vault_data + vt_data if 'id' in entry]
return valid_data
except APIError as e:
log_error(f"Failed to fetch threat data: {e}")
return [] # Return an empty list on failureWith that done, all tests should pass once again!
These additional tests help make IntelliSIEM more resilient, reliable, and capable of handling unexpected situations gracefully. Incorporating a testing framework lays the groundwork for future development, helping ensure that we catch issues early and maintain a high level of code quality. Next up, we’ll dive in deeper and make sure we have a robust error-handling structure in place.
Leave a Reply