This is the first in a series of articles exploring the use of Python to search Splunk and add levels of automation to the resulting data analysis. Splunk provides not only a RESTful API for web services but also an SDK, meaning that implementation components are provided to make programming straightforward. Splunk Docs also contains many good code examples to get programmers off to a running start.
Let’s start by getting our code connected to Splunk with a simple connection class. From there we will move on to installing the Sysinternals Sysmon utility and implementing simple searches of Sysmon-based logs, and then finally to some automation routines on large result sets. It may also be instructive to consider how one might implement some behavior-based detection and analysis on Sysmon datasets.
There are quite a few examples floating around of connecting to Splunk with Python using the SDK’s connect routine, including in the Splunk Docs. Simple as this exercise is, some of the examples are incomplete or missing a few steps, and I could not always make the Splunk Docs code work (e.g. the Splunk Docs code for reading a credentials file did not work for me). Hence, it was simpler just to write a class-based implementation as a starting point.
Below is a connection class that works with my Anaconda Python v3 after using pip to install the SDK. For ease of development, it reads a .splunkrc file of name-value pairs such as username, host, and password. I will show how you might implement credentials in production more safely in a later entry. By the way, standard disclaimer: don’t try any of this in production until you thoroughly understand, experiment with, and test the code and configuration in your dev environment.
How to Use
- The first step is to install the SDK; I suggest using pip. Find the detailed instructions here.
- Save the class in a file called splunk_connect.py in the same directory as the driver code mentioned next. Otherwise, edit the driver to import from the correct module.
- Place a .splunkrc file with the connection details for your instance in the directory you run the code from.
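For reference, a .splunkrc holds one `name = value` pair per line. The sketch below parses such text into a dictionary of connection settings. The function name `parse_rc_text`, the sample values, and the choice to skip blank lines and `#` comments are assumptions made for illustration; none of them come from the SDK.

```python
def parse_rc_text(text):
    """Parse 'name = value' lines into a dict of connection settings."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        # Assumption: blank lines and '#' comments are ignored.
        if not line or line.startswith("#"):
            continue
        name, sep, value = line.partition("=")
        if not sep:
            continue  # no '=' on this line, so there is nothing to record
        # Strip both sides so 'host = myhost' yields the key 'host'.
        settings[name.strip()] = value.strip()
    return settings


# Placeholder values only; substitute the details of your own instance.
sample = """
# connection details for a dev instance
host = localhost
username = somename
password = somepass
"""

settings = parse_rc_text(sample)
print(sorted(settings))
```

Stripping whitespace on both sides of the `=` matters because the resulting keys are later passed to the SDK as keyword arguments, and a key such as `'host '` with a trailing space would not be recognized.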
A Simple Connection Class
```python
import splunklib.client as splunkclient


class SplunkConnection:
    DEBUG = False

    def __init__(self):
        'create a Splunk service instance and log in'
        self.service = None
        self.kwargs = self.get_credentials_from_rc()
        if self.DEBUG:
            # note: this prints the password too, so keep DEBUG off outside dev
            print(self.kwargs)
        # connect requires keywords, not positional args
        self.service = splunkclient.connect(**self.kwargs)
        if self.DEBUG:
            print('service: ', self.service)

    def get_credentials_from_rc(self):
        '''read connection parameters from an rc file

        to use: create a .splunkrc file that has username, host, etc:
            username = somename
            host = myhost
            password = somepass
            ...
        '''
        # by convention the Python community likes to use kwargs as the
        # name of the keyword sweeper dictionary
        kwargs = {}
        with open(".splunkrc") as myfile:
            for line in myfile:
                name, sep, var = line.partition("=")
                if not sep:
                    continue  # skip blank lines and lines without '='
                # strip both sides so 'host = myhost' yields the key 'host'
                kwargs[name.strip()] = var.strip()
        return kwargs

    def get_app_names(self):
        'return the names of installed apps; used to verify login'
        app_list = []
        for app in self.service.apps:
            app_list.append(app.name)
        return app_list
```
Now you simply need a driver or wrapper for this connection class to test it out. Here is some simple code to get started: printing out the installed apps is the basic exercise that the Splunk Docs use to demonstrate a successful connection.
```python
from splunk_connect import SplunkConnection


def main():
    sc = SplunkConnection()
    app_name_list = sc.get_app_names()
    for app_name in app_name_list:
        print(app_name)


if __name__ == '__main__':
    main()
```
This is a start, but it doesn’t provide anything particularly useful yet. Before implementing a Splunk Search Class, let’s install a utility that will prove invaluable for generating endpoint security data for analysis: Sysmon, from Microsoft Sysinternals, is both free and incredibly useful. There are plenty of information sources on how to install and filter Sysmon. Here are two of note:
- Swift On Security Sysmon Filter
- Introduction to Sysmon on Splunk including a discussion of adding the Technical Add-on to your search-head
Review both of these sites to learn more about configuring your endpoints with Sysmon for forwarding data in a CIM-searchable manner to Splunk. The emphasis here is not to repeat a lot of material readily available on the web, but to get beyond the threshold of automating analysis on large blocks of data.
Although installing Sysmon and the Sysmon TA (Technical Add-on) is simple, I am likely understating the task of tuning Sysmon with a config file. The Swift on Security XML filter will do that, but it deserves some study and will be an area of ongoing work for an Enterprise that wants to effectively operationalize Sysmon feeds into Splunk. It’s the age-old challenge of finding the signal in the noise.
Moving forward, let’s develop a straightforward Search class and driver so we can get access to real data. Save the class as splunk_search.py alongside splunk_connect.py, since the driver further down imports it from that module.
```python
import splunklib.results as results


class SplunkSearch:
    DEBUG = True

    def __init__(self, service):
        'keep a handle on the jobs collection of a connected service'
        self.job = None
        self.jobs = service.jobs

    def set_blocking_search(self, query, num_results):
        'place a blocking search on jobs collection'
        # num_results can be 0 for no limit, or limit the results with head
        if num_results == 0:
            search = query
        else:
            search = query + ' | head ' + str(num_results)
        if self.DEBUG:
            print('Query: ', search)
        # blocking: create() returns only once the search has finished
        kwargs = {"exec_mode": "blocking"}
        self.job = self.jobs.create(search, **kwargs)

    def set_search(self, num_results):
        'placeholder for a non-blocking search; not implemented yet'
        pass

    def get_search_results(self):
        'return a reader that iterates over the results of the last job'
        rr = results.ResultsReader(self.job.results())
        return rr
```
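The empty `set_search` method is a placeholder for a non-blocking search. One way it might be filled in, sketched here under assumptions, is to create the job with `exec_mode` set to `"normal"` and then poll until the job reports that it has finished. The helper `wait_for_job`, its parameters, and the `FakeJob` stand-in are hypothetical names for illustration; the only SDK behavior assumed is that a job object offers an `is_done()` method.

```python
import time


def wait_for_job(job, poll_interval=0.5, timeout=60.0):
    """Poll job.is_done() until it returns True or the timeout elapses.

    Returns True if the job finished, False if we gave up waiting.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if job.is_done():
            return True
        time.sleep(poll_interval)
    return False


class FakeJob:
    """Stand-in for a search job so the sketch runs without a Splunk server."""

    def __init__(self, polls_until_done):
        self.polls_left = polls_until_done

    def is_done(self):
        # Report completion once the configured number of polls has happened.
        self.polls_left -= 1
        return self.polls_left <= 0


finished = wait_for_job(FakeJob(polls_until_done=3), poll_interval=0.01)
print("finished:", finished)
```

With a real connection, the job would presumably come from something like `self.jobs.create(search, exec_mode="normal")`, and the results would be read only after `wait_for_job` returns True. The timeout is a design choice so that a stuck search cannot hang the script indefinitely.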
Now augment the previous driver to exercise this search object. It would be simple to build a constructor that takes everything, including the type of search (e.g. blocking or non-blocking), the number of results, the index, and so on. In fact, with a bit of extra effort one can create a GUI interface in Python: this would present a Threat Hunter with a form to automate a Sysmon search and perform hash submissions to VirusTotal and/or IP analysis sites. It will also be possible to embed this type of functionality directly into Splunk so that one never needs to leave the Splunk Web Interface. For now, though, we will keep the Python code separate from the Web Interface.
```python
from splunk_connect import SplunkConnection
from splunk_search import SplunkSearch


def main():
    sc = SplunkConnection()
    app_name_list = sc.get_app_names()
    for app_name in app_name_list:
        print(app_name)
    # run a search
    ss = SplunkSearch(sc.service)
    # define the search in SPL
    query = 'search source="WinEventLog:Microsoft-Windows-Sysmon/Operational"'
    # limit to 10 results (just for demo speed) and set the search
    ss.set_blocking_search(query, 10)
    # display the results
    for result in ss.get_search_results():
        print(result)


if __name__ == '__main__':
    main()
```
Running the driver provides a glimpse into where these techniques can become useful: we can see real process data including parent processes, hashes, etc. If you use the -n option for starting Sysmon, you also get network information. It’s not hard to see how we can move this forward into a vehicle for everything from alert-based whitelisting to automated uploads to VirusTotal. Here is the result on my test dev system. We can work on formatting it into a nicer display and sending it to useful places for analysis in the next article.
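As a small preview of that kind of post-processing, the sketch below pulls file hashes out of search results and keeps only those not already on a known-good list. It rests on several assumptions: that each result behaves like a dictionary, that it exposes a `Hashes` field made of comma-separated `ALGORITHM=VALUE` pairs, and that comparing lowercase digests is acceptable. The function names, the sample events, and the digests are made up for illustration.

```python
def parse_hashes(hashes_field):
    """Split 'MD5=...,SHA256=...' into {'MD5': '...', 'SHA256': '...'}."""
    parsed = {}
    for pair in hashes_field.split(","):
        algorithm, sep, value = pair.partition("=")
        if sep and value:
            # Normalize case so lookups do not depend on how the log spelled it.
            parsed[algorithm.strip().upper()] = value.strip().lower()
    return parsed


def unknown_hashes(events, known_good, algorithm="SHA256"):
    """Return the distinct hashes in events that are not in known_good."""
    seen = set()
    for event in events:
        digest = parse_hashes(event.get("Hashes", "")).get(algorithm)
        if digest and digest not in known_good:
            seen.add(digest)
    return sorted(seen)


# Made-up events and digests, shaped like dictionaries of field names to values.
events = [
    {"Image": "C:\\Windows\\System32\\cmd.exe", "Hashes": "MD5=AA11,SHA256=BB22"},
    {"Image": "C:\\Temp\\tool.exe", "Hashes": "MD5=CC33,SHA256=DD44"},
    {"Image": "C:\\Temp\\tool.exe", "Hashes": "MD5=CC33,SHA256=DD44"},
    {"Image": "no hashes recorded"},
]
known_good = {"bb22"}

to_review = unknown_hashes(events, known_good)
print(to_review)
```

Deduplicating before any lookup keeps the number of external submissions small, which matters when a large result set repeats the same few binaries many times.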
The Jupyter Notebook and supporting classes are available on GitHub.