
Automating Card Leak Monitoring via Google Threat Intelligence Dark Web Search and API

  • June 19, 2026

aymanabdelaziz
Staff

Financial institutions face an ongoing battle against card fraud and credential leaks. A common challenge for security teams is identifying when card data tied to their Bank Identification Numbers (BINs) is leaked across underground channels. To address this, Google Threat Intelligence (GTI) provides real-time dark web search capabilities that give security teams direct visibility into illicit communication networks.

This article details how to leverage GTI’s advanced dark web search capabilities to automate BIN leak monitoring, transitioning from manual searching to a scalable, automated pipeline.

 

Harnessing Modern Dark Web Search Capabilities

 

Google Threat Intelligence indexes underground communication platforms dynamically, allowing security teams to query massive volumes of raw dark web data using precise, granular filters. For a deeper understanding of these capabilities, you can review the Google Threat Intelligence Dark Web Guide.

By interacting with these capabilities via the API, analysts can continuously scan for targeted indicators—such as specific BIN sequences—and immediately route the findings to fraud prevention or incident response platforms.

 

Crafting the Search Query

 

To minimize false positives and capture high-fidelity alerts, the query must be structured to cross-reference target card prefixes with industry-specific contextual keywords and corporate identifiers.

The following query template isolates messenger application leaks involving specific credit or debit cards for a target financial institution:

 

type:messenger 
(content:'BIN1' OR content:'BIN2' OR content:'BIN3' OR content:'BIN4' OR content:'BIN5' OR content:'BIN6' OR content:'BIN7' OR content:'BIN8' OR content:'BIN9' OR content:'BIN10') 
(content:'Credit' OR content:'Debit') 
(content:'<FINANCIAL_INSTITUTION_NAME>')

 

Breakdown of the Query Structure:

  • type:messenger: Restricts the search surface to instant messaging platforms (such as Telegram, which threat actors commonly favor for rapid data dumping and selling).

  • content:'<BIN_NUMBER>': An explicit OR chain containing the target organization's known card prefixes.

  • content:'Credit' OR content:'Debit': Adds specific context to focus on carding activities rather than random hits.

  • content:'<FINANCIAL_INSTITUTION_NAME>': Anchors the results to the specific brand name to validate that the leaked records are tied directly to your enterprise.
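Maintaining a long OR chain by hand is error-prone once the BIN list changes. The following is a minimal sketch of a helper that assembles the same four-part query from a list; the function name build_filter_query, the sample BINs, and the institution name are hypothetical, and the sketch does not escape quote characters inside values.

```python
def build_filter_query(bins, institution, card_terms=("Credit", "Debit"),
                       source_type="messenger"):
    """Assemble the dark web filter string from its four parts."""
    if not bins:
        raise ValueError("at least one BIN is required")

    def any_of(values):
        # Each value becomes content:'value'; alternatives are joined with OR.
        return "(" + " OR ".join(f"content:'{v}'" for v in values) + ")"

    return " ".join([
        f"type:{source_type}",
        any_of(bins),
        any_of(card_terms),
        any_of([institution]),
    ])


# Hypothetical BINs and institution name, for illustration only.
query = build_filter_query(["411111", "522222"], "Example Bank")
print(query)
```

The resulting string has the same shape as the template above, so it can be used wherever the hand-written query would go.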

 

Automating the Collection via API

 

To export these alerts into a portable format like CSV—which can be readily ingested by SIEMs, data lakes, or external ticketing systems—we can leverage the GTI API.

The standalone Python script, leaked_cards_dw_all.py, utilizes the List DDW Communications endpoint to pull records matching our query.

 

Key Technical Pillars of the Automation Script:

  • Cursor-Based Pagination: Handles large datasets by following the next link provided in the links section of each API response. It requests pages of 40 records and stops once 200 records have been collected; both values are adjustable through the page_limit and max_results parameters.

  • Dynamic JSON Flattening: Dark web communication payloads can contain deeply nested tracking fields, author metadata, and network paths. The script recursively flattens these structures into uniform key-value pairs.

  • Timestamp Normalization: Converts raw epoch timestamps into human-readable dates (YYYY-MM-DD HH:MM:SS) during processing. The conversion uses the local time zone of the machine running the script.

  • Resilience: Implements built-in rate-limit handling (detecting HTTP 429) to automatically pause and retry, preventing script termination during large collection runs.
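To make the flattening and timestamp steps concrete, here is a small self-contained sketch applied to a made-up record. Apart from attributes_timestamp, which the script looks for, the field names and values are assumptions and may not match what the API actually returns. Unlike the script, this sketch formats the timestamp in UTC so the output does not depend on the machine's time zone.

```python
from datetime import datetime, timezone


def flatten(value, prefix=""):
    """Yield (key, leaf) pairs; dict keys and list indexes are joined with '_'."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from flatten(child, f"{prefix}{key}_")
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from flatten(child, f"{prefix}{index}_")
    else:
        # Drop the trailing underscore added by the last recursion step.
        yield prefix[:-1], value


# Hypothetical record shape, for illustration only.
record = {
    "id": "msg-001",
    "attributes": {
        "timestamp": 1735689600,
        "author": {"name": "seller01"},
        "tags": ["carding", "dump"],
    },
}

row = dict(flatten(record))
row["attributes_timestamp"] = datetime.fromtimestamp(
    row["attributes_timestamp"], tz=timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")

for key, value in row.items():
    print(f"{key} = {value}")
```

Each nested path becomes one column name (for example attributes_author_name and attributes_tags_0), which is why records with different shapes can still share a single CSV header.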

Here is the automation script, leaked_cards_dw_all.py. Before running it, replace Insert_Your_GTI_API_Key with your API key and the placeholder tokens inside FILTER_QUERY with your own BINs and institution name.


import csv
import logging
import time
from datetime import datetime

import requests

# ---------------------------------------------------------
# Configuration & Setup
# ---------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

# NOTE: In production, load this from an environment variable
API_KEY = "Insert_Your_GTI_API_Key"
BASE_URL = "https://www.virustotal.com/api/v3/ddw_communications"

FILTER_QUERY = (
    "type:messenger "
    "(content:'BIN1' OR content:'BIN2' OR content:'BIN3' OR content:'BIN4' OR "
    "content:'BIN5' OR content:'BIN6' OR content:'BIN7' OR content:'BIN8' OR "
    "content:'BIN9' OR content:'BIN10') "
    "(content:'Credit' OR content:'Debit') "
    "(content:'<FINANCIAL_INSTITUTION_NAME>')"
)


def flatten_json(nested_json: dict, separator: str = "_") -> dict:
    """Recursively flattens a nested dictionary into a single-level dict."""
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for key, value in x.items():
                flatten(value, name + key + separator)
        elif isinstance(x, list):
            for index, value in enumerate(x):
                flatten(value, name + str(index) + separator)
        else:
            # Strip the trailing separator added by the last recursion step.
            out[name[:-len(separator)] if separator else name] = x

    flatten(nested_json)
    return out


def fetch_and_export_ddw_to_csv(api_key: str, query: str, output_csv: str, max_results: int = 200, page_limit: int = 40):
    """
    Paginates through results to fetch up to max_results, formats timestamps,
    and exports all nested JSON fields as distinct columns in a CSV file.
    """
    headers = {
        "x-apikey": api_key,
        "accept": "application/json"
    }

    # Parameters are only needed for the very first request
    params = {
        "limit": page_limit,
        "filter": query
    }
    logging.info(f"Starting fetch for up to {max_results} results in batches of {page_limit}...")

    all_items = []
    current_url = BASE_URL
    rate_limit_retries = 0
    max_rate_limit_retries = 5  # Give up rather than loop forever on repeated 429s

    while current_url and len(all_items) < max_results:
        try:
            # The first request carries the params dict. After that,
            # VirusTotal's `next` link already has the parameters baked into the URL string.
            response = requests.get(
                current_url,
                headers=headers,
                params=params if current_url == BASE_URL else None,
                timeout=30,  # Avoid hanging indefinitely on a stalled connection
            )

            if response.status_code == 429:
                rate_limit_retries += 1
                if rate_limit_retries > max_rate_limit_retries:
                    logging.error("Still rate limited after repeated retries. Stopping collection.")
                    break
                logging.warning("Rate limit exceeded (HTTP 429). Sleeping for 60 seconds...")
                time.sleep(60)
                continue

            rate_limit_retries = 0
            response.raise_for_status()
            data = response.json()
            items = data.get("data", [])

            if not items:
                logging.info("No more results returned for this filter.")
                break

            # Calculate how many items we actually need to hit the max_results cap
            needed = max_results - len(all_items)
            items_to_add = items[:needed]
            all_items.extend(items_to_add)

            logging.info(f"Fetched {len(items_to_add)} items. Total collected so far: {len(all_items)}/{max_results}")

            # Pagination logic
            current_url = data.get("links", {}).get("next")
            time.sleep(1)
        except requests.exceptions.RequestException as e:
            logging.error(f"API Request failed: {e}")
            break

    if not all_items:
        logging.info("No data collected. Exiting without creating a CSV.")
        return

    logging.info("Formatting timestamps and preparing CSV data...")
    flattened_records = []
    for item in all_items:
        flat_record = flatten_json(item)

        # Format Timestamps
        for time_field in ['attributes_timestamp', 'attributes_date']:
            if time_field in flat_record and isinstance(flat_record[time_field], (int, float)):
                flat_record[time_field] = datetime.fromtimestamp(flat_record[time_field]).strftime('%Y-%m-%d %H:%M:%S')
        flattened_records.append(flat_record)

    # Collect unique headers
    all_csv_headers = set()
    for record in flattened_records:
        all_csv_headers.update(record.keys())
    all_csv_headers = sorted(all_csv_headers)

    try:
        with open(output_csv, mode="w", newline="", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=all_csv_headers)
            writer.writeheader()
            for record in flattened_records:
                writer.writerow(record)

        logging.info(f"Successfully exported {len(flattened_records)} records to {output_csv}")
    except IOError as e:
        logging.error(f"Failed to write to CSV file: {e}")


if __name__ == "__main__":
    # Also catch the untouched placeholder, which is a non-empty string
    if not API_KEY or API_KEY == "Insert_Your_GTI_API_Key":
        logging.error("API Key is missing. Set API_KEY before running the script.")
    else:
        output_filename = "filtered_ddw_results.csv"
        # Fetch up to 200 records in pages of 40
        fetch_and_export_ddw_to_csv(API_KEY, FILTER_QUERY, output_filename, max_results=200, page_limit=40)
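
The script pauses for a fixed 60 seconds whenever it receives HTTP 429. One possible variation, sketched below, is a bounded exponential backoff that honors a Retry-After header when one is present. Whether the GTI API sends that header is an assumption here, and get_with_backoff and FakeResponse are hypothetical names used only for this illustration. The request itself is passed in as a callable so the retry logic can be exercised without network access.

```python
import time


def get_with_backoff(send, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-429 response or retries run out.

    send is any zero-argument callable returning an object with
    .status_code and .headers, for example:
    lambda: requests.get(url, headers=headers, timeout=30)
    """
    for attempt in range(max_retries + 1):
        response = send()
        if response.status_code != 429:
            return response
        if attempt == max_retries:
            break
        # Prefer a numeric Retry-After hint (seconds) when the server sends one;
        # otherwise double the wait on each attempt: 1s, 2s, 4s, ...
        hint = response.headers.get("Retry-After", "")
        delay = float(hint) if hint.isdigit() else base_delay * 2 ** attempt
        sleep(delay)
    raise RuntimeError(f"still rate limited after {max_retries} retries")


class FakeResponse:
    """Stand-in for a requests.Response, used only for this demonstration."""

    def __init__(self, status_code, headers=None):
        self.status_code = status_code
        self.headers = headers or {}


# Two rate-limited replies followed by a success; waits are recorded, not slept.
responses = iter([
    FakeResponse(429),
    FakeResponse(429, {"Retry-After": "7"}),
    FakeResponse(200),
])
waits = []
result = get_with_backoff(lambda: next(responses), sleep=waits.append)
print(result.status_code, waits)  # 200 [1.0, 7.0]
```

Capping the number of retries means a persistent quota problem surfaces as an error instead of stalling a scheduled collection run.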

 

 

Conclusion and Operational Flexibility

 

Moving dark web intelligence ingestion from manual workflows to programmatic pipelines lets organizations respond rapidly to exposures. By extracting data through the GTI API with leaked_cards_dw_all.py, security teams get structured CSV output that can be fed into internal security orchestration playbooks.
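
As one illustration of that hand-off, the sketch below turns exported rows into one JSON object per line, a shape many ingestion tools accept. The function name csv_rows_to_events, the source tag, and the sample column names are assumptions rather than anything defined by the script or the API. Empty cells are dropped because the export uses the union of all flattened keys as its header, so most rows leave some columns blank.

```python
import csv
import io
import json


def csv_rows_to_events(csv_file, source="gti_ddw"):
    """Yield one JSON string per CSV row, skipping cells that are empty."""
    for row in csv.DictReader(csv_file):
        event = {key: value for key, value in row.items() if value}
        event["source"] = source  # Hypothetical tag for the downstream consumer
        yield json.dumps(event, sort_keys=True)


# Stand-in for open("filtered_ddw_results.csv", newline="", encoding="utf-8").
sample = io.StringIO(
    "attributes_content,attributes_channel,id\n"
    "example message,,msg-001\n"
)
events = list(csv_rows_to_events(sample))
print(events[0])
```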

While this guide focuses on capturing leaked financial card data, the underlying automation is agnostic to the data type. By modifying the FILTER_QUERY string in the script, you can pivot this tool to a broad range of other cyber threat intelligence use cases.