Code Blocks
SEC1332C: Level up your Response Actions:
Hands-on Building Splunk SOAR Apps using the SOAR App Wizard
Use the links below to jump directly to each code section:
- Sample API Server
- Config Details
- Sample logo
- _get_token
- __init__
- initialize
- _make_rest_call
- _handle_test_connectivity
- _handle_getusers
- _handle_getdevices
- _handle_containdevice
Sample API Server
You can access the API server at api.splunktools.net.
Use the following connection settings to make queries:
- host: https://api.splunktools.net
- username: name
- password: password
- verify: False
Check out the API Docs generated by FastAPI here.
Sample logo
You can use this logo for your test app:
Config Details
Field Name | Description | Data Type |
---|---|---|
username | username | string |
password | password | password |
host | host | string |
verify | Verify Cert | boolean |
_get_token code
def _get_token(self, action_result, from_action=False, **kwargs):
    """Fetch a session token from the ``/auth`` endpoint using HTTP basic auth.

    On success the response body, with any double quotes stripped, is cached
    in ``self._session_id`` and ``self._state`` is persisted through
    ``save_state`` (the token itself is not added to the state here).

    :param action_result: ActionResult that receives the error status and
        message when the token cannot be fetched.
    :param from_action: Unused; kept so existing callers keep working.
    :param kwargs: Extra keyword arguments forwarded to ``requests.request``.
        They must not repeat ``url``, ``headers``, ``verify``, ``auth`` or
        ``method``, which are set here.
    :return: ``phantom.APP_SUCCESS`` on success, otherwise the value returned
        by ``action_result.set_status(phantom.APP_ERROR, ...)``.
    """
    from requests.auth import HTTPBasicAuth

    auth = HTTPBasicAuth(self._username, self._password)
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        r = requests.request(
            method="GET",
            url="{}{}".format(self._base_url, '/auth'),
            headers=headers,
            auth=auth,
            verify=self._verify,
            **kwargs
        )
    except Exception as e:
        # Return a plain status (not a RetVal tuple) so that callers can hand
        # the result straight to phantom.is_fail(), like every other path.
        return action_result.set_status(
            phantom.APP_ERROR, "Error Connecting to server. Details: {0}".format(str(e))
        )

    if r.status_code != 200:
        self.debug_print("Failed to fetch a session token!")
        self._session_id = None
        # Record an explicit failure; get_status() alone would only echo
        # whatever status the action result happened to hold already.
        return action_result.set_status(
            phantom.APP_ERROR,
            "Failed to fetch a session token. Status code: {0}".format(r.status_code)
        )

    # The token comes back as a quoted string; keep only the token text.
    self._session_id = r.text.replace('"', '')
    self.save_state(self._state)
    return phantom.APP_SUCCESS
__init__ code
def __init__(self):
    """Give every connector attribute a placeholder default."""
    # The BaseConnector initialiser has to run before anything else.
    super(AppNameConnector, self).__init__()

    # Connection settings: placeholders until the asset configuration
    # (defined by the app json) has been read. Adjust as needed.
    self._verify = False
    self._username = self._password = None
    self._base_url = None  # base URL for any REST calls the app makes

    # Per-run bookkeeping.
    self._session_id = None
    self._state = None
initialize code
def initialize(self):
    """Load the persisted state and copy the asset configuration onto self.

    :return: ``phantom.APP_SUCCESS``.
    """
    # Load the state in initialize; use it to store data that needs to be
    # accessed across actions.
    self._state = self.load_state()

    # Get the asset config.
    # Required values can be read directly (config['name']); optional values
    # should use config.get('name').
    config = self.get_config()

    self._base_url = config.get('host')
    self._username = config.get('username')
    self._password = config.get('password')
    # Previously misspelled as ``_verfiy``, so the asset's "verify" setting
    # was never applied. Falling back to False keeps the effective behaviour
    # unchanged when the key is absent from the config.
    self._verify = config.get('verify', False)

    return phantom.APP_SUCCESS
_make_rest_call code
def _make_rest_call(self, endpoint, action_result, method="GET", **kwargs):
    """Send an authenticated request to ``endpoint`` and process the response.

    A session token is fetched through ``_get_token`` first when
    ``self._session_id`` is empty, and is then sent in the ``api-key`` header.

    :param endpoint: Path (plus optional query string) appended to
        ``self._base_url``.
    :param action_result: ActionResult that receives error details.
    :param method: HTTP method name handed to ``requests.request``.
    :param kwargs: Any additional parameters that ``requests.request``
        accepts (``params``, ``json``, ``timeout``, ...). A ``headers``
        mapping is merged over the default headers. ``url``, ``verify`` and
        ``method`` are set here and must not be repeated.
    :return: ``(status, None)`` when authentication or the request itself
        fails, otherwise whatever ``self._process_response`` returns.
    """
    resp_json = None

    if not self._session_id:
        ret_val = self._get_token(action_result)
        if phantom.is_fail(ret_val):
            return action_result.get_status(), None

    headers = {
        'api-key': str(self._session_id),
        'Content-Type': 'application/json'
    }
    # Callers routinely pass headers=None. Pull it out of kwargs and merge it,
    # because passing "headers" twice to requests.request raises a TypeError.
    extra_headers = kwargs.pop('headers', None)
    if extra_headers:
        headers.update(extra_headers)

    try:
        r = requests.request(
            method=method,
            url="{}{}".format(self._base_url, endpoint),
            headers=headers,
            verify=self._verify,
            **kwargs
        )
    except Exception as e:
        return RetVal(
            action_result.set_status(
                phantom.APP_ERROR, "Error Connecting to server. Details: {0}".format(str(e))
            ), resp_json
        )

    return self._process_response(r, action_result)
_handle_test_connectivity code
def _handle_test_connectivity(self, param):
    """Check that the configured server answers an authenticated ``/test`` call.

    :param param: Action parameters. Test connectivity takes none, so this
        dictionary is expected to be empty.
    :return: The action result's failure status when the REST call fails,
        otherwise the result of setting ``phantom.APP_SUCCESS``.
    """
    # Add an action result object to self (BaseConnector) to represent the
    # action for this param.
    action_result = self.add_action_result(ActionResult(dict(param)))

    # Test connectivity typically adds no data to the action result; the
    # status and progress messages are what matter.
    self.save_progress("Connecting to endpoint")

    ret_val, _ = self._make_rest_call(
        '/test', action_result, params=None, headers=None
    )

    if phantom.is_fail(ret_val):
        # The action result already holds the error details. Return here so
        # that a failed call is not reported as "Passed" below.
        self.save_progress("Test Connectivity Failed.")
        return action_result.get_status()

    self.save_progress("Test Connectivity Passed")
    return action_result.set_status(phantom.APP_SUCCESS)
_handle_getusers code
def _handle_getusers(self, param):
    """Collect the users from every page of ``/getusers`` into one result.

    A first call reads ``num_pages`` from the response. Each page is then
    requested in turn and its ``data`` items are gathered into a single
    ``{'data': [...]}`` dictionary that is added to the action result.

    :param param: Action parameters; this action reads none of them.
    :return: The action result's failure status if any REST call fails,
        otherwise the result of setting ``phantom.APP_SUCCESS``.
    """
    self.save_progress("In action handler for: {0}".format(self.get_action_identifier()))

    # Add an action result object to self (BaseConnector) to represent the
    # action for this param.
    action_result = self.add_action_result(ActionResult(dict(param)))

    # The first call is only used to learn how many pages exist.
    ret_val, response = self._make_rest_call(
        '/getusers', action_result, params=None, headers=None
    )
    if phantom.is_fail(ret_val):
        # Error details are already in the action result. Continuing would
        # index into a response that is None.
        return action_result.get_status()

    num_pages = int(response["num_pages"])
    results = {'data': []}

    # Pages are requested as 0 .. num_pages - 1, as before.
    # NOTE(review): confirm the API's page numbering really is zero-based.
    for page in range(num_pages):
        ret_val, response = self._make_rest_call(
            '/getusers?page={}'.format(page), action_result, params=None, headers=None
        )
        if phantom.is_fail(ret_val):
            return action_result.get_status()
        results['data'].extend(response["data"])

    action_result.add_data(results)

    # Return success; no need to set the message, only the status.
    return action_result.set_status(phantom.APP_SUCCESS)
_handle_getdevices code
def _handle_getdevices(self, param):
    """List devices, optionally filtered by the ``status`` action parameter.

    With a ``status`` value, a single ``/getdevices?status=...`` call is made
    and only that response's ``data`` is returned. Without one, ``num_pages``
    is read from a first ``/getdevices`` call and every page is fetched. In
    both cases the items end up in one ``{'data': [...]}`` dictionary that is
    added to the action result.

    :param param: Action parameters; the optional ``status`` key is read.
    :return: The action result's failure status if any REST call fails,
        otherwise the result of setting ``phantom.APP_SUCCESS``.
    """
    self.save_progress("In action handler for: {0}".format(self.get_action_identifier()))

    # Add an action result object to self (BaseConnector) to represent the
    # action for this param.
    action_result = self.add_action_result(ActionResult(dict(param)))

    status = param.get('status', None)
    if status:
        # Previously built as '/getdevices?&status=...' with a stray '&'.
        # NOTE(review): the value is interpolated as-is, without URL-encoding.
        endpoint = '/getdevices?status={}'.format(status)
    else:
        endpoint = '/getdevices'

    ret_val, response = self._make_rest_call(
        endpoint, action_result, params=None, headers=None
    )
    if phantom.is_fail(ret_val):
        # Error details are already in the action result. Continuing would
        # index into a response that is None.
        return action_result.get_status()

    results = {'data': []}
    if status:
        # Filtered queries use only the response already in hand; no further
        # pages are requested, as before.
        results['data'].extend(response["data"])
    else:
        # Pages are requested as 0 .. num_pages - 1, as before.
        # NOTE(review): confirm the API's page numbering really is zero-based.
        for page in range(int(response["num_pages"])):
            ret_val, response = self._make_rest_call(
                '/getdevices?page={}'.format(page), action_result, params=None, headers=None
            )
            if phantom.is_fail(ret_val):
                return action_result.get_status()
            results['data'].extend(response["data"])

    action_result.add_data(results)

    # Return success; no need to set the message, only the status.
    return action_result.set_status(phantom.APP_SUCCESS)
_handle_containdevice code
def _handle_containdevice(self, param):
    """Call ``/containdevice`` and record the response (unfinished handler).

    The handler still ends by reporting "Action not yet implemented".

    :param param: Action parameters; ``hostname`` is required.
    :return: The action result's failure status if the REST call fails,
        otherwise the result of setting ``phantom.APP_ERROR`` with the
        "Action not yet implemented" message.
    :raises KeyError: If ``param`` has no ``hostname`` entry.
    """
    self.save_progress("In action handler for: {0}".format(self.get_action_identifier()))

    # Add an action result object to self (BaseConnector) to represent the
    # action for this param.
    action_result = self.add_action_result(ActionResult(dict(param)))

    # Required parameter: read directly so that a missing value fails loudly.
    # NOTE(review): hostname is not yet sent to the API. How /containdevice
    # expects to receive it is not visible here, so it is left for the
    # implementer.
    hostname = param['hostname']

    ret_val, response = self._make_rest_call(
        '/containdevice', action_result, params=None, headers=None
    )
    if phantom.is_fail(ret_val):
        # Keep the real error from the REST call instead of adding a None
        # response as data and masking it with the "not implemented" message.
        return action_result.get_status()

    # Add the response into the data section.
    action_result.add_data(response)

    # For now return an error with a message. Once the action is implemented,
    # return action_result.set_status(phantom.APP_SUCCESS) instead.
    return action_result.set_status(phantom.APP_ERROR, "Action not yet implemented")