
Hi,

I am trying to parse one single field in a log.

The sample log looks like this : 

<14>Mar 17 06:22:24 1.1.1.1 {"time":"17-03-2025; 11:52:09","category":24,"sourceIP":"127.0.0.1","destIP":"0.0.0.0","user":"company\\\\user.1234","download":90,"upload":202,"status":1,"contentType":"text/html","url":"http:google.com","policy":"it-dt","categoryDesc":"Business Oriented"}

This is the parser that I have written for it :

filter {
    mutate {
        replace => {
            "src_ip" => ""
        }
    }

    grok {
        match => {
            "message" => ["%{IP:src_ip}"]
        }
        overwrite => ["src_ip"]
        on_error => "not_ip"
    }


    if [src_ip] != "" {
        mutate{
            replace => {
                "event.id.read_only_udm.principal.ip" => "%{src_ip}"
            }
            on_error => "no_ip_found"
        }

        if ![no_ip_found] {
            mutate{
                replace =>{
                    "event.id.read_only_udm.principal.ip" => "%{src_ip}"
                }
            }
            mutate{
                merge => {
                    "event.id.read_only_udm.principal.ip" => "{src_ip}"
                }
                on_error => "ip_not_found"
            }
        }
    }

    mutate {
        replace => {
            "@output" => "event"
        }
    }
}
 
I am getting the following error for this : 

generic::unknown: pipeline.ParseLogEntry failed: LOG_PARSING_CBN_ERROR: "generic::invalid_argument: failed to convert raw output to events: raw output is not []interface{}, instead is: string"



There are several issues you'll need to address here. I'll work through them from the bottom to the top of your existing parser code, since that will also demonstrate troubleshooting the error messages as they're discovered.


The `[]interface{}` error message you are receiving is caused by calling a `replace` instead of a `merge` in your final output block:



mutate {
    merge => {
        "@output" => "event"
    }
}

 

After that correction we get a new error message for the next issue:

"generic::invalid_argument: failed to convert raw output to events: failed to convert raw message 0: field \"id\": no descriptor found"

That message points to the field "id" in the blocks where you are trying to assign src_ip to the UDM path. Looking at the field name format section of the UDM field reference doc (https://cloud.google.com/chronicle/docs/reference/udm-field-list#field_name_format_for_parsers) we can see your value

"event.id.read_only_udm.principal.ip" is missing an "m" and should be
"event.idm.read_only_udm.principal.ip"

Once that is resolved we get another error message:

"generic::invalid_argument: failed to convert raw output to events: failed to convert raw message 0: field \"idm\": index 0: recursive rawDataToProto failed: field \"read_only_udm\": index 0: recursive rawDataToProto failed: field \"principal\": index 0: recursive rawDataToProto failed: field \"ip\": failed to make strategy: received non-slice or non-array raw output for repeated field"

This indicates we are passing something that isn't an array to that repeated ip field. There is an example of the correct process for this in the parsing overview doc: https://cloud.google.com/chronicle/docs/event-processing/parsing-overview#store_the_target_ip_address_and_source_ip_address_using_the_merge_statement

Comparing that example against your code, we'll notice two things.


  1. In your merge block you are referencing "{src_ip}" instead of the field name "src_ip". The merge needs the field name so it can operate on the field itself and map it as a single-value array to principal.ip.

  2. Your parser has multiple replace statements here which will always fail and don't actually produce anything; these can be removed without negatively impacting parsing.


So that entire `if [src_ip] != "" { ... }` block can be consolidated down to:


if [src_ip] != "" {
    mutate {
        merge => {
            "event.idm.read_only_udm.principal.ip" => "src_ip"
        }
        on_error => "ip_not_found"
    }
}

Once that has been changed we get another new error message: "generic::invalid_argument: UDM.metadata not present". This gets to one of the core UDM requirements: every event needs an event_type defined, and there are additional UDM fields required to pass validation based on the event_type. Looking at this event, it should be either a NETWORK_CONNECTION or a NETWORK_HTTP event; however, both of those require several fields that we are not currently parsing. https://cloud.google.com/chronicle/docs/unified-data-model/udm-usage#network_connection

You could avoid parsing those additional fields by adding `"event.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"` to the first replace block. The GENERIC_EVENT event_type doesn't have any additional required fields, so this would get you a parser that runs without error, but the event it produces will be extremely limited in its usefulness. If you pursue this option I strongly recommend reviewing your grok statement to ensure you are matching the value you expect. As currently written it will match the first IP in the log line (the source of the log) instead of the source of the traffic the event is describing. In UDM this is the difference between the Principal, Source and Target nouns, which are described here: https://cloud.google.com/chronicle/docs/event-processing/udm-overview#the_principal_target_src_intermediary_observer_and_about_attributes
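For illustration, this is roughly what that fallback looks like folded into the first mutate block (a minimal sketch of the option described above, not a recommended final parser):

filter {
    mutate {
        replace => {
            "src_ip" => ""
            "event.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
        }
    }
    # ... rest of the parser unchanged
}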

The recommended "correct" way is to map to an appropriate event_type and fill in the rest of the required fields. There's a big trick we can apply with this log format to help that process. Since this log is syslog + JSON, we can do a two-step field extraction to get automatic field naming without needing to write a complex grok statement. For step 1 we'll extract the syslog timestamp and the log source (in my example I label it the observer IP), then pass the rest of the message to the JSON automatic field extractor.

Here is that all put together, with a statedump{} added to help with visibility:



filter {
    mutate {
        replace => {
            "event.idm.read_only_udm.metadata.event_type" => "NETWORK_CONNECTION"
        }
    }

    grok {
        match => {
            "message" => [
                "%{INT}>%{SYSLOGTIMESTAMP:time} %{IPV4:observer} %{GREEDYDATA:jsonmessage}"
            ]
        }
        overwrite => ["time", "observer", "jsonmessage"]
        on_error => "pattern_not_matched"
    }

    json {
        source => "jsonmessage"
        array_function => "split_columns"
        on_error => "invalid_json"
    }

    if [sourceIP] != "" {
        mutate {
            merge => {
                "event.idm.read_only_udm.principal.ip" => "sourceIP"
            }
            on_error => "ip_not_found"
        }
    }

    # ADD YOUR ADDITIONAL FIELD MAPPINGS HERE

    statedump{}

    mutate {
        merge => {
            "@output" => "event"
        }
    }
}



You'll need to fill in the additional required fields, but you will probably also want to include some optional ones to increase the usability of the log. This is easier with the statedump view letting you see how the fields were extracted.
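As a rough sketch of what those additional mappings might look like at the placeholder, using field names from your sample JSON (the UDM paths and on_error labels here are examples I chose; verify each path against the UDM field list before relying on it):

if [destIP] != "" {
    mutate {
        merge => {
            "event.idm.read_only_udm.target.ip" => "destIP"
        }
        on_error => "dest_ip_not_found"
    }
}
mutate {
    replace => {
        "event.idm.read_only_udm.target.url" => "%{url}"
        "event.idm.read_only_udm.principal.user.userid" => "%{user}"
    }
    on_error => "optional_fields_missing"
}

Note the same replace-vs-merge distinction from earlier applies: repeated fields like target.ip take a merge on the field name, while singular fields like target.url take a replace with a %{} value reference.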

As you work through this, note that the fields in your log source are very similar to the fields used in this example: https://cloud.google.com/chronicle/docs/event-processing/parsing-overview#steps_within_parser_instructions. If you get stuck mapping a particular field, there should be an example in there.










