Solved

Merge cases via API/Action/Playbook

  • November 13, 2025
  • 1 reply
  • 54 views

AnimSparrow

Hello,

Most of our ingested cases originate from the GraphMail source. We frequently need to reply to emails as part of our workflow, but currently, each reply generates a separate case. We want to merge or aggregate these replies into a single case to avoid fragmentation.

I initially tried using alert grouping, but the time window is too restrictive for ongoing email threads.

  • Reply subjects carry prefixes such as RE: or AW:, so they no longer match the original subject.
  • To correlate messages, we could use conversation.id (which we extract and ingest) or subject similarity after stripping the prefixes.
  • We include a unique CASE ID in the updated subject for tracking, e.g. [certid00091] - ORIGINAL SUBJECT. This allows regex-based matching on the CASE ID (e.g., certid\d+).
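
As a minimal sketch of the matching described in the bullets above: the tag pattern is taken from the example subject, while the prefix list (RE, AW, FW, FWD, WG) and the function names are assumptions to adapt to your own mail flow.

```python
import re

# Tag format assumed from the example "[certid00091] - ORIGINAL SUBJECT".
CASE_ID_RE = re.compile(r"\[(certid\d+)\]", re.IGNORECASE)
# Assumed set of reply/forward prefixes; extend it for other mail locales.
PREFIX_RE = re.compile(r"^\s*(?:(?:re|aw|fw|fwd|wg)\s*:\s*)+", re.IGNORECASE)


def extract_case_id(subject):
    """Return the CASE ID tag in lower case, or None if the subject has none."""
    match = CASE_ID_RE.search(subject)
    return match.group(1).lower() if match else None


def normalize_subject(subject):
    """Strip reply prefixes and the CASE ID tag so subjects can be compared."""
    without_prefix = PREFIX_RE.sub("", subject)
    without_tag = CASE_ID_RE.sub("", without_prefix)
    # Drop the " - " separator left behind after removing the tag.
    return re.sub(r"^[\s\-]+", "", without_tag).strip()
```

With this, "RE: AW: [certid00091] - Phishing report" and "Phishing report" normalize to the same string, and the first one yields the tag certid00091.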

What I would like to achieve:

  • Group all emails (original + replies) with the same CASE ID into one case.

Questions:

  1. Is there a built-in or recommended way to aggregate email threads in Chronicle, especially when subject prefixes vary?
  2. Can this be achieved via playbooks (e.g., on case creation, search for existing case with same CASE ID and merge)?
  3. Are there integrations (e.g., with Graph API, or third-party tools) that support email thread deduplication?
  4. Any best practices or sample YARA-L/playbook logic for merging cases programmatically?

At first glance, a playbook that:

  • Parses subject for [certid\d+]
  • Searches for existing open case with same ID
  • Merges new event into it (or updates metadata)

…seems feasible, but I’d appreciate guidance on implementation. So far I’ve only found this row in an API endpoint mapping table:
“cases-queue  /api/external/v1/cases-queue/bulk-operations/MergeCases  POST  MergeCases  chronicle.googleapis.com/cases.update”
Either it will be decommissioned soon, or I don’t know how to use it properly.
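
For illustration only, the parse / search / merge idea can be sketched independently of any Chronicle API. The input shape (pairs of case ID and subject) and the choice of the lowest case ID as the parent are assumptions made for this sketch, not platform behavior.

```python
import re
from collections import defaultdict

# Tag format assumed from the example "[certid00091] - ORIGINAL SUBJECT".
CASE_ID_RE = re.compile(r"\[(certid\d+)\]", re.IGNORECASE)


def plan_merges(open_cases):
    """Group cases by CASE ID tag and return {tag: (parent_id, all_ids)}.

    open_cases is a hypothetical iterable of (case_id, subject) pairs.
    The lowest case ID in each group is picked as the parent (an assumption).
    """
    groups = defaultdict(list)
    for case_id, subject in open_cases:
        match = CASE_ID_RE.search(subject)
        if match:
            groups[match.group(1).lower()].append(int(case_id))

    plans = {}
    for tag, ids in groups.items():
        unique_ids = sorted(set(ids))
        if len(unique_ids) > 1:  # a single case has nothing to merge with
            plans[tag] = (unique_ids[0], unique_ids)
    return plans
```

Each entry of the result describes one merge: the parent case and the full list of cases (parent included) that share the tag.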

Thanks in advance for any tips, examples, or pointers!

 

Best answer by AnimSparrow

OK, we were able to handle it.

The core step was to set up the Google Chronicle integration.

After that, we created a custom action inside it that reuses the service account from the original integration to obtain a fresh access token. The rest of the action is our own code, which looks up the older cases and merges them.
It is worth mentioning that for cases to be merged, you have to include the PARENT case in the list as well (for example: parent case 90, merge cases 90, 91, 92, 93).


# -*- coding: utf-8 -*-

import requests
import json
from google.oauth2.service_account import Credentials
from google.auth.transport.requests import Request as GoogleRequest
from SiemplifyAction import SiemplifyAction
import time
from datetime import datetime, timedelta

def main():
    siemplify = SiemplifyAction()
    siemplify.script_name = "Chronicle - Merge Cases"

    # Replace <PROJECT> and <INSTANCE> with your own values
    INSTANCE_ID = "projects/<PROJECT>/locations/eu/instances/<INSTANCE>"
    MERGE_URL = f"https://europe-chronicle.googleapis.com/v1alpha/{INSTANCE_ID}/cases:merge"

    # Get Service Account
    service_account_json = siemplify.extract_configuration_param(
        provider_name=siemplify.integration_identifier,
        param_name="User's Service Account"
    )

    # Refresh token
    credentials = Credentials.from_service_account_info(
        json.loads(service_account_json),
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(GoogleRequest())
    access_token = credentials.token

    # Params
    target_case_id_str = siemplify.parameters.get("CaseID", "").strip()
    specific_case = siemplify._get_case_by_id(target_case_id_str)
    conversationId = specific_case["cyber_alerts"][0]["security_events"][0]["additional_properties"]["orig_conversationId"]
    # Convert to Unix timestamps in ms (search window: the last two weeks)
    end_time = int(time.time() * 1000)
    start_time = int((datetime.now() - timedelta(weeks=2)).timestamp() * 1000)
    cases = siemplify.get_cases_by_filter(
        entities_free_search=conversationId,
        start_time_unix_time_in_ms=start_time,
        end_time_unix_time_in_ms=end_time,
    )
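    # Pick the parent among the matching cases, excluding the case that triggered this action
    cases_without_duplicate = cases.copy()
    try:
        id_value = int(target_case_id_str)
    except ValueError:
        siemplify.end("CaseID must be a valid integer.", "false")
    if id_value in cases_without_duplicate:
        cases_without_duplicate.remove(id_value)
    parent_case = cases_without_duplicate[0] if cases_without_duplicate else None
    if not parent_case or not cases:
        siemplify.end("No other case with the same conversation ID was found; nothing to merge.", "false")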

    try:
        parent_case = int(parent_case)
    except ValueError:
        siemplify.end("All Case IDs must be valid integers.", "false")


    payload = {
        "casesIds": cases,
        "caseToMergeWith": parent_case
    }


    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(MERGE_URL, json=payload, headers=headers, timeout=30)
        response.raise_for_status()

        result = response.json()
        final_case_id = result.get("caseId", parent_case)

        siemplify.end(
            f"Successfully merged cases {', '.join(map(str, cases))} into case {parent_case}. "
            f"Final case ID: {final_case_id}",
            "true"
        )

    except requests.exceptions.HTTPError:
        # Prefer the API's structured error message; fall back to the raw body
        try:
            error_detail = response.json().get("error", {}).get("message", response.text)
        except (ValueError, AttributeError):
            error_detail = response.text
        siemplify.end(f"Merge failed: {error_detail}", "false")
    except Exception as e:
        siemplify.end(f"Unexpected error: {e}", "false")


if __name__ == "__main__":
    main()
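
The parent-case rule mentioned above (the parent must also appear in the list of cases to merge) can be isolated in a small helper. This is a sketch: the field names casesIds and caseToMergeWith are copied from the script above, while the helper name and its validation are assumptions.

```python
def build_merge_payload(parent_case_id, case_ids):
    """Build the merge request body, always including the parent in the list.

    Field names follow the script above; this helper itself is hypothetical.
    """
    parent = int(parent_case_id)
    # Deduplicate, normalize to integers, and make sure the parent is present.
    merged = sorted({int(case_id) for case_id in case_ids} | {parent})
    if len(merged) < 2:
        raise ValueError("Need at least one case besides the parent to merge.")
    return {"casesIds": merged, "caseToMergeWith": parent}
```

For the example from the post, build_merge_payload(90, [91, 92, 93]) gives a casesIds list of 90, 91, 92, 93 with caseToMergeWith set to 90.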

1 reply

AnimSparrow
  • Author
  • New Member
  • Answer
  • November 17, 2025
