
Hi all,

I'm currently working on a custom Chronicle parser using Logstash syntax to handle logs in CEF format. I have already built and tested a parser for CEF logs that include key-value pairs. These work correctly and generate UDM events as expected.

However, when I parse CEF logs that do not contain key-value pairs (non-KV logs), no UDM events or entities are generated in Chronicle.

Format 1, which is parsing: <priority>CEF:0|vendor|product|version|signature|name|severity|key1=value1|key2=value2|…

Format 2, which is not parsing and does not generate a UDM event:
<priority>CEF:0|vendor|product|user|user_full_name|category|action|description|status

For Format 2 I am getting this message:

"No UDM events or entities were generated for the current parser configuration. If this is not intended, rectify the code snippet/UDM mappings and then click preview."

filter {

grok {
match => {
"message" => [
"CEF:(?P<header_version>[^|]+)\\|(?P<device_vendor>[^\\|]+)\\|(?P<device_product>[^\\|]+)\\|(?P<device_version>[^\\|]+)\\|(?P<signature_id>[^\\|]+)\\|(?P<event_name>[^\\|]+)\\|(?P<severity>[^\\|]+)\\|%{GREEDYDATA:cef_event_attributes}"
]
}
overwrite => ["message"]
on_error => "not_a_valid_syslog"
}

if [not_a_valid_syslog] {
drop {
tag => "TAG_MALFORMED_MESSAGE"
}
}

kv {
source => "cef_event_attributes"
field_split => "|"
value_split => "="
target => "cef_fields"
}

if [cef_event_attributes] =~ "=" {

mutate {
replace => {
"cef_fields.status" => ""
"cef_fields.updatedOn" => ""
"cef_fields.createdOn" => ""
"cef_fields.src_hostname" => ""
"cef_fields.dst_hostname" => ""
}
}

mutate {
add_field => { "security_result" => "{}" }
}

if [cef_fields][status] != "" {
if [cef_fields][status] in ["CREATED", "IN_PROGRESS"] {
mutate {
replace => { "security_result.threat_status" => "ACTIVE" }
}
}
else if [cef_fields][status] == "RESOLVED" {
mutate {
replace => { "security_result.threat_status" => "CLEARED" }
}
}
else {
mutate {
replace => { "security_result.threat_status" => "THREAT_STATUS_UNSPECIFIED" }
}
}
}

# Populate UDM (Unified Data Model) fields with extracted values
mutate {
replace => {
"event.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
"event.idm.read_only_udm.metadata.vendor_name" => "%{device_vendor}"
"security_result.description" => "%{event_name}"
}
}

mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}

mutate {
merge => {
"@output" => "event"
}
}
}
else {

grok {
match => {
"message" => [
"CEF:(?P<header_version>[^|]+)\\|(?P<device_vendor>[^\\|]+)\\|(?P<device_product>[^\\|]+)\\|(?P<user>[^\\|]+)\\|(?P<user_full_name>[^\\|]+)\\|(?P<category>[^\\|]+)\\|(?P<action>[^\\|]+)\\|(?P<description>[^\\|]+)\\|(?P<status>[^\\|]+)"
]
}
overwrite => ["message"]
on_error => "not_a_valid_syslog"
}

if [not_a_valid_syslog] {
drop {
tag => "TAG_MALFORMED_MESSAGE"
}
}

mutate {
add_field => { "security_result" => "{}" }
}

# Populate UDM (Unified Data Model) fields with extracted values
mutate {
replace => {
"event.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
"event.idm.read_only_udm.metadata.vendor_name" => "%{device_vendor}"
"security_result.description" => "%{description}"
"security_result.threat_status" => "%{status}"
}
}

mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}

mutate {
merge => {
"@output" => "event"
}
}
}

}


Any guidance or suggestions on how to properly support both CEF formats in the same parser would be appreciated.

It looks like the grok match intended for format #1 is also catching the format #2 logs, because its first seven patterns plus the trailing GREEDYDATA are satisfied by any line with enough pipe-separated fields. As a result some of the shared fields (header_version, device_vendor, device_product) are already set when the 2nd grok match runs. The 2nd match finds those values successfully but can't set the variables since they already hold values, so it produces an error.

You can check this by adding a statedump{} command on line 92 of the parser (note that, due to line wraps, the line numbers in the parser don't line up with the line numbers in your paste here).
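As a rough illustration of where the call goes (the exact line depends on how your parser is laid out, so treat the placement as an assumption):

```
# Temporary debugging aid: place this directly after the second grok block.
# It prints the parser's internal state at that point, including
# not_a_valid_syslog and any fields already captured by the first grok.
# Remove it before submitting the parser for validation.
statedump {}
```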
 

The statedump output lets us see that not only is ‘not_a_valid_syslog’ set to true, but the fields extracted by the first match statement also still hold values.

You can either update the first grok match so that it does not match logs in the 2nd format, or you can add the overlapping fields from the 2nd format to the ‘overwrite’ list of the 2nd match.
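For the first option, here is an untested sketch of one way it could look. It assumes that format #1 always has at least one key=value pair directly after the severity field and that format #2 never has an "=" in that position. The flag name not_format_1 is hypothetical, and the drop that currently follows the first grok would have to move into the format #2 branch, otherwise format #2 logs are dropped before they reach the second grok.

```
grok {
  match => {
    "message" => [
      # Same header as before, but the extension must now begin with key=value
      # (assumption: format #2 never has "=" in the field after the 7th pipe).
      "CEF:(?P<header_version>[^|]+)\\|(?P<device_vendor>[^\\|]+)\\|(?P<device_product>[^\\|]+)\\|(?P<device_version>[^\\|]+)\\|(?P<signature_id>[^\\|]+)\\|(?P<event_name>[^\\|]+)\\|(?P<severity>[^\\|]+)\\|(?P<cef_event_attributes>[^|=]+=.*)"
    ]
  }
  overwrite => ["message"]
  # Hypothetical flag name: true when the line is not format #1.
  on_error => "not_format_1"
}

if [not_format_1] {
  # Format #2 branch: run the second grok here, followed by its
  # not_a_valid_syslog check and the format #2 UDM mappings.
} else {
  # Format #1 branch: kv on cef_event_attributes, then the format #1 UDM mappings.
}
```

With this layout the first grok should leave no fields behind on a format #2 line, so the second grok would no longer collide with existing values. A format #2 log whose description happens to contain "=" would still be misrouted, so check this against your real samples.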

In my testing I chose to add the fields into the overwrite by updating line 89 to this:

overwrite => ["header_version","device_vendor","device_product"]

And we can see that it now pulls the user, user_full_name, status and description properly.

You’ll need to continue working with your actual example logs to be sure everything parses correctly, and you will need to remove the statedump{} before sending the parser to validation.

One item I noticed that you should be aware of is your use of the security_result.threat_status field. This is an enumerated field rather than a string, so it will only accept certain values. The accepted values for this field are listed here: https://cloud.google.com/chronicle/docs/reference/udm-field-list#securityresultthreatstatus. If the status value from your logs won’t be one of those values, you’ll need to select a different UDM field for it.
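If the format #2 status values turn out to be the same ones you already handle in the format #1 branch (that is an assumption, I have not seen your real logs), a sketch like the following could replace the "security_result.threat_status" => "%{status}" line in the format #2 branch. It reuses the mapping from your own format #1 code and reads the status field captured by the second grok:

```
# Map the raw status string onto the threat_status enum instead of copying it verbatim.
# The CREATED / IN_PROGRESS / RESOLVED values are assumed to match the format #1 logs.
if [status] in ["CREATED", "IN_PROGRESS"] {
  mutate {
    replace => { "security_result.threat_status" => "ACTIVE" }
  }
}
else if [status] == "RESOLVED" {
  mutate {
    replace => { "security_result.threat_status" => "CLEARED" }
  }
}
else {
  # Anything unrecognized falls back to the unspecified enum value.
  mutate {
    replace => { "security_result.threat_status" => "THREAT_STATUS_UNSPECIFIED" }
  }
}
```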

