Part 1 of this series covered the initial tool setup and testing of the environment.
In Part 2, we’re going to dive into structure, tests, and API connections.
In this post, I’m excited to walk through the development of IntelliSIEM’s data collection module. This step is crucial because it lays the foundation for how we will gather, process, and analyze threat data from various sources.
To see this project in its current state, head over to my repo and check it out: IntelliSIEM on GitHub
Step 1: Project Structure and Setup
To ensure the project remains organized and scalable, I began by designing a clear directory structure with separate folders for configuration, data, source code, and tests:
IntelliSIEM/
├── LICENSE
├── README.md
├── config/
│ └── config_template.yaml
├── data/
├── src/
│ ├── __init__.py
│ ├── data_collection.py
│ ├── error_handling.py
│ └── api_client.py
├── tests/
│ ├── __init__.py
│ └── test_data_collection.py
└── requirements.txt

With this design in mind, I created the config, data, src, and tests subdirectories in the project directory. In the project settings in PyCharm, I also marked src as “Sources”, tests as “Tests”, and config and data as “Resources”. I then added __init__.py files to the src/ and tests/ directories to make them Python packages, ensuring smooth module imports. Finally, I added requirements.txt so that end users can easily install the libraries necessary to run the application.
mkdir config
mkdir data
mkdir src
mkdir tests
touch src/__init__.py
touch tests/__init__.py
touch requirements.txt

Step 2: README and License
Before pushing any code to GitHub, I documented the project setup and usage in the README.md file and applied the Apache License 2.0 to protect the code and clarify usage rights. Applying the license involves creating the LICENSE file, populating it with the entire license text, and adding a short license stub at the top of every code file. You’ll see that stub as we move through the project. To see the full text of either, check out the full project in the GitHub repo.
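Since the stub has to appear in every code file, it is easy to miss one. Here is a minimal sketch of a checker, not something from the IntelliSIEM repo: the function name files_missing_header, the marker phrase, and the number of lines scanned are all my own assumptions.

```python
from pathlib import Path

# Assumption: every source file carries a comment line containing this phrase.
LICENSE_MARKER = "Licensed under the Apache License, Version 2.0"


def files_missing_header(root, marker=LICENSE_MARKER, lines_to_scan=5):
    """Return the .py files under `root` whose first lines lack the marker."""
    missing = []
    for path in sorted(Path(root).rglob("*.py")):
        with path.open(encoding="utf-8") as handle:
            # Only the top of the file matters for a license stub.
            head = [handle.readline() for _ in range(lines_to_scan)]
        if not any(marker in line for line in head):
            missing.append(path)
    return missing
```

Calling files_missing_header("src") would list any file without the stub. Note that empty __init__.py files would be reported too, so you may want to filter those out.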
Step 3: Credential Handling
Sensitive information like API keys should never be hard-coded. For this early stage of development, I created a config/config_template.yaml file with the basic syntax that we’ll build on later to implement a more secure way of handling the keys:
api_keys:
  alien_vault: "YOUR_ALIEN_VAULT_API_KEY"
  virus_total: "YOUR_VIRUS_TOTAL_API_KEY"

To prevent the exposure of our own keys, we’ll first add an entry to .gitignore for config/config.yaml. As a reminder, the .gitignore should now look something like this:
.venv/
.idea/
__pycache__/
/config/config.yaml

With that entry in place, version control will ignore the file and prevent accidental exposure. Then, copy the template to config.yaml and fill in our keys:
cp config/config_template.yaml config/config.yaml

Step 4: Implementing Test-Driven Development (TDD)
OK. I can already hear the groans, “Why are we wasting time on this?” or “TDD is too hard!”. Yeah, I get it, but the crux of the matter is that test-driven development helps catch issues early. I wrote some initial test cases using pytest and pytest-mock for the data collection module. First things first, we need to make sure we’ve added pytest to our project. Let’s utilize our newly created requirements.txt file to make sure we have everything we need so far. Open it up in the editor and it should look like this:
requests
pandas
matplotlib
seaborn
fpdf
reportlab
json5
pyyaml
pytest
pytest-mock
beautifulsoup4
lxml
sqlalchemy

Save that, open the console again, and we’ll test it:
pip install -r requirements.txt

Now that we have all our libraries installed, we’re going to create our first tests. Remember, in TDD we always expect the tests to fail first. Here’s a sample of the tests I wrote in tests/test_data_collection.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from src.data_collection import fetch_threat_data, APIError


def test_fetch_threat_data_success(mocker):
    """
    Test fetching threat data with valid API keys.
    """
    mocker.patch('src.api_client.APIClient.get_data', return_value=[{'id': 'test'}])
    threats = fetch_threat_data()
    assert len(threats) > 0


def test_fetch_threat_data_failure(mocker):
    """
    Test fetching threat data with invalid API keys.
    """
    # Mock API client responses to raise an error
    mocker.patch('src.api_client.APIClient.get_data', side_effect=APIError("API Error"))
    # Expect the APIError exception to be raised and handle it in the function
    threats = fetch_threat_data()
    assert threats == []  # Expecting an empty list as a fallback
Now, as I said, we anticipate that this is going to fail, and we can verify that by running our first tests from the project root:

pytest tests/

Congratulations, you’ve failed your first test! Now, we can move on to the implementation and start passing. To keep this brief, I’m going to assume that you are consistently running your tests every time you make a change until all your tests pass, so I won’t keep covering that aspect repeatedly.
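If the return_value and side_effect arguments in those tests look like magic, the following standalone sketch may help. It uses unittest.mock from the standard library, which is what pytest-mock’s mocker fixture wraps. FakeClient, FakeAPIError, and fetch are hypothetical stand-ins I made up for illustration; they are not the project’s classes.

```python
from unittest import mock


class FakeAPIError(Exception):
    """Stand-in for the project's APIError (hypothetical name)."""


class FakeClient:
    """Stand-in for APIClient; the real method would hit the network."""

    def get_data(self, endpoint):
        raise RuntimeError("network access is not expected in tests")


def fetch(client):
    """Return the client's data, or an empty list if the API call fails."""
    try:
        return client.get_data("indicators")
    except FakeAPIError:
        return []


# return_value: the patched method hands back canned data.
with mock.patch.object(FakeClient, "get_data", return_value=[{"id": "test"}]):
    success_result = fetch(FakeClient())

# side_effect: the patched method raises the given exception instead.
with mock.patch.object(FakeClient, "get_data", side_effect=FakeAPIError("API Error")):
    failure_result = fetch(FakeClient())
```

In both cases the real get_data never runs, which is why the tests can exercise the success and failure paths without valid API keys or a network connection.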
Step 5: Implementing the Data Collection Module
The data collection module is the heart of IntelliSIEM. It fetches data from various threat intelligence APIs. Following our tests logically, we’re first importing from src.data_collection, but we haven’t created that file yet. Let’s do that now in src/data_collection.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from src.api_client import APIClient
from src.error_handling import APIError, log_error
from src.config import load_config

config = load_config()


def fetch_threat_data():
    """
    Fetch threat data from various sources and return the combined result.
    :return: (list) A list of threat intelligence data.
    """
    try:
        alien_vault = APIClient("https://otx.alienvault.com/api/v1",
                                {"X-OTX-API-KEY": config['api_keys']['alien_vault']})
        vt = APIClient("https://www.virustotal.com/api/v3", {"x-apikey": config['api_keys']['virus_total']})
        alien_vault_data = alien_vault.get_data("indicators/export")
        vt_data = vt.get_data("files", params={"limit": 10})
        return alien_vault_data + vt_data
    except APIError as e:
        log_error(f"Failed to fetch threat data: {e}")
        return []
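One thing to keep in mind: the final return statement concatenates the two responses with +, which only works if both are lists. A JSON response can just as easily decode to a dict. The sketch below shows one possible way to guard against that. The helper names as_list and combine are my own assumptions and are not part of the current module.

```python
def as_list(payload):
    """Coerce an API payload into a list so results can be concatenated."""
    if payload is None:
        return []
    if isinstance(payload, list):
        return payload
    # Anything else (for example a dict) becomes a single-item list.
    return [payload]


def combine(*payloads):
    """Concatenate any number of payloads into one flat list."""
    combined = []
    for payload in payloads:
        combined.extend(as_list(payload))
    return combined
```

With helpers like these, the return line could read combine(alien_vault_data, vt_data) and would tolerate either shape of response.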
Continuing logically, we need to implement the APIClient class next. The APIClient class manages the API requests and provides error handling. We’ll build more robust error handling as we build better tests throughout the project. Here is the current src/api_client.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from src.error_handling import APIError, log_error


class APIClient:
    """
    A client for interacting with various threat intelligence APIs.
    """

    def __init__(self, base_url, headers):
        """
        Initialize the API client with the base URL and headers.
        :param base_url: (str) The base URL of the API.
        :param headers: (dict) Headers required for API requests.
        """
        self.base_url = base_url
        self.headers = headers

    def get_data(self, endpoint, params=None):
        """
        Fetch data from the specified endpoint.
        :param endpoint: (str) The API endpoint to fetch data from.
        :param params: (dict) Optional query parameters.
        :return: (dict) JSON response from the API.
        :raises APIError: If there is an error with the API request.
        """
        url = f"{self.base_url}/{endpoint}"
        try:
            response = requests.get(url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            log_error(f"API request failed: {e}")
            raise APIError(f"API request failed: {e}")
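A small detail in get_data: the URL is built by joining base_url and endpoint with a single slash, so a trailing slash on the base URL or a leading slash on the endpoint would produce a double slash. As a rough sketch, a helper like the hypothetical build_url below (my own name, not in the repo) would normalize both sides first.

```python
def build_url(base_url, endpoint):
    """Join a base URL and an endpoint with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
```

For example, build_url("https://otx.alienvault.com/api/v1/", "/indicators/export") gives the same result as passing the two parts without the extra slashes.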
I’m going to touch on error handling in Step 6, so before we move on to that, let’s ensure that we’ve implemented loading the API keys from the config.yaml file. We take care of that in src/config.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml


def load_config(config_file='config/config.yaml'):
    """
    Load configuration from the specified YAML file.
    :param config_file: The path to the YAML configuration file.
    :return: dict: Configuration data.
    """
    with open(config_file, 'r') as file:
        return yaml.safe_load(file)
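Because config.yaml starts life as a copy of the template, it is easy to forget to replace the placeholder values. Here is a minimal sketch of a check that could run on the dictionary load_config returns. The function name missing_api_keys and the "YOUR_" prefix test are my own assumptions, based on the placeholder text in the template.

```python
def missing_api_keys(config, required=("alien_vault", "virus_total")):
    """Return the names of required API keys that are absent or still placeholders."""
    # yaml.safe_load returns None for an empty file, so guard against that.
    api_keys = (config or {}).get("api_keys") or {}
    missing = []
    for name in required:
        value = api_keys.get(name)
        if not value or str(value).startswith("YOUR_"):
            missing.append(name)
    return missing
```

An empty result means every required key has been filled in. Anything else is a list of the keys that still need attention.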
Step 6: Error Handling and Logging
To ensure smooth operation and easier debugging, I implemented a custom APIError class and a logging mechanism. Is it robust? Well, not really. As we work on really making this module more robust, we’ll improve this class and enhance the error handling and reporting. For now, we’re really interested in logging errors so that we can make sure we handle them gracefully when we reach a release-ready version. Here is the current src/error_handling.py:
# IntelliSIEM Copyright 2024, Rob Perry
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

logging.basicConfig(level=logging.ERROR, filename='data/error.log', format='%(asctime)s %(levelname)s: %(message)s')


class APIError(Exception):
    """
    Custom exception for API errors.
    """
    pass


def log_error(message):
    """
    Log error messages to a file.
    :param message: (str) The error message to log.
    """
    logging.error(message)

This helps track any issues that arise during API calls or data processing.
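As a preview of where the error handling could go, here is a hedged sketch of exception chaining with a named logger. It is not the project’s code: call_api and the logger name are hypothetical, and I catch OSError here only because requests’ RequestException derives from it.

```python
import logging

logger = logging.getLogger("intellisiem.sketch")  # hypothetical logger name


class APIError(Exception):
    """Custom exception for API errors (mirrors the class above)."""


def call_api(request):
    """Run `request` and translate low-level failures into APIError."""
    try:
        return request()
    except OSError as exc:
        logger.error("API request failed: %s", exc)
        # `from exc` keeps the original exception attached as __cause__.
        raise APIError(f"API request failed: {exc}") from exc
```

Chaining with from means the traceback shows both the APIError and the underlying failure, which should make the log entries easier to act on later.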
Hopefully, all your tests have passed. We have the basic implementation of the data collector, but currently, our tests only cover the basic success and failure states of the collector. In the next post, we will improve and enhance our tests. Once we have robust tests in place, we’ll move into better error handling and then revisit the API key handling.