Skip to main content

Very new to parsing in SecOps - looking to understand the best approach when dealing with JSON structured data that requires parsing / GROK with the jsonPayload.message section. 

Here is an example log:
{
"insertId": "id",
"jsonPayload": {
"facility": "local7",
"message": "random unstructured data I am wanting to parse to UDM - could contain User, source and Dest IP etc. ",
"source_timestamp": "1999-01-31T12:59:59+00:00",
"sysloghost": "syslog_host",
"syslogip": "syslog_ip"
},
"labels": {
"cloud.region": "gcp_region",
"first_observed_timestamp": "1999-01-31T12:59:59Z",
"host.name": "source_hostname",
"last_observed_timestamp": "1999-01-31T12:59:59Z",
"log.file.name": "my_log_filename",
"log.file.path": "my_log_path",
"log_count": "1"
},
"logName": "gcp_log_filename_path",
"receiveTimestamp": "1999-01-31T12:59:59",
"resource": {
"labels": {
"location": "gcp_region",
"namespace": "",
"node_id": "host_node_id",
"project_id": "gcp_project"
},
"type": "generic_node"
},
"severity": "INFO",
"timestamp": "1999-01-31T12:59:59"
}

Looking to understand how to map fields "project_id, type, syslogip etc." to UDM fields, as well as the jsonPayload.message fields that I can extract with GROK (e.g. source/dest IP, usernames etc). Just need the basic structure. 

Thanks!

Here is a sample code that parses your log.  I am using statedump for testing purpose and you should remove before submitting into production.

filter { mutate { replace => { "var_target" => "" } } json { source => "message" array_function => "split_columns" on_error => "not_a_json" } statedump{} if ![not_a_json] { mutate { replace => { "event.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT" "event.idm.read_only_udm.principal.hostname" => "%{jsonPayload.sysloghost}" } } if [resource][labels][project_id] != "" { mutate { replace => { "project_resource_ancestors.name" => "%{resource.labels.project_id}" } on_error => "project_id_not_present" } mutate { replace => { "project_resource_ancestors.resource_type" => "CLOUD_PROJECT" } on_error => "resource_type_not_present" } if ![project_id_not_present] and ![resource_type_not_present] { mutate { merge => { "var_target.resource_ancestors" => "project_resource_ancestors" } } } } if [var_target] != "" { mutate { rename => { "var_target" => "event.idm.read_only_udm.target" } } } mutate { merge => { "@output" => "event" } on_error => "event_generation_failure" } } }

Amazing, thank you so much! More than enough to get me started!