
Whether you are building detections or threat hunting, being able to take event logs, aggregate them and generate a statistical calculation is an important capability to have. Today’s blog will provide an overview of some new statistical functions that can be used in both rules and search. In fact, in subsequent blogs, we will expand this thread and examine additional functions while providing examples to get you started with these new functions!


Today, we are going to use a set of functions to calculate the average, median, mode, standard deviation and variance for a set of events. Regular readers of this blog may recall that when we first introduced statistical search, functions for average and standard deviation were available in search but not in rules. That is no longer the case!


Before diving into these functions, let’s walk through the basics of the search we will use. Our search identifies TCP network connections occurring within a specific netblock. We use the net.ip_in_range_cidr function to restrict the search to that netblock (though we could also have used a reference list), and we only want events where network.sent_bytes is greater than zero.


Because I am going to aggregate events by hour and the IP address pair, I created variables for both the principal and target IP fields as well as the time window. The time window needs to include the hour of the day, as well as the date, mainly because if we run this search across multiple days, we don’t want the hours across days to be aggregated together. Well, I don’t want that for this search, but perhaps in a different case, we might. Anyway, to generate this time window placeholder variable, I used the timestamp functions timestamp.get_date and timestamp.get_hour in conjunction with the strings.concat function to create a placeholder variable with some nice formatting. That’s my filtering statement. 


BTW, there is more goodness that I can’t wait to share about working with timestamps, but that will have to wait for another day!


metadata.event_type = "NETWORK_CONNECTION"
net.ip_in_range_cidr(principal.ip, "10.128.0.0/24")
net.ip_in_range_cidr(target.ip, "10.128.0.0/24")
network.sent_bytes > 0
metadata.product_event_type = "conn"
network.ip_protocol = "TCP"
principal.ip = $pip
target.ip = $tip
$time_window = strings.concat(timestamp.get_date(metadata.event_timestamp.seconds), " ", timestamp.get_hour(metadata.event_timestamp.seconds), ":00")

match:
  $time_window, $pip, $tip


outcome:
  $event_count = count_distinct(metadata.id)
  $sent_bytes = sum(network.sent_bytes)

The match section aggregates the events based on the values specified, so all events with the same time window, principal.ip and target.ip will be aggregated. In the outcome section, we are creating a count of the number of events and a sum of the field network.sent_bytes. Here are our results.
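If it helps to see the grouping logic outside of SecOps, here is a minimal Python sketch of what the match and outcome sections are doing conceptually. This is not how SecOps implements aggregation; the event dictionaries, their keys (ts, pip, tip, id, sent_bytes), the UTC time zone and the unpadded hour format are all assumptions made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timezone


def time_window(epoch_seconds):
    """Build a 'YYYY-MM-DD H:00' label from epoch seconds (UTC is an assumption)."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return f"{ts:%Y-%m-%d} {ts.hour}:00"


def aggregate(events):
    """Group events by (time window, principal IP, target IP), then count and sum."""
    groups = defaultdict(lambda: {"ids": set(), "sent_bytes": 0})
    for event in events:
        key = (time_window(event["ts"]), event["pip"], event["tip"])
        groups[key]["ids"].add(event["id"])               # distinct ids, like count_distinct
        groups[key]["sent_bytes"] += event["sent_bytes"]  # running total, like sum
    return {
        key: {"event_count": len(group["ids"]), "sent_bytes": group["sent_bytes"]}
        for key, group in groups.items()
    }


# Hypothetical events: two in the 22:00 hour and one in the 23:00 hour.
events = [
    {"id": "a", "ts": 1700000000, "pip": "10.128.0.5", "tip": "10.128.0.9", "sent_bytes": 100},
    {"id": "b", "ts": 1700000060, "pip": "10.128.0.5", "tip": "10.128.0.9", "sent_bytes": 300},
    {"id": "c", "ts": 1700003600, "pip": "10.128.0.5", "tip": "10.128.0.9", "sent_bytes": 50},
]

for key, values in aggregate(events).items():
    print(key, values)
```

Because the date is part of the key, the same hour on two different days lands in two different groups, which is the behavior I wanted from the time window variable.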



These statistical functions require numeric fields to work with, and now that we have created a sum of the network.sent_bytes, we can do just that. The five functions we are going to cover today are:



  • window.avg

  • window.median

  • window.mode

  • window.stddev

  • window.variance


These measures should be fairly self-explanatory, but if you need a quick refresher, here’s a video that provides a nice explanation of these statistical measures.


The functions for mode, standard deviation and variance do not take any arguments beyond the field being calculated. window.avg and window.median both accept a boolean second argument that determines whether zero values are factored into the calculation; setting it to true ignores zero values.


In our initial search, we specified that all events had to have a byte value greater than zero so we don’t really need to worry about these two different calculations in our outcome section. However, if we had not specified this in our filtering statement, we could generate two different outcome variables, both with and without zero values, simply by flipping the second argument, like this:


$median_ignore_zero_values = window.median(network.sent_bytes, true)
$median_include_zero_values = window.median(network.sent_bytes, false)
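To see why that flag matters, here is a small Python sketch of the same idea. It assumes that ignoring zero values simply means dropping them before the calculation, which is my reading of the variable names above and not a statement about SecOps internals; the helper names and sample values are made up for illustration.

```python
from statistics import mean, median


def avg(values, ignore_zeros=False):
    """Average of values, optionally dropping zeros first (assumed semantics)."""
    kept = [v for v in values if v != 0] if ignore_zeros else list(values)
    return mean(kept) if kept else None


def med(values, ignore_zeros=False):
    """Median of values, optionally dropping zeros first (assumed semantics)."""
    kept = [v for v in values if v != 0] if ignore_zeros else list(values)
    return median(kept) if kept else None


# Hypothetical sent_bytes values, including two zero-byte connections.
sent_bytes = [0, 0, 200, 400, 900]

print(med(sent_bytes, ignore_zeros=False))  # 200
print(med(sent_bytes, ignore_zeros=True))   # 400
print(avg(sent_bytes, ignore_zeros=False))  # 300
print(avg(sent_bytes, ignore_zeros=True))   # 500
```

A handful of zero-byte events is enough to pull both measures down noticeably, so it is worth deciding up front which version answers your question.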

Let’s add the functions to the outcome section of our search. Notice I’ve added the function math.round to the average, standard deviation and variance and rounded each to two decimal places. This can be done by nesting the functions. math.round is currently only available in search, but in future posts, we will discuss additional options available to us.


$avg_sent_bytes = math.round(window.avg(network.sent_bytes),2)
$median_sent_bytes = window.median(network.sent_bytes, true)
$mode_sent_bytes = window.mode(network.sent_bytes)
$std_sent_bytes = math.round(window.stddev(network.sent_bytes),2)
$variance_sent_bytes = math.round(window.variance(network.sent_bytes),2)

Now, you might be thinking that whenever we have aggregated events in the outcome section, we needed an aggregation function like array_distinct, count, max or sum. Here’s the good news: these window functions perform the aggregation themselves and don’t require an additional aggregation function prepended to them in the outcome section.
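If you want to sanity-check the numbers a search returns, a rough equivalent of the five measures can be sketched in Python with the standard statistics module. One loud assumption: I am using the population forms (pstdev and pvariance) here, and this post does not say whether SecOps uses the population or sample formula, so small differences are possible. The window_stats helper and the sample values are hypothetical.

```python
import statistics


def window_stats(values):
    """Compute the five measures over one group of numeric values."""
    return {
        "avg": round(statistics.mean(values), 2),
        "median": statistics.median(values),
        # With several equally common values, Python returns the first one seen.
        "mode": statistics.mode(values),
        # Population formulas are an assumption; SecOps may use the sample forms.
        "stddev": round(statistics.pstdev(values), 2),
        "variance": round(statistics.pvariance(values), 2),
    }


# Hypothetical sent_bytes values for one time window and IP pair.
stats = window_stats([100, 200, 200, 300, 700])
print(stats)
```

For these values the average is 300 while the median and mode are both 200, which shows how a single large transfer pulls the average up while leaving the median alone.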


With that, here is the full search followed by a subset of the results.


metadata.event_type = "NETWORK_CONNECTION"
net.ip_in_range_cidr(principal.ip, "10.128.0.0/24")
net.ip_in_range_cidr(target.ip, "10.128.0.0/24")
network.sent_bytes > 0
metadata.product_event_type = "conn"
network.ip_protocol = "TCP"
principal.ip = $pip
target.ip = $tip
$time_window = strings.concat(timestamp.get_date(metadata.event_timestamp.seconds), " ", timestamp.get_hour(metadata.event_timestamp.seconds), ":00")

match:
 $time_window, $pip, $tip

outcome:
 $event_count = count_distinct(metadata.id)
 $sent_bytes = sum(network.sent_bytes)
 $avg_sent_bytes = math.round(window.avg(network.sent_bytes),2)
 $median_sent_bytes = window.median(network.sent_bytes, true)
 $mode_sent_bytes = window.mode(network.sent_bytes)
 $std_sent_bytes = math.round(window.stddev(network.sent_bytes),2)
 $variance_sent_bytes = math.round(window.variance(network.sent_bytes),2)

order: $time_window desc


Our result set is sorted by the date/hour combination and shows the IP addresses, followed by the count, the sum of the sent bytes and the average, median, mode, standard deviation and variance. Because my data set was generated in a test lab, the values tend to be a bit all over the place, hence the high standard deviation and variance, but the concepts apply and can be useful when building a search during a hunt or for a detection.


Speaking of detections, let’s take a look at how we can take what we’ve built and transform it into a rule.


The following rule is based on the exact same criteria that we used in our search. There are a few formatting changes I made to fit it into the rule structure.



  • Added a meta section to the rule where there was no meta section in the search

  • Added the events section label and the event variable of $net to all the UDM fields in what was the filtering statement in the search; that is, all the criteria before the match section

  • Added a time window to the match section

  • Added the event variable, $net to the UDM fields in the outcome section

  • Added a condition section to the rule

  • Removed the order section from the search


rule statistical_search_example_with_threshold {
  meta:
    author = "Google Cloud Security"
    description = "Calculate statistics based on the date/hour and IP address pairs on the network.sent_bytes field"
    severity = "Low"

  events:
    $net.metadata.event_type = "NETWORK_CONNECTION"
    net.ip_in_range_cidr($net.principal.ip, "10.128.0.0/24")
    net.ip_in_range_cidr($net.target.ip, "10.128.0.0/24")
    $net.network.sent_bytes > 0
    $net.metadata.product_event_type = "conn"
    $net.network.ip_protocol = "TCP"
    $net.principal.ip = $pip
    $net.target.ip = $tip
    $time_window = strings.concat(timestamp.get_date($net.metadata.event_timestamp.seconds), " ", timestamp.get_hour($net.metadata.event_timestamp.seconds), ":00")

  match:
    $time_window, $pip, $tip over 1h

  outcome:
    $event_count = count_distinct($net.metadata.id)
    $sent_bytes = sum($net.network.sent_bytes)
    $avg_sent_bytes = window.avg($net.network.sent_bytes)
    $median_sent_bytes = window.median($net.network.sent_bytes, true)
    $mode_sent_bytes = window.mode($net.network.sent_bytes)
    $std_sent_bytes = window.stddev($net.network.sent_bytes)
    $variance_sent_bytes = window.variance($net.network.sent_bytes)

  //order: $time_window desc //we don't use order in rules

  condition:
    $net and $event_count > 30
}

We could add additional criteria to the condition section based on the value of the standard deviation or any other field for that matter to further narrow the detections returned.



One more thing: if we wanted to calculate a z-score, we could do this in both the rule and the search by appending one more line to the outcome section. Here is what a z-score calculation might look like based on what we’ve already written:


$zscore = ($sent_bytes - $avg_sent_bytes) / $std_sent_bytes
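For a feel of the arithmetic, here is the same formula as a short Python sketch. The zscore helper and the numbers are hypothetical, and the guard for a zero standard deviation is my own addition; this post does not cover how SecOps treats that case.

```python
def zscore(value, mean, stddev):
    """Return how many standard deviations value sits from mean."""
    if stddev == 0:
        # All values in the group were identical, so a z-score is undefined.
        return None
    return (value - mean) / stddev


# Hypothetical values: 700 bytes against an average of 300 and a stddev of 200.
print(zscore(700, 300, 200))  # 2.0
print(zscore(300, 300, 0))    # None
```

Keep in mind that the line above plugs in $sent_bytes, which is the sum for the group, so the result tells you how far the group's total sits from its per-event average; swap in a different value if you want to score something else.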

While we’ve reached the end of this blog, there will be more to come with additional functions along with more examples of both searches and rules that can be used as a starting point as you deploy these capabilities in your Google SecOps instance!
