Last active
February 28, 2025 07:52
-
-
Save gdlx/c3bc07c45129cd116c07bfae4bf50642 to your computer and use it in GitHub Desktop.
Pulumi Opsgenie Python dynamic providers for policy order and details, plus integration alert filters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from abc import ABCMeta | |
import pulumi | |
from pulumi.dynamic import ResourceProvider | |
from requests import Response | |
from requests_toolbelt.sessions import BaseUrlSession | |
class BaseProvider(ResourceProvider, metaclass=ABCMeta):
    """
    Shared base class for the Opsgenie dynamic providers.

    Connection settings are read once, at construction time, and helpers are
    offered to build an authenticated HTTP session and to validate responses.

    Attributes:
        api_url: API host, read from the ``opsgenie:apiUrl`` config key or the
            ``OPSGENIE_API_URL`` environment variable.
        api_key: API key, read from the ``opsgenie:apiKey`` secret config key
            or the ``OPSGENIE_API_KEY`` environment variable.
        team_id: Owner team, read from the project ``ownerTeamId`` config key
            or the ``OPSGENIE_TEAM_ID`` environment variable.
    """

    def __init__(self) -> None:
        cfg = pulumi.Config()
        og_cfg = pulumi.Config("opsgenie")

        # Pulumi configuration wins; environment variables are the fallback.
        self.api_url = og_cfg.get("apiUrl") or os.getenv("OPSGENIE_API_URL")
        # NOTE(review): get_secret() presumably returns a Pulumi Output rather
        # than a plain string; confirm that interpolating it into the
        # Authorization header in _session() yields the actual key.
        self.api_key = og_cfg.get_secret("apiKey") or os.getenv("OPSGENIE_API_KEY")
        self.team_id = cfg.get("ownerTeamId") or os.getenv("OPSGENIE_TEAM_ID")

    def _session(self, team_id: str | None = None) -> BaseUrlSession:
        """
        Build an HTTP session bound to the ``/v2/`` API root.

        Args:
            team_id: When truthy, sent as the ``teamId`` query parameter on
                every request made through the session.

        Returns:
            A session carrying the ``GenieKey`` authorization header.

        Raises:
            ValueError: If no API URL has been configured.
        """
        if not self.api_url:
            # Fail early instead of sending requests to "https://None/v2/".
            raise ValueError(
                "Opsgenie API URL is not configured "
                "(set `opsgenie:apiUrl` or OPSGENIE_API_URL)"
            )

        session = BaseUrlSession(base_url=f"https://{self.api_url}/v2/")
        session.headers.update({"Authorization": f"GenieKey {self.api_key}"})

        if team_id:
            session.params = {"teamId": team_id}

        return session

    def _check_response(self, response: Response) -> None:
        """
        Raise a descriptive exception unless the response is a success.

        Args:
            response: The HTTP response to validate.

        Raises:
            Exception: For any non-2xx status. The message carries the API
                error message when available, the request line, the response
                status and the detailed errors reported by the API, if any.
        """
        if 200 <= response.status_code < 300:
            return

        # Error bodies are not guaranteed to be JSON (e.g. a proxy error page);
        # a decoding failure must not hide the HTTP error itself.
        try:
            data = response.json()
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        message = str(data.get("message") or "Unknown error")
        message += f"\nRequest: {response.request.method} {response.request.url}"
        message += f"\nResponse: {response.status_code} {response.reason}"

        errors = data.get("errors")
        if isinstance(errors, dict):
            details = [f"- {k}: {v}" for k, v in errors.items()]
        elif isinstance(errors, (list, tuple)):
            details = [f"- {item}" for item in errors]
        else:
            details = []
        if details:
            message += "\nErrors:\n" + "\n".join(details)

        raise Exception(message)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Mapping, Sequence | |
from typing import Any, Literal | |
MatchType = Literal["all", "always", "any"] | |
FilterType = Literal["default", "integration"] | |
FilterCondition = Mapping[str, Any] | |
class Filter: | |
""" | |
A class to create and manage filter conditions for OpsGenie. | |
This class handles different types of filter matching logic and their | |
associated conditions. | |
Attributes: | |
conditions (FilterConditions): The conditions to be applied in the filter | |
match (MatchType): The type of matching logic to be applied | |
Match Types: | |
- "always": Matches all (match-all) | |
- "all": Matches all conditions (match-all-conditions) | |
- "any": Matches any condition (match-any-condition) | |
""" | |
conditions: list[FilterCondition] | |
match: MatchType | |
type: FilterType | |
MATCH_TYPES = { | |
"always": "match-all", | |
"all": "match-all-conditions", | |
"any": "match-any-condition", | |
} | |
def __init__( | |
self, | |
*conditions: FilterCondition | list[FilterCondition], | |
match: MatchType | None = None, | |
filter_type: FilterType = "default", | |
) -> None: | |
if not isinstance(conditions, Sequence): | |
conditions = [conditions] | |
if match is None: | |
match = "all" if conditions else "always" | |
if match not in self.MATCH_TYPES: | |
raise ValueError( | |
f"Unknown filter type {match} " | |
+ f"(expected one of {'|'.join(self.MATCH_TYPES)})" | |
) | |
self.conditions = conditions | |
self.match = self.MATCH_TYPES[match] | |
self.type = filter_type | |
def as_dict(self) -> dict[str, Any]: | |
"""Returns the filter to a dictionary representation.""" | |
match self.type: | |
case "default": | |
type_key = "type" | |
case "integration": | |
type_key = "conditionMatchType" | |
case _: | |
raise ValueError(f"Unknown filter type {self.type}") | |
if self.match == self.MATCH_TYPES["always"]: | |
return {type_key: self.match} | |
return {type_key: self.match, "conditions": self.conditions} | |
def as_list(self) -> list[dict[str, Any]]: | |
"""Returns the filter to a list representation.""" | |
return [self.as_dict()] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Mapping | |
from typing import Any | |
from pulumi import Input, Inputs, ResourceOptions | |
from pulumi.dynamic import CreateResult, DiffResult, ReadResult, Resource, UpdateResult | |
from pulumi_opsgenie import ApiIntegration | |
from helpers.filter import Filter | |
from ._base_provider import BaseProvider | |
# Integration properties managed by this provider.
INTEGRATION_EXTRA_FIELDS = {"alertFilter"}


class IntegrationExtrasProvider(BaseProvider):
    """
    Dynamic provider setting the `INTEGRATION_EXTRA_FIELDS` properties of an
    existing integration.

    The resource inputs are expected to hold `integration_id` and `team_id`
    next to the managed fields.
    """

    def _get_integration(self, inputs: Inputs) -> dict[str, Any]:
        """Fetch the integration and return the `data` part of the response."""
        response = self._session(inputs["team_id"]).get(
            f"integrations/{inputs['integration_id']}"
        )
        self._check_response(response)

        return response.json()["data"]

    def _set_integration_extras(self, inputs: Inputs) -> Inputs:
        """
        Push the managed fields that differ from the live integration.

        Fields whose requested value is empty are left untouched. When at
        least one field differs, the whole integration is sent back with the
        requested values applied.

        Returns:
            The inputs, with the managed fields re-read from the API.
        """
        integration = self._get_integration(inputs)

        # .get() on the live integration: a field may be absent from the payload.
        changes = {
            field: inputs[field]
            for field in INTEGRATION_EXTRA_FIELDS
            if inputs.get(field) and integration.get(field) != inputs[field]
        }

        if changes:
            response = self._session(inputs["team_id"]).put(
                f"integrations/{inputs['integration_id']}",
                json={**integration, **changes},
            )
            self._check_response(response)

        return self._integration_extras(inputs)

    def _integration_extras(self, inputs: Inputs) -> Inputs:
        """Return a copy of the inputs with the managed fields read from the API."""
        integration = self._get_integration(inputs)
        live = {field: integration.get(field) for field in INTEGRATION_EXTRA_FIELDS}

        # Build a new mapping rather than mutating the caller's inputs.
        return {**inputs, **live}

    def create(self, inputs: Inputs) -> CreateResult:
        """Apply the extras; the resource ID is the integration ID."""
        return CreateResult(
            inputs["integration_id"], self._set_integration_extras(inputs)
        )

    def update(self, id: str, olds: Inputs, inputs: Inputs) -> UpdateResult:
        """Apply the extras from the new inputs."""
        return UpdateResult(self._set_integration_extras(inputs))

    def read(self, id: str, inputs: Inputs) -> ReadResult:
        """Refresh the managed fields from the API."""
        return ReadResult(id, self._integration_extras(inputs))

    def diff(self, id: str, olds: Inputs, news: Inputs) -> DiffResult:
        """
        Compare the managed fields of the stored state and the new inputs.

        A different `integration_id` is reported as a replacement, so that the
        extras get applied to the new integration.
        """
        changed = [
            field
            for field in INTEGRATION_EXTRA_FIELDS
            if olds.get(field) != news.get(field)
        ]
        replaces = (
            ["integration_id"]
            if olds.get("integration_id") != news.get("integration_id")
            else []
        )

        return DiffResult(changes=bool(changed or replaces), replaces=replaces)
class IntegrationExtras(Resource):
    """
    A Pulumi custom resource that manages additional configuration for an
    OpsGenie integration.

    This resource allows setting up extra configurations like alert filters for
    an existing OpsGenie integration.

    Args:
        integration (Input[ApiIntegration]): The OpsGenie integration to add
            extras configuration to
        alert_filter (Filter): Filter configuration for defining which alerts
            should be processed
        opts (ResourceOptions, optional): Resource options for the extras
            configuration. `protect` is always forced to False on the options
            actually used; the object passed by the caller is not modified.
    """

    def __init__(
        self,
        integration: Input[ApiIntegration],
        alert_filter: Mapping[str, Any] | Filter,
        opts: ResourceOptions | None = None,
    ) -> None:
        alert_filter = self._normalize_alert_filter(alert_filter)

        # Extras can't be deleted anyway.
        # merge() copes with `opts` being None and works on a copy, so the
        # caller's options are left as they were.
        opts = ResourceOptions.merge(opts, ResourceOptions(protect=False))

        super().__init__(
            IntegrationExtrasProvider(),
            f"{integration._name}-extras",
            {
                "integration_id": integration.id,
                "team_id": integration.owner_team_id,
                "alertFilter": alert_filter,
            },
            opts,
        )

    def _normalize_alert_filter(
        self, alert_filter: Mapping[str, Any] | Filter
    ) -> dict[str, Any]:
        """
        Return the alert filter as a plain dictionary.

        Each condition gets an `order` key set to its position in the list and
        a `not` key defaulting to False. New dictionaries are built, so the
        mapping and conditions passed by the caller are not modified.
        """
        if isinstance(alert_filter, Filter):
            alert_filter = alert_filter.as_dict()

        normalized = dict(alert_filter)

        if "conditions" in normalized:
            normalized["conditions"] = [
                # `not` is only a default; `order` always reflects the position.
                {"not": False, **condition, "order": index}
                for index, condition in enumerate(normalized["conditions"])
            ]

        return normalized
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Mapping | |
from typing import Any | |
from pulumi import Input, Inputs, ResourceOptions | |
from pulumi.dynamic import CreateResult, DiffResult, ReadResult, Resource, UpdateResult | |
from pulumi_opsgenie import AlertPolicy, NotificationPolicy | |
from ._base_provider import BaseProvider | |
# Policy properties managed by this provider (order is handled separately).
POLICY_EXTRA_FIELDS = {"details"}
# Policy resource class -> policy type segment used in the API path.
POLICY_TYPES = {AlertPolicy: "alert", NotificationPolicy: "notification"}


class PolicyExtrasProvider(BaseProvider):
    """
    Dynamic provider setting the `POLICY_EXTRA_FIELDS` properties and the
    order of an existing policy.

    The resource inputs are expected to hold `policy_id`, `policy_type` and
    `team_id`, and optionally `order` and the managed fields.
    """

    def _get_all_policies(self, inputs: Inputs) -> dict[str, dict]:
        """List the policies of the requested type, keyed by policy ID."""
        response = self._session(inputs["team_id"]).get(
            f"policies/{inputs['policy_type']}"
        )
        self._check_response(response)

        data = response.json().get("data", [])

        return {str(item["id"]): dict(item) for item in data}

    def _get_policy(self, inputs: Inputs) -> dict[str, Any]:
        """Fetch the policy and return the `data` part of the response."""
        response = self._session(inputs["team_id"]).get(
            f"policies/{inputs['policy_id']}"
        )
        self._check_response(response)

        return response.json()["data"]

    def _get_policy_order(self, inputs: Inputs) -> int | None:
        """Return the live order of the policy, or None when it is not listed."""
        policy = self._get_all_policies(inputs).get(str(inputs["policy_id"]), {})

        return policy.get("order")

    def _set_policy_extras(self, inputs: Inputs) -> Inputs:
        """
        Push the managed fields that differ from the live policy, then apply
        the requested order.

        Fields whose requested value is empty are left untouched. When at
        least one field differs, the whole policy is sent back with the
        requested values applied.

        Returns:
            The inputs, completed by `_set_policy_order`.
        """
        policy = self._get_policy(inputs)

        # .get() on the live policy: a field may be absent from the payload.
        changes = {
            field: inputs[field]
            for field in POLICY_EXTRA_FIELDS
            if inputs.get(field) and policy.get(field) != inputs[field]
        }

        if changes:
            response = self._session(inputs["team_id"]).put(
                f"policies/{inputs['policy_id']}",
                json={**policy, **changes},
            )
            self._check_response(response)

        return self._set_policy_order(inputs)

    def _set_policy_order(self, inputs: Inputs) -> Inputs:
        """
        Move the policy to the requested order, if any.

        Returns:
            A copy of the inputs. When an order is requested, it also holds
            `requested_order` and `obtained_order`.
        """
        # `order` may be None or missing altogether when it is not managed.
        if inputs.get("order") is None:
            return dict(inputs)

        requested_order = int(inputs["order"])

        response = self._session(inputs["team_id"]).post(
            f"policies/{inputs['policy_id']}/change-order",
            json={"targetIndex": requested_order},
        )
        self._check_response(response)

        # We don't put order in the `order` key because Opsgenie can set an
        # order that may be different from the one requested in the
        # configuration while being correct relatively to other managed
        # policies, so we need to be able to compare requested and obtained
        # orders independently.
        # Check diff method for the comparison logic.
        return {
            **inputs,
            # The order originally requested in the configuration
            "requested_order": requested_order,
            # The order actually obtained after the change
            "obtained_order": self._get_policy_order(inputs),
        }

    def _policy_extras(self, inputs: Inputs) -> Inputs:
        """
        Return a copy of the inputs with the managed fields and the
        `current_order` read from the API.
        """
        policy = self._get_policy(inputs)
        live = {field: policy.get(field) for field in POLICY_EXTRA_FIELDS}

        return {
            **inputs,
            **live,
            # Order is not part of the policy properties.
            "current_order": self._get_policy_order(inputs),
        }

    def create(self, inputs: Inputs) -> CreateResult:
        """Apply the extras; the resource ID is the policy ID."""
        return CreateResult(inputs["policy_id"], self._set_policy_extras(inputs))

    def update(self, id: str, olds: Inputs, inputs: Inputs) -> UpdateResult:
        """Apply the extras from the new inputs."""
        return UpdateResult(self._set_policy_extras(inputs))

    def read(self, id: str, inputs: Inputs) -> ReadResult:
        """Refresh the managed fields and the current order from the API."""
        return ReadResult(id, self._policy_extras(inputs))

    def diff(self, id: str, olds: Inputs, news: Inputs) -> DiffResult:
        """
        Compare the stored state and the new inputs.

        A change is reported when a managed field differs, when the requested
        order differs from the one last requested, or when the live order has
        drifted from the one obtained after the last change. A different
        `policy_id` is reported as a replacement, so that the extras get
        applied to the new policy.
        """
        changes = [
            field
            for field in POLICY_EXTRA_FIELDS
            if olds.get(field) != news.get(field)
        ]

        requested_order = news.get("order")

        # Check if the order requested in the configuration has changed
        order_changed = olds.get("requested_order") != requested_order

        # Check if the current live order is different from the one obtained
        # after the last change.
        # `current_order` is only set if `read` method has been called, and
        # the comparison only makes sense when an order is managed.
        order_drifted = (
            requested_order is not None
            and "current_order" in olds
            and olds["current_order"] != olds.get("obtained_order")
        )

        if order_changed or order_drifted:
            changes.append("order")

        replaces = (
            ["policy_id"] if olds.get("policy_id") != news.get("policy_id") else []
        )

        return DiffResult(changes=bool(changes or replaces), replaces=replaces)
class PolicyExtras(Resource):
    """
    A Pulumi custom resource that manages additional configuration for an
    existing OpsGenie alert or notification policy.

    Args:
        policy (Input[AlertPolicy | NotificationPolicy]): The policy to add
            extras configuration to
        order (Input[int], optional): The order requested for the policy
        details (Input[Mapping[str, str]], optional): The details to set on
            the policy
        opts (ResourceOptions, optional): Resource options for the extras
            configuration

    Raises:
        KeyError: If the policy is not an instance of a class listed in
            `POLICY_TYPES`.
    """

    def __init__(
        self,
        policy: Input[AlertPolicy | NotificationPolicy],
        order: Input[int | None] = None,
        details: Input[Mapping[str, str] | None] = None,
        opts: ResourceOptions | None = None,
    ) -> None:
        super().__init__(
            PolicyExtrasProvider(),
            f"{policy._name}-extras",
            {
                "policy_type": self._policy_type(policy),
                "policy_id": policy.id,
                "team_id": policy.team_id,
                "order": order,
                "details": details,
            },
            opts,
        )

    @staticmethod
    def _policy_type(policy: AlertPolicy | NotificationPolicy) -> str:
        """Return the `POLICY_TYPES` name matching the policy, subclasses included."""
        for policy_class, type_name in POLICY_TYPES.items():
            if isinstance(policy, policy_class):
                return type_name

        # Same exception as a failed lookup in POLICY_TYPES.
        raise KeyError(type(policy))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment