Hello Google SecOps Users,


On July 14, 2024, Python 3.11, along with migration best practices, was made available in the Google SecOps Platform alongside a new feature called Staging Mode.


We’re writing to inform you that on June 1, 2025, Python 3.7 will be decommissioned from the Google SecOps Platform and the Marketplace. Users will no longer be able to run Python 3.7 integrations in the Google SecOps Platform after this date.


We understand that migrating to Python 3.11 may require some planning, but this upgrade is designed to help you stay up to date with the latest security and functionality features.


What you need to know
Starting June 1, 2025, our platform will no longer support Python 3.7. We will also roll out updates for each integration, prioritized by our most-used integrations, to ensure most customers get the updates as quickly as possible.


We recommend updating integrations as they become available.


Custom code written in Python 3.7 will have dedicated enablement documents to help you understand the migration process and ensure your code runs properly in the Google SecOps Platform.


What you need to do
To ensure continued functionality, migrate your workloads to Python 3.11 before June 1, 2025.


If you use Remote Agent to run your integrations, update to version 2.0 or higher to make sure it can run integrations written in Python 3.11.


We're committed to working closely with you to ensure a smooth transition. If you have questions or need assistance, please contact Google SecOps Support or use this Community post for discussion.


Additionally, we're sharing a link to the Upgrade the Python version to 3.11 article we've written on this topic.


Thank you!


 

Is there any further useful guidance or tooling beyond this? For example:

  • A way to determine which actions are used by which playbooks so we know what will need to be tested when we upgrade an integration.
  • A way to safely test playbooks once integrations have been updated without affecting production ones, like the staging mode.
  • A way to rollback an integration upgrade if it goes very wrong and a fix isn't in sight.

Thanks.


Hi @bheu,


We suggest starting with the documentation we have on the topic:


Upgrade the Python version to 3.11


Test integrations in staging mode


We've already reviewed that documentation, but some of our questions remain:

1. There's no guidance on how to easily identify which playbooks use which actions and integrations.

2. We can test individual actions in staging mode with representative inputs, but we have no way to run full playbooks to gain greater assurance that the update won't break anything.

3. Is there any way for us to know, without asking Google Support, whether a recent update to an integration has introduced bugs that should make us reconsider upgrading now and instead wait for a fix?


Hi @donkos,


Great questions. I'll try to assist with some more context:


1. You are right; identifying it from the playbooks themselves isn't possible at the moment. I would highly recommend working with the IDE page, where you have indications next to each integration showing which integrations need to be updated. In addition, you can get high-level context on the integrations requiring an update from the top bar in the IDE, Playbooks, and Marketplace pages.


2. That is also true. We might get to supporting playbooks in staging in the future, but it's not available at the moment. Our recommendation is similar to 1: work at the integration level, check a few actions (preferably ones that you know you are using) to get a sense of validity, and then upgrade the integration. You can still use the same test cases on both staging and production, on the same actions, to compare the output across versions on the same input.
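
To make that staging-versus-production comparison less manual, here is a minimal, illustrative sketch. It assumes you've saved each action run's JSON result to a local file yourself (the file names and the flat result structure below are examples, not anything the platform produces), and prints the top-level fields that differ between the two versions:

import json

def diff_action_outputs(production_json_path, staging_json_path):
    # Load the two saved action results (assumed to be flat JSON objects).
    with open(production_json_path) as production_file:
        production_result = json.load(production_file)
    with open(staging_json_path) as staging_file:
        staging_result = json.load(staging_file)

    # Report top-level fields that differ; nested structures would need recursion.
    for key in sorted(set(production_result) | set(staging_result)):
        production_value = production_result.get(key, "<missing>")
        staging_value = staging_result.get(key, "<missing>")
        if production_value != staging_value:
            print(f"{key}: production={production_value!r} | staging={staging_value!r}")

if __name__ == "__main__":
    # Hypothetical file names; adjust to wherever you saved the action outputs.
    diff_action_outputs("enrich_entities_production.json", "enrich_entities_staging.json")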


3. Generally speaking, the release notes on the Marketplace page indicate what was actually changed. When jumping three or more versions in one update (e.g., upgrading an integration from version 23 to version 26), I would highly recommend testing it carefully in staging.
In the case you mentioned, where a newer version has introduced a bug, we do our best to release a fix for the integration as soon as we can (mostly a matter of hours, or a very low number of days) so that customers have less chance of upgrading to it.
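
(For reference: if you want a quick inventory of installed integrations before deciding how big a version jump an upgrade would be, below is a minimal sketch against the same GetInstalledIntegrations endpoint used by the export script later in this thread. The "identifier" field is confirmed by that script; the "version" field name is an assumption and may differ in your tenant's response payload.)

import os
import requests

# Minimal sketch: list installed integrations and (assumed) versions.
# Assumes your API key is in this environment variable, as in the scripts below.
api_url = "https://[redacted].siemplify-soar.com/api/external/v1/integrations/GetInstalledIntegrations"
headers = {"AppKey": os.environ["chronicle_soar_api_credential"], "Accept": "application/json"}

response = requests.get(url=api_url, headers=headers)
response.raise_for_status()

for integration in response.json():
    # "identifier" is confirmed by the export script; "version" is an assumption.
    print(integration["identifier"], integration.get("version", "<version field not found>"))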


Hope the above helps. 




For anyone else who's in this situation, I've created Python scripts to solve these problems:

`chronicle_soar_report_playbook-integration_usage_v1.0.py` (to get the input for this, see the end of this comment):

"""
Description:
This takes the .ZIP file output of "chronicle_soar_export_playbooks_v*.py" and generates an Excel report of which actions are used where.

Version history:
• v1.0:
- Author: [redacted]
- Date: 2025/02/06
- Changes:
= Base functionality.

Improvement opportunities:
• Support input of multiple .ZIP files that contain .JSON files, rather than one .ZIP file that contains many .ZIP files that contain .JSON files.
"""



import zipfile
import io
import json
from openpyxl import Workbook # Shell command "pip install openpyxl"
from openpyxl.styles import Font, Alignment


def output_to_excel(list_of_dicts, outputfile_path):
    workbook = Workbook()
    worksheet = workbook.active

    headers = list_of_dicts[0].keys()
    worksheet.append(list(headers))

    for cell in worksheet["1:1"]: # First row (headers)
        cell.font = Font(bold=True)

    # Write data rows
    for item in list_of_dicts:
        worksheet.append(list(item.values()))

    # Enable column filtering for all
    worksheet.auto_filter.ref = worksheet.dimensions

    worksheet.freeze_panes = "A2"

    # Filter specific columns
    headers_index_integration_name = list(headers).index("integration_name")
    python_v3_7_integrations = ["EmailUtilities", "FileUtilities", "Functions", "HTTPV2", "Lists", "SiemplifyUtilities", "TemplateEngine", "Tools"]
    worksheet.auto_filter.add_filter_column(headers_index_integration_name, python_v3_7_integrations)

    # Set column widths, enable word wrap, and set alignments
    for col_idx, column_cells in enumerate(worksheet.columns, start=1):
        max_length = max(len(str(cell.value)) if cell.value is not None else 0 for cell in column_cells)
        # adjusted_width = min(50, max_length + 2) # Add a bit of extra space, max width of 50
        # worksheet.column_dimensions[worksheet.cell(row=1, column=col_idx).column_letter].width = adjusted_width
        worksheet.column_dimensions[worksheet.cell(row=1, column=col_idx).column_letter].width = max_length + 2 # Auto size based on content
        for cell in column_cells:
            cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)

    workbook.save(outputfile_path)

def main():
    input_zip_file_path = input("Enter the path to the export_chronicle_soar_playbooks output .ZIP file: ").strip("'").strip('"')
    print()

    playbooks = []

    with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
        for current_inzip_file_path in input_zipfile.namelist():
            if current_inzip_file_path.endswith(".zip"):
                with input_zipfile.open(current_inzip_file_path) as current_inzip_zipextfile:
                    with zipfile.ZipFile(io.BytesIO(current_inzip_zipextfile.read()), "r") as current_inzip_zipfile:
                        for current_inzip_json_file_path in current_inzip_zipfile.namelist():
                            with current_inzip_zipfile.open(current_inzip_json_file_path) as current_inzip_json_file:
                                current_inzip_playbook = json.load(current_inzip_json_file)

                                playbooks.append(current_inzip_playbook)

    rows_original = []

    for current_playbook in playbooks:
        playbook_or_block_name = current_playbook["Definition"]["Name"]
        environment_names = ", ".join(current_playbook["Definition"]["Environments"])
        folder_name = current_playbook["CategoryName"]

        for step in current_playbook["Definition"]["Steps"]:
            integration_name = step["Integration"]

            if integration_name is not None and integration_name != "Flow": # The former is the case for things like parallel action containers. The latter is the case for things like conditions / if statements.
                action_name = step["ActionName"].replace(f"{integration_name}_", "")

                rows_original.append(
                    {
                        "integration_name": integration_name,
                        "action_name": action_name,
                        "playbook_or_block_name": playbook_or_block_name,
                        "environment_names": environment_names,
                        "folder_name": folder_name
                    }
                )

    # Deduplicate by keying each row on its full set of items, then sort
    rows_deduplicated = list({tuple(current_dict.items()): current_dict for current_dict in rows_original}.values())
    rows_sorted = sorted(rows_deduplicated, key=lambda row: (row["integration_name"], row["action_name"], row["playbook_or_block_name"]))

    output_xlsx_file_path = input_zip_file_path + " integration usage.xlsx"

    print(f"Outputting to file '{output_xlsx_file_path}'...")
    print("(For the filtering options to apply, you will need to simply open the filter and then click on 'OK'.)")

    output_to_excel(rows_sorted, output_xlsx_file_path)

if __name__ == "__main__":
    main()

`chronicle_soar_rename_integration_v1.3.py`:

"""
Description:
Takes the export .ZIP file for an integration, prompts for the current and new name, changes these in all files, and outputs back to a .ZIP file.

Version history:
• v1.3:
- Author: [redacted]
- Date: 2025/02/17
- Changes:
= Added logic to skip non-UTF-8 / -Unicode files (e.g., .WHL binary files in folder "Dependencies") because they were causing script failures.
• v1.2:
- Author: [redacted]
- Date: 2025/02/10
- Changes:
= New .DEF file will have the integration name replaced too, as this is required to prevent SOAR import error "Found more than 1 integration def file in > /tmp/Package_*".
• v1.1:
- Author: [redacted]
- Date: 2025/02/10
- Changes:
= Removed restriction to just .DEF files, as some .PY files directly referenced the integration name.
• v1.0:
- Author: [redacted]
- Date: 2025/02/07
- Changes:
= Base functionality.

Improvement opportunities:
• Support for multiple input .ZIP files, current integration names, and new integration names.
"""



from pathlib import Path
import zipfile


def main():
    input_zip_file_path = input("Enter the path to the integration export .ZIP file: ").strip("'").strip('"')
    input_zip_file_name_no_extension = Path(input_zip_file_path).stem

    integration_name_old = input(f"Enter the old integration name (leave blank to use '{input_zip_file_name_no_extension}'): ")
    if not integration_name_old:
        integration_name_old = input_zip_file_name_no_extension

    integration_name_new_predefined = f"{integration_name_old} Test"
    integration_name_new = input(f"Enter the new integration name (leave blank to use '{integration_name_new_predefined}'): ")
    if not integration_name_new:
        integration_name_new = integration_name_new_predefined

    print()

    input_zip_folder_path = Path(input_zip_file_path).parent
    input_zip_file_extension = Path(input_zip_file_path).suffix

    output_zip_file_name = f"{integration_name_new}{input_zip_file_extension}"
    output_zip_file_path = Path(input_zip_folder_path).joinpath(output_zip_file_name)

    with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
        with zipfile.ZipFile(output_zip_file_path, mode="w", compression=zipfile.ZIP_DEFLATED) as output_zipfile:
            for inzip_file_current_path in input_zipfile.namelist():
                with input_zipfile.open(inzip_file_current_path) as current_inzip_file:
                    inzip_file_current_content = current_inzip_file.read()

                try:
                    inzip_file_current_string = inzip_file_current_content.decode("utf-8")

                    inzip_file_new_string = inzip_file_current_string.replace(f'"{integration_name_old}"', f'"{integration_name_new}"')

                    if inzip_file_current_path.endswith(".def"):
                        inzip_file_new_path = inzip_file_current_path.replace(integration_name_old, integration_name_new)
                    else:
                        inzip_file_new_path = inzip_file_current_path

                    output_zipfile.writestr(inzip_file_new_path, inzip_file_new_string)

                except UnicodeDecodeError:
                    # Non-UTF-8 files (e.g., .WHL binaries in "Dependencies") are copied through unchanged.
                    output_zipfile.writestr(inzip_file_current_path, inzip_file_current_content)

    print(f"Outputted to file '{output_zip_file_path}'...")

if __name__ == "__main__":
    main()

`chronicle_soar_change_playbook_integrations_v1.4.py`:

"""
Description:
Takes the export .ZIP file for one or more playbooks, prompts for current and new integration instance names, reconfigures all appropriate steps, and outputs back to a .ZIP file.

Version history:
• v1.4:
- Author: [redacted]
- Date: 2025/02/13
- Changes:
= Changed the output mode to create 1 .ZIP file for each input .ZIP → .JSON file, as we probably want to import one playbook at a time, and it's easier to see which .ZIP file is for which playbook this way.
• v1.3:
- Author: [redacted]
- Date: 2025/02/12
- Changes:
= Added support for parallel actions.
= Added option to skip bundled blocks because they get duplicated on import.
• v1.2:
- Author: [redacted]
- Date: 2025/02/12
- Changes:
= Added option to process more than one .ZIP file.
= Removed commented-out code from v1.1.
• v1.1:
- Author: [redacted]
- Date: 2025/02/11
- Changes:
= Added option to change the environment.
= I also started work to retrieve the new integration instance ID, but I didn't complete this as it seemed unnecessary.
• v1.0:
- Author: [redacted]
- Date: 2025/02/11
- Changes:
= Base functionality.

Improvement opportunities:
• None known.
"""



from pathlib import Path
from datetime import datetime
import zipfile
import json


def replace_integration_in_step(step_current, integration_names_current_list, integration_names_new_list):
    step_integration_name_current = step_current["Integration"]

    if step_integration_name_current in integration_names_current_list:
        integration_index = integration_names_current_list.index(step_integration_name_current)
        step_integration_name_new = integration_names_new_list[integration_index]

        step_new = step_current.copy()

        step_new["Name"] = step_new["Name"].replace(step_integration_name_current, step_integration_name_new)
        step_new["ActionName"] = step_new["ActionName"].replace(step_integration_name_current, step_integration_name_new)
        step_new["Integration"] = step_new["Integration"].replace(step_integration_name_current, step_integration_name_new)

        for step_parameter in step_new["Parameters"]:
            if step_parameter["Name"] == "ScriptName":
                step_parameter["Value"] = step_parameter["Value"].replace(step_integration_name_current, step_integration_name_new)

        return step_new
    else:
        return step_current

def main():
    suffix_predefined = "Test"

    integration_names_current_string = input("Enter the current integration names (not instance names), separated by commas.\n")
    integration_names_current_list = [temp_string.strip().strip("'").strip('"') for temp_string in integration_names_current_string.split(",") if temp_string]

    integration_names_new_string = input(f"\nEnter the new integration names (not instance names), separated by commas and in the same order. Leave blank to suffix '{suffix_predefined}' to each current one.\n")
    if integration_names_new_string:
        integration_names_new_list = [temp_string.strip().strip("'").strip('"') for temp_string in integration_names_new_string.split(",") if temp_string]
    else:
        integration_names_new_list = [f"{temp_string} {suffix_predefined}" for temp_string in integration_names_current_list]
        print(f"Integration name suffix will be '{suffix_predefined}'.")
    if len(integration_names_current_list) == 0 or len(integration_names_current_list) != len(integration_names_new_list):
        raise ValueError("Incorrect number of integration names given. Exiting...")
    if integration_names_current_list == integration_names_new_list:
        raise ValueError("Current and new integration names are the same. Exiting...")

    print("\nIntegration name mapping:")
    for integration_name_current, integration_name_new in zip(integration_names_current_list, integration_names_new_list):
        print(f"\t{integration_name_current} → {integration_name_new}")
    integration_names_correct = input("Confirm that the above is correct (y/n)?\n").lower()
    if not integration_names_correct.startswith("y"):
        quit()

    input_zip_file_paths_string = input("\nEnter the paths to the playbook export .ZIP files, separated by commas.\n")
    if input_zip_file_paths_string:
        input_zip_file_paths_list = [temp_string.strip().strip("'").strip('"') for temp_string in input_zip_file_paths_string.split(",") if temp_string]
    else:
        raise ValueError("At least one path needed. Exiting...")

    playbook_name_new_suffix = input(f"\nEnter a suffix for the new playbook names. Leave blank to use '{suffix_predefined}'.\n").strip()
    if not playbook_name_new_suffix:
        playbook_name_new_suffix = suffix_predefined
        print(f"Playbook suffix will be '{suffix_predefined}'.")

    environment_new = input("\nEnter the name of the new environment to use. Leave blank to leave unchanged.\n")
    if not environment_new:
        print("Environments will be unchanged.")

    skip_blocks = input("\nSkip bundled blocks (y/n)? This avoids duplication if the blocks have already been imported. Leave blank to default to yes.\n").lower()
    if not skip_blocks or skip_blocks.startswith("y"):
        skip_blocks = True
        print("Bundled blocks will be skipped.")
    elif skip_blocks.startswith("n"):
        skip_blocks = False
        print("Bundled blocks will be processed.")
    else:
        raise ValueError("Incorrect value given. Exiting...")

    print()

    for input_zip_file_path in input_zip_file_paths_list:
        print(f"Processing input file '{input_zip_file_path}'...")

        input_zip_folder_path = Path(input_zip_file_path).parent
        input_zip_file_extension = Path(input_zip_file_path).suffix
        input_zip_file_datetime_modified_original = Path(input_zip_file_path).stat().st_mtime
        input_zip_file_datetime_modified_readable = datetime.fromtimestamp(input_zip_file_datetime_modified_original).strftime("%Y-%m-%d %H-%M-%S")

        with zipfile.ZipFile(input_zip_file_path, "r") as input_zipfile:
            inzip_files_list = input_zipfile.namelist()
            inzip_files_count = len(inzip_files_list)

            for inzip_file_current_path in inzip_files_list:
                if inzip_file_current_path.endswith(".json"):
                    with input_zipfile.open(inzip_file_current_path) as playbook_file_current:
                        playbook_file_current_string = playbook_file_current.read().decode("utf-8")
                        playbook_file_current_dict = json.loads(playbook_file_current_string)

                    if playbook_file_current_dict["Definition"]["PlaybookType"] == 0:
                        playbook_type = "playbook"
                    elif playbook_file_current_dict["Definition"]["PlaybookType"] == 1:
                        playbook_type = "block"

                    if skip_blocks and playbook_type == "block" and inzip_files_count > 1:
                        print(f"\tSkipping block file '{inzip_file_current_path}' as it's bundled with a playbook file.")
                        continue

                    playbook_name_current = playbook_file_current_dict["Definition"]["Name"]
                    playbook_name_new = f"{playbook_name_current} {playbook_name_new_suffix}"

                    playbook_file_new_dict = playbook_file_current_dict.copy()

                    if environment_new:
                        playbook_file_new_dict["Definition"]["Environments"] = [environment_new]

                    for step_index, step_dict in enumerate(playbook_file_new_dict["Definition"]["Steps"]):
                        step_integration_name_current = step_dict["Integration"]

                        if step_integration_name_current == "Flow":
                            continue
                        else:
                            if step_dict["ActionName"] == "ParallelActionsContainer":
                                for parallel_action_index, parallel_action_dict in enumerate(step_dict["ParallelActions"]):
                                    step_dict["ParallelActions"][parallel_action_index] = replace_integration_in_step(parallel_action_dict, integration_names_current_list, integration_names_new_list)
                            else:
                                playbook_file_new_dict["Definition"]["Steps"][step_index] = replace_integration_in_step(step_dict, integration_names_current_list, integration_names_new_list)

                    playbook_file_new_string = json.dumps(playbook_file_new_dict).replace(f'"{playbook_name_current}"', f'"{playbook_name_new}"')

                    playbook_file_new_path = inzip_file_current_path.replace(playbook_name_current, playbook_name_new)
                    playbook_file_new_name_no_extension = Path(playbook_file_new_path).stem
                    output_zip_file_name = f"{playbook_file_new_name_no_extension} ({input_zip_file_datetime_modified_readable}) reconfigured{input_zip_file_extension}"
                    output_zip_file_path = Path(input_zip_folder_path).joinpath(output_zip_file_name)

                    with zipfile.ZipFile(output_zip_file_path, mode="w", compression=zipfile.ZIP_DEFLATED) as output_zipfile:
                        output_zipfile.writestr(playbook_file_new_path, playbook_file_new_string)

                    print(f"\tConverted {playbook_type} file '{inzip_file_current_path}' to file '{playbook_file_new_path}'.")

                    print(f"\tOutputted to file '{output_zip_file_path}'.\n")

if __name__ == "__main__":
    main()

 

As a bonus, below are my scripts to back up content.

`chronicle_soar_export_integrations_v1.8.py`:

"""
Description:
This exports / backs up Chronicle (now Google SecOps) SOAR integrations (managers, connectors, jobs, actions, etc).

Version history:
• v1.8:
- Author: [redacted]
- Date: 2025/02/14
- Changes:
= Refactored to use new best practices template.
= Tweaked output and get_credential() making them a bit more user friendly.
• v1.7:
- Author: [redacted]
- Date: 2024/06/05
- Changes:
= Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
• v1.6:
- Author: [redacted]
- Date: 2024/05/31
- Changes:
= Added code to ensure that integrations_failed is extended when the ZIP write fails too, various minor refactors, added note file inside of .ZIP file to say which script version and who created it.
• v1.5:
- Author: [redacted]
- Date: 2024/05/29
- Changes:
= Parameterised get_credential().
• v1.4:
- Author: [redacted]
- Date: 2024/04/09
- Changes:
= Added output of any failed exports at the end.
• v1.3:
- Author: [redacted]
- Date: 2024/04/05
- Changes:
= Added instructions on setting up the environment variable for get_credential(), various minor refactors.
• v1.2:
- Author: [redacted]
- Date: 2024/01/30
- Changes:
= Implemented get_credential().
• v1.1:
- Author: [redacted]
- Date: 2024/01/29
- Changes:
= Implemented saving of all .ZIP files into a single .ZIP file.
• v1.0:
- Author: [redacted]
- Date: 2024/01/26
- Changes:
= Base functionality.

Improvement opportunities:
• None known.
"""



# Getting credentials
import os
import getpass
# Calling APIs
import requests
import urllib.parse
# Outputting files
import os.path
from datetime import datetime
import zipfile


def get_credential(api_credential_environmentvariable_key):
    if api_credential_environmentvariable_key in os.environ:
        api_credential_value = os.environ[api_credential_environmentvariable_key]
    else:
        print(f"Credential not found in environment variable '{api_credential_environmentvariable_key}'. To set this up for your user profile:")
        print(f"\t1. Run the following PowerShell command: $API_Key_Secure = Read-Host -Prompt 'Enter your API key' -AsSecureString; [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $([System.Net.NetworkCredential]::new('', $API_Key_Secure).Password), 'User')")
        print("\t2. Restart this shell / app so that it's loaded into memory and accessible.")
        print(f"\tTo revert the above, run the following PowerShell command: [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $null, 'User')")
        print()
        api_credential_value = getpass.getpass(prompt="If you just want to run this session, enter the credential: ")
        print()

    return api_credential_value

def main():
    api_general_url = "https://[redacted].siemplify-soar.com"
    api_general_key = get_credential("chronicle_soar_api_credential")
    api_general_request_headers = {
        "AppKey": api_general_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        api_getinstalledintegrations_url = api_general_url + "/api/external/v1/integrations/GetInstalledIntegrations"
        print(f"Getting details of all installed integrations from {api_getinstalledintegrations_url} ...")
        api_getinstalledintegrations_response = requests.get(url=api_getinstalledintegrations_url, headers=api_general_request_headers)

        if (api_getinstalledintegrations_response.status_code == 200):
            api_getinstalledintegrations_response_json = api_getinstalledintegrations_response.json()
        else:
            raise Exception(f"API call to {api_getinstalledintegrations_url} failed with status code {api_getinstalledintegrations_response.status_code}.")

        integrations_failed = []

        datetime_now_iso8601 = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        zip_main_file_name = f"Chronicle SOAR integrations ({datetime_now_iso8601}).zip"
        zip_main_folder_path = os.path.join(os.environ["USERPROFILE"], "Downloads")
        zip_main_file_path = os.path.join(zip_main_folder_path, zip_main_file_name)

        with zipfile.ZipFile(zip_main_file_path, mode="a") as zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{zip_main_file_path}' for writing.")

            for index, integration in enumerate(api_getinstalledintegrations_response_json):
                integration_number_current = index + 1
                integration_number_total = len(api_getinstalledintegrations_response_json)
                integration_name_original = integration["identifier"]

                print(f"\nProcessing integration {integration_number_current} of {integration_number_total}: '{integration_name_original}'...")

                integration_name_urlencoded = urllib.parse.quote(integration_name_original)
                api_exportpackage_url = api_general_url + "/api/external/v1/ide/ExportPackage/" + integration_name_urlencoded
                print(f"\tExporting integration from {api_exportpackage_url} ...")
                api_exportpackage_response = requests.get(url=api_exportpackage_url, headers=api_general_request_headers)

                if (api_exportpackage_response.status_code == 200):
                    zip_integration_file_name = f"{integration_name_original} ({datetime_now_iso8601}).zip"

                    try:
                        zip_main_object.writestr(zip_integration_file_name, api_exportpackage_response.content)
                    except Exception as error:
                        print(f"\tError adding file '{zip_integration_file_name}' to file '{zip_main_file_path}'. Details:")
                        print(error)
                        integrations_failed.append(integration_name_original) # append, not extend: extend would add each character of the name separately
                    else:
                        print(f"\tSuccessfully added file '{zip_integration_file_name}' to .ZIP.")
                else:
                    print(f"\tAPI call to {api_exportpackage_url} failed with status code {api_exportpackage_response.status_code}.")
                    integrations_failed.append(integration_name_original)

            zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")

    except Exception as error:
        print("\nGeneral error running script. Details:")
        print(error)
        raise

    else:
        if (len(integrations_failed) != 0):
            print("\nWARNING: Export failed for the following integrations:")
            print("\n".join(integrations_failed))
            print()

        print(f"\nSaved integrations' .ZIP files to main file '{zip_main_file_path}'.")

if __name__ == "__main__":
    main()

`chronicle_soar_export_playbooks_v1.7.py`:

"""
Description:
This exports / backs up Chronicle (now Google SecOps) SOAR playbooks and blocks.

Version history:
• v1.7:
- Author: [redacted]
- Date: 2025/02/14
- Changes:
= Refactored to use new best practices template.
= Tweaked output and get_credential() making them a bit more user friendly.
= Now differentiates between playbooks and blocks.
• v1.6:
- Author: [redacted]
- Date: 2024/06/05
- Changes:
= Removed unnecessary import of zlib, added use of "with" for creating ZIP file so lock is removed if anything goes wrong, various minor refactors.
• v1.5:
- Author: [redacted]
- Date: 2024/05/31
- Changes:
= Added note file inside of .ZIP file to say which script version and who created it.
• v1.4:
- Author: [redacted]
- Date: 2024/05/31
- Changes:
= Added code to ensure that playbooks_ids_and_names_failed is updated when the ZIP write fails too, various minor refactors.
• v1.3:
- Author: [redacted]
- Date: 2024/05/29
- Changes:
= Parameterised get_credential(), adjusted output slightly.
• v1.2:
- Author: [redacted]
- Date: 2024/04/18
- Changes:
= Added preservation of folder structure.
• v1.1:
- Author: [redacted]
- Date: 2024/04/05
- Changes:
= Added instructions on setting up the environment variable for get_credential(), various minor refactors.
• v1.0:
- Author: [redacted]
- Date: 2024/04/02
- Changes:
= Base functionality.

Improvement opportunities:
• None known.
"""



# Getting credentials
import os
import getpass
# Calling APIs
import requests
import json
# Outputting files
import os.path
from datetime import datetime
import zipfile
import base64


def get_credential(api_credential_environmentvariable_key):
    if api_credential_environmentvariable_key in os.environ:
        api_credential_value = os.environ[api_credential_environmentvariable_key]
    else:
        print(f"Credential not found in environment variable '{api_credential_environmentvariable_key}'. To set this up for your user profile:")
        print(f"\t1. Run the following PowerShell command: $API_Key_Secure = Read-Host -Prompt 'Enter your API key' -AsSecureString; [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $([System.Net.NetworkCredential]::new('', $API_Key_Secure).Password), 'User')")
        print("\t2. Restart this shell / app so that it's loaded into memory and accessible.")
        print(f"\tTo revert the above, run the following PowerShell command: [Environment]::SetEnvironmentVariable('{api_credential_environmentvariable_key}', $null, 'User')")
        print()
        api_credential_value = getpass.getpass(prompt="If you just want to run this session, enter the credential: ")
        print()

    return api_credential_value

def main():
    api_general_url = "https://[redacted].siemplify-soar.com"
    api_general_key = get_credential("chronicle_soar_api_credential")
    api_general_request_headers = {
        "AppKey": api_general_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        api_getplaybooks_url = api_general_url + "/api/external/v1/playbooks/GetWorkflowMenuCardsWithEnvFilter"
        print(f"Getting details of all playbooks and blocks from {api_getplaybooks_url} ...")
        api_getplaybooks_response = requests.post(url=api_getplaybooks_url, headers=api_general_request_headers, data="[1,0]") # This API endpoint is entirely undocumented (at /swagger/index.html and Google), so I had to rely on reverse engineering via Chromium → DevTools → Network. Changing the data to 0 OR 1 does affect and reduce the output, but in no way that seems to correlate to assigned environments or whether it's enabled or something. Changing the data to include higher numbers breaks it. So I've simply mirrored what the web UI does.

        if (api_getplaybooks_response.status_code == 200):
            api_getplaybooks_response_json = api_getplaybooks_response.json()
        else:
            raise Exception(f"API call to {api_getplaybooks_url} failed with status code {api_getplaybooks_response.status_code}.")

        api_exportplaybooks_url = api_general_url + "/api/external/v1/playbooks/ExportDefinitions"
        # Some items can fail to export with HTTP status code 500, no given reason, and no information on which couldn't be exported, so we export one at a time to work around this and report the problematic ones.
        items_ids_and_names_failed = {}

        datetime_now_iso8601 = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        zip_main_file_name = f"Chronicle SOAR playbooks and blocks ({datetime_now_iso8601}).zip"
        zip_main_folder_path = os.path.join(os.environ["USERPROFILE"], "Downloads")
        zip_main_file_path = os.path.join(zip_main_folder_path, zip_main_file_name)

        with zipfile.ZipFile(zip_main_file_path, mode="a") as zip_main_object: # Mode is create and/or append.
            print(f"\nCreated and opened file '{zip_main_file_path}' for writing.")

            for index, item in enumerate(api_getplaybooks_response_json):
                item_number_current = index + 1
                item_number_total = len(api_getplaybooks_response_json)
                item_id = item["identifier"]
                item_name = item["name"]
                item_folder = item["categoryName"]
                item_type = item["playbookType"]
                if item_type == 0:
                    item_type = "playbook"
                elif item_type == 1:
                    item_type = "block"

                print(f"\nProcessing item {item_number_current} of {item_number_total}: '{item_name}'...")

                print(f"\tExporting {item_type}...")
                api_exportplaybooks_request_body = {
                    "identifiers": [item_id] # This needs to be a list / array.
                }
                api_exportplaybooks_response = requests.post(url=api_exportplaybooks_url, headers=api_general_request_headers, json=api_exportplaybooks_request_body)

                if (api_exportplaybooks_response.status_code == 200):
                    zip_item_file_name = f"{item_type.title()} - {item_name} ({item_id}).zip" # Chronicle SOAR supports multiple playbooks with the same name, but OSes don't support multiple files with the same name, so the GUID is included to resolve this.
                    zip_item_file_path_relative = os.path.join(item_folder, zip_item_file_name)

                    item_blob_decoded = base64.b64decode(api_exportplaybooks_response.json()["blob"]) # API response is '{"fileName": "<number>_playbook(s)_<date>", "blob": "<Base64-encoded binary>"}', so this extracts and decodes it.

                    try:
                        # .ZIP files are added as-is, rather than adding their contents, because some contain multiple, related files like blocks.
                        zip_main_object.writestr(zip_item_file_path_relative, item_blob_decoded)
                    except Exception as error:
                        print(f"\tError adding file '{zip_item_file_name}' to .ZIP → folder '{item_folder}'. Details:")
                        print(error)
                        items_ids_and_names_failed.update({item_id: item_name})
                    else:
                        print(f"\tSuccessfully added file '{zip_item_file_name}' to .ZIP → folder '{item_folder}'.")
                else:
                    print(f"\tERROR: API call failed with status code {api_exportplaybooks_response.status_code}. Item: '{item_name}' ({item_id}).")
                    items_ids_and_names_failed.update({item_id: item_name})

            zip_main_object.writestr(f"Created by script {os.path.basename(__file__)}, run by user {os.environ['username']}", "")

    except Exception as error:
        print("\nGeneral error running script. Details:")
        print(error)
        raise

    else:
        if (len(items_ids_and_names_failed) != 0):
            print("\nWARNING: Export failed for the following items (one possible cause is playbooks referencing missing blocks):")
            print(json.dumps(items_ids_and_names_failed, indent=4))

        print(f"\nSaved output to file '{zip_main_file_path}'.")

if __name__ == "__main__":
    main()

 

