Skip to main content

Hi everyone,

I'm trying to send multiple alerts via a webhook to Chronicle SecOps and have them be grouped under the same case. However, when I use my job (see code below), even though both alerts are sent together and share common fields, they always end up in separate cases.

import sys
import json
import requests
from urllib3.util import parse_url

from SiemplifyJob import SiemplifyJob
from SiemplifyUtils import output_handler

# ====================
# Embedded Constants
# ====================

PROVIDER_NAME = "HTTP V2"
INTEGRATION_NAME = "HTTPV2"

API_REQUEST_METHODS_MAPPING = {
"GET": "GET",
"POST": "POST",
"PUT": "PUT",
"PATCH": "PATCH",
"DELETE": "DELETE",
"HEAD": "HEAD",
"OPTIONS": "OPTIONS",
}

AUTH_METHOD = {
"BASIC": "basic",
"API_KEY": "api_key",
"ACCESS_TOKEN": "access_token",
"NO_AUTH": None,
}

DEFAULT_REQUEST_TIMEOUT = 120
ACCESS_TOKEN_PLACEHOLDER = "{{integration.token}}"

# ====================
# Internal Classes
# ====================

class HTTPV2DomainMismatchException(Exception):
pass

class ApiParameters:
def __init__(self, test_url=None, auth_method=None, restrict_domain=False):
self.test_url = test_url
self.auth_method = auth_method
self.restrict_domain = restrict_domain

class ApiManager:
def __init__(self, session, api_params, logger):
self.session = session
self.test_url = api_params.test_url
self.auth_method = api_params.auth_method
self.restrict_domain = api_params.restrict_domain
self.logger = logger

def execute_http_request(
self,
method,
url_path,
params=None,
headers=None,
cookies=None,
body_payload=None,
follow_redirects=True,
timeout=DEFAULT_REQUEST_TIMEOUT,
):
if headers and headers.get("Cookie") and cookies:
del headers["Cookie"]

if self.restrict_domain:
url_domain = parse_url(url_path).host
test_url_domain = parse_url(self.test_url).host
if url_domain != test_url_domain:
raise HTTPV2DomainMismatchException

args = {
"method": method,
"url": url_path,
"params": params,
"headers": headers,
"cookies": cookies,
"timeout": timeout,
"allow_redirects": follow_redirects,
}

if body_payload:
if headers and "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
args["data"] = body_payload

return self.session.request(**args)

# ====================
# Main Job
# ====================

SCRIPT_NAME = "Send_HTTP_Request_Single_Alert_Job"

@output_handler
def main():
siemplify = SiemplifyJob()
siemplify.script_name = SCRIPT_NAME

try:
siemplify.LOGGER.info("🔄 Starting job to create alert...")

# Parameters extracted from job configuration
method = siemplify.extract_job_param("Method", input_type=str, is_mandatory=True, print_value=True)
url_path = siemplify.extract_job_param("URL Path", input_type=str, is_mandatory=True, print_value=True)
headers_raw = siemplify.extract_job_param("Headers", input_type=str, default_value="{}", print_value=True)
url_params = siemplify.extract_job_param("URL Params", input_type=str, default_value="{}", print_value=True)
request_timeout = siemplify.extract_job_param("Request Timeout", input_type=int, default_value=DEFAULT_REQUEST_TIMEOUT, print_value=True)

# Setup HTTP session
session = requests.Session()
session.verify = True

api_params = ApiParameters(
test_url=url_path,
auth_method=auth_method,
restrict_domain=False
)
manager = ApiManager(session=session, api_params=api_params, logger=siemplify.LOGGER)

# Prepare headers
headers = json.loads(headers_raw)
if auth_method == AUTH_METHOD.get("ACCESS_TOKEN") and access_token:
headerseACCESS_TOKEN_PLACEHOLDER] = access_token

if "Content-Type" not in headers:
headerse"Content-Type"] = "application/json"

params = json.loads(url_params)

# Alert payload
body = {
"alert_identifier": "Aegis-Job-031",
"custom_fields": {
"detected_from": "Aegis",
"source_case": "15941",
"playbook_origen": "ManualTwoAlertJob"
},
"environment": "SentinelCorp",
"events": "
{
"device_product": "RULE",
"event_name": "Event_No_Name",
"severity": 2,
"source": "Aegis",
"type": "Event_No_Name"
}
],
"name": "Test alert from Aegis 2",
"priority": 10,
"rule_generator": "Test alert from Aegis 2",
"start_time": "2025-04-15T14:00:00Z"
}

# Send HTTP request
response = manager.execute_http_request(
method=method,
url_path=url_path,
params=params,
headers=headers,
cookies=None,
body_payload=json.dumps(body),
follow_redirects=True,
timeout=request_timeout
)

# Extended logging
siemplify.LOGGER.info("✅ ===========================================")
siemplify.LOGGER.info(f"✅ Response code: {response.status_code}")
siemplify.LOGGER.info(f"✅ Requested URL: {url_path}")
siemplify.LOGGER.info(f"✅ Sent headers: {json.dumps(headers, indent=2)}")
siemplify.LOGGER.info(f"✅ URL parameters: {json.dumps(params, indent=2)}")
siemplify.LOGGER.info(f"✅ Sent body: {json.dumps(body, indent=2)}")
siemplify.LOGGER.info(f"✅ Full response: {response.text}")
siemplify.LOGGER.info("✅ ===========================================")

siemplify.end("✅ Alert successfully sent.", True)

alert_created = response.json()
siemplify.LOGGER.info(f"🧾 Created alert dictionary:\n{json.dumps(alert_created, indent=2)}")

except Exception as e:
siemplify.LOGGER.error(f"❌ Error while sending alert: {e}")
siemplify.LOGGER.exception(e)
siemplify.end(f"❌ Job execution failed: {e}", False)

if __name__ == "__main__":
main()

Here are the parameters being used:

  • Method: POST

  • URL Path: A webhook URL pointing to Chronicle SecOps

  • Headers:
    {
      "Content-Type": "application/json; charset=utf-8",
      "Accept": "application/json",
      "User-Agent": "GoogleSecOps"
    }

  • URL Params:
    {
      "URL Field Name": "URL Field Value"
    }

  • Request Timeout: 120

My Python job sends two alerts with the same source_case, environment, custom_fields, and even the same display_id internally. Despite that, Chronicle SecOps creates two independent cases.

🔍 My question is:

What is the required field or mechanism in the webhook payload to ensure that multiple alerts get grouped under the same case instead of creating separate ones?

I’ve tried setting fields like source_case and display_id, but they don’t seem to have any effect on case grouping. Is there something specific Chronicle looks at for grouping alerts into the same case?

Thanks in advance for your help!

Be the first to reply!

Reply