Skip to content

Instantly share code, notes, and snippets.

@gdlx
Last active February 28, 2025 07:52
Show Gist options
  • Save gdlx/c3bc07c45129cd116c07bfae4bf50642 to your computer and use it in GitHub Desktop.
Save gdlx/c3bc07c45129cd116c07bfae4bf50642 to your computer and use it in GitHub Desktop.
Pulumi Opsgenie Python dynamic providers for policy order and details, plus integration alert filters
import os
from abc import ABCMeta
import pulumi
from pulumi.dynamic import ResourceProvider
from requests import Response
from requests_toolbelt.sessions import BaseUrlSession
class BaseProvider(ResourceProvider, metaclass=ABCMeta):
    """Shared base for the Opsgenie dynamic providers.

    Captures the Opsgenie connection settings at construction time and
    offers helpers to build an authenticated HTTP session and to validate
    API responses.

    Attributes:
        api_url: API host, read from the ``apiUrl`` key of the ``opsgenie``
            config namespace, falling back to ``OPSGENIE_API_URL``.
        api_key: API key, read from the ``apiKey`` secret of the
            ``opsgenie`` config namespace, falling back to
            ``OPSGENIE_API_KEY``.
        team_id: Owner team id, read from the ``ownerTeamId`` key of the
            default config namespace, falling back to ``OPSGENIE_TEAM_ID``.
    """

    def __init__(self) -> None:
        cfg = pulumi.Config()
        og_cfg = pulumi.Config("opsgenie")
        self.api_url = og_cfg.get("apiUrl") or os.getenv("OPSGENIE_API_URL")
        # NOTE(review): `get_secret` presumably returns a pulumi Output rather
        # than a plain string; interpolating it into the Authorization header
        # in `_session` may then not yield the raw key -- confirm.
        self.api_key = og_cfg.get_secret("apiKey") or os.getenv("OPSGENIE_API_KEY")
        self.team_id = cfg.get("ownerTeamId") or os.getenv("OPSGENIE_TEAM_ID")

    def _session(self, team_id: str | None = None) -> BaseUrlSession:
        """Build an HTTP session rooted at ``https://<api_url>/v2/``.

        Args:
            team_id: When truthy, sent as the ``teamId`` query parameter on
                every request made through the session.

        Returns:
            A session carrying the ``GenieKey`` Authorization header.
        """
        session = BaseUrlSession(base_url=f"https://{self.api_url}/v2/")
        session.headers.update({"Authorization": f"GenieKey {self.api_key}"})
        if team_id:
            session.params = {"teamId": team_id}
        return session

    def _check_response(self, response: Response) -> None:
        """Raise a descriptive error unless the response status is 200.

        Args:
            response: The HTTP response to validate.

        Raises:
            Exception: With the API message, the request line, the response
                status and any per-field errors reported by the API.
        """
        if response.status_code == 200:
            return

        # Error bodies are not guaranteed to be JSON (e.g. a proxy error
        # page); decoding failures must not hide the HTTP status below.
        try:
            data = response.json()
        except ValueError:
            data = None
        if not isinstance(data, dict):
            data = {"message": response.text or "Unknown error"}

        lines = [
            str(data.get("message", "Unknown error")),
            f"Request: {response.request.method} {response.request.url}",
            f"Response: {response.status_code} {response.reason}",
        ]
        errors = data.get("errors")
        if errors:
            lines.append("Errors:")
            if isinstance(errors, dict):
                lines.extend(f"- {k}: {v}" for k, v in errors.items())
            elif isinstance(errors, (list, tuple)):
                lines.extend(f"- {item}" for item in errors)
            else:
                lines.append(f"- {errors}")
        raise Exception("\n".join(lines))
from collections.abc import Mapping, Sequence
from typing import Any, Literal
MatchType = Literal["all", "always", "any"]
FilterType = Literal["default", "integration"]
FilterCondition = Mapping[str, Any]
class Filter:
"""
A class to create and manage filter conditions for OpsGenie.
This class handles different types of filter matching logic and their
associated conditions.
Attributes:
conditions (FilterConditions): The conditions to be applied in the filter
match (MatchType): The type of matching logic to be applied
Match Types:
- "always": Matches all (match-all)
- "all": Matches all conditions (match-all-conditions)
- "any": Matches any condition (match-any-condition)
"""
conditions: list[FilterCondition]
match: MatchType
type: FilterType
MATCH_TYPES = {
"always": "match-all",
"all": "match-all-conditions",
"any": "match-any-condition",
}
def __init__(
self,
*conditions: FilterCondition | list[FilterCondition],
match: MatchType | None = None,
filter_type: FilterType = "default",
) -> None:
if not isinstance(conditions, Sequence):
conditions = [conditions]
if match is None:
match = "all" if conditions else "always"
if match not in self.MATCH_TYPES:
raise ValueError(
f"Unknown filter type {match} "
+ f"(expected one of {'|'.join(self.MATCH_TYPES)})"
)
self.conditions = conditions
self.match = self.MATCH_TYPES[match]
self.type = filter_type
def as_dict(self) -> dict[str, Any]:
"""Returns the filter to a dictionary representation."""
match self.type:
case "default":
type_key = "type"
case "integration":
type_key = "conditionMatchType"
case _:
raise ValueError(f"Unknown filter type {self.type}")
if self.match == self.MATCH_TYPES["always"]:
return {type_key: self.match}
return {type_key: self.match, "conditions": self.conditions}
def as_list(self) -> list[dict[str, Any]]:
"""Returns the filter to a list representation."""
return [self.as_dict()]
from collections.abc import Mapping
from typing import Any
from pulumi import Input, Inputs, ResourceOptions
from pulumi.dynamic import CreateResult, DiffResult, ReadResult, Resource, UpdateResult
from pulumi_opsgenie import ApiIntegration
from helpers.filter import Filter
from ._base_provider import BaseProvider
# Integration payload fields that IntegrationExtrasProvider reads from and
# writes to the integration returned by the API.
INTEGRATION_EXTRA_FIELDS = {"alertFilter"}
class IntegrationExtrasProvider(BaseProvider):
    """Dynamic provider applying extra fields to an existing integration.

    The fields handled are those listed in ``INTEGRATION_EXTRA_FIELDS``.
    Inputs are expected to carry ``team_id`` and ``integration_id``.
    """

    def _get_integration(self, inputs: Inputs) -> dict[str, Any]:
        """Fetch the integration and return the ``data`` part of the reply."""
        response = self._session(inputs["team_id"]).get(
            f"integrations/{inputs['integration_id']}"
        )
        self._check_response(response)
        return response.json()["data"]

    def _set_integration_extras(self, inputs: Inputs) -> Inputs:
        """Push the requested extra fields, then return the live values.

        Only non-empty requested fields that differ from the live
        integration are sent, merged into the full current payload. No
        update request is issued when nothing differs.
        """
        current = self._get_integration(inputs)
        changed = {
            field: inputs[field]
            for field in INTEGRATION_EXTRA_FIELDS
            if inputs.get(field) and current.get(field) != inputs[field]
        }
        if changed:
            response = self._session(inputs["team_id"]).put(
                f"integrations/{inputs['integration_id']}",
                # Build a fresh payload instead of mutating the fetched one.
                json={**current, **changed},
            )
            self._check_response(response)
        return self._integration_extras(inputs)

    def _integration_extras(self, inputs: Inputs) -> Inputs:
        """Return a copy of ``inputs`` with extra fields set to live values.

        A field missing from the live integration is reported as ``None``.
        """
        integration = self._get_integration(inputs)
        live = {field: integration.get(field) for field in INTEGRATION_EXTRA_FIELDS}
        return {**inputs, **live}

    def create(self, inputs: Inputs) -> CreateResult:
        """Apply the extras; the resource id is the integration id."""
        return CreateResult(
            inputs["integration_id"], self._set_integration_extras(inputs)
        )

    def update(self, id: str, olds: Inputs, inputs: Inputs) -> UpdateResult:
        """Re-apply the extras from the new inputs."""
        return UpdateResult(self._set_integration_extras(inputs))

    def read(self, id: str, inputs: Inputs) -> ReadResult:
        """Refresh the extra fields from the live integration."""
        return ReadResult(id, self._integration_extras(inputs))

    def diff(self, id: str, olds: Inputs, news: Inputs) -> DiffResult:
        """Report a change when any extra field differs between states.

        Fields absent on either side compare as ``None`` instead of raising.
        """
        changed = [
            field
            for field in INTEGRATION_EXTRA_FIELDS
            if olds.get(field) != news.get(field)
        ]
        return DiffResult(changes=bool(changed))
class IntegrationExtras(Resource):
    """
    A Pulumi custom resource that manages additional configuration for an
    OpsGenie integration.

    This resource allows setting up extra configurations like alert filters for
    an existing OpsGenie integration.

    Args:
        integration (Input[ApiIntegration]): The OpsGenie integration to add
            extras configuration to
        alert_filter (Filter): Filter configuration for defining which alerts
            should be processed
        opts (ResourceOptions, optional): Resource options for the extras configuration
    """

    def __init__(
        self,
        integration: Input[ApiIntegration],
        alert_filter: Mapping[str, Any] | Filter,
        opts: ResourceOptions | None = None,
    ) -> None:
        alert_filter = self._normalize_alert_filter(alert_filter)
        # Extras can't be deleted anyway, so protection is always disabled.
        # `merge` accepts `opts=None` and returns a new object, leaving the
        # caller's options untouched.
        opts = ResourceOptions.merge(opts, ResourceOptions(protect=False))
        super().__init__(
            IntegrationExtrasProvider(),
            f"{integration._name}-extras",
            {
                "integration_id": integration.id,
                "team_id": integration.owner_team_id,
                "alertFilter": alert_filter,
            },
            opts,
        )

    def _normalize_alert_filter(
        self, alert_filter: Mapping[str, Any] | Filter
    ) -> dict[str, Any]:
        """Return the alert filter as a plain dict with normalized conditions.

        Each condition gets an ``order`` equal to its position and a ``not``
        flag defaulting to ``False``. The input mapping and its conditions
        are copied rather than modified.
        """
        if isinstance(alert_filter, Filter):
            alert_filter = alert_filter.as_dict()
        normalized = dict(alert_filter)
        if "conditions" in normalized:
            normalized["conditions"] = [
                {"not": False, **condition, "order": index}
                for index, condition in enumerate(normalized["conditions"])
            ]
        return normalized
from collections.abc import Mapping
from typing import Any
from pulumi import Input, Inputs, ResourceOptions
from pulumi.dynamic import CreateResult, DiffResult, ReadResult, Resource, UpdateResult
from pulumi_opsgenie import AlertPolicy, NotificationPolicy
from ._base_provider import BaseProvider
# Policy payload fields that PolicyExtrasProvider reads from and writes to the
# policy returned by the API.
POLICY_EXTRA_FIELDS = {"details"}
# Maps a pulumi_opsgenie policy resource class to the policy type string that
# PolicyExtras passes as `policy_type` (used in the `policies/<type>` path).
POLICY_TYPES = {AlertPolicy: "alert", NotificationPolicy: "notification"}
class PolicyExtrasProvider(BaseProvider):
    """Dynamic provider applying extra fields and ordering to a policy.

    The fields handled are those listed in ``POLICY_EXTRA_FIELDS`` plus the
    policy order. Inputs are expected to carry ``team_id``, ``policy_id``
    and ``policy_type``.
    """

    def _get_all_policies(self, inputs: Inputs) -> dict[str, dict]:
        """List the policies of the given type, keyed by policy id."""
        response = self._session(inputs["team_id"]).get(
            f"policies/{inputs['policy_type']}"
        )
        self._check_response(response)
        data = response.json().get("data", [])
        return {str(item["id"]): dict(item) for item in data}

    def _get_policy(self, inputs: Inputs) -> dict[str, Any]:
        """Fetch the policy and return the ``data`` part of the reply."""
        response = self._session(inputs["team_id"]).get(
            f"policies/{inputs['policy_id']}"
        )
        self._check_response(response)
        return response.json()["data"]

    def _get_policy_order(self, inputs: Inputs) -> int | None:
        """Return the live order of the policy, or ``None`` if not listed."""
        policies = self._get_all_policies(inputs)
        return policies.get(str(inputs["policy_id"]), {}).get("order")

    def _set_policy_extras(self, inputs: Inputs) -> Inputs:
        """Push the requested extra fields, then apply the requested order.

        Only non-empty requested fields that differ from the live policy
        are sent, merged into the full current payload. No update request is
        issued when nothing differs.
        """
        current = self._get_policy(inputs)
        changed = {
            field: inputs[field]
            for field in POLICY_EXTRA_FIELDS
            if inputs.get(field) and current.get(field) != inputs[field]
        }
        if changed:
            response = self._session(inputs["team_id"]).put(
                f"policies/{inputs['policy_id']}",
                # Build a fresh payload instead of mutating the fetched one.
                json={**current, **changed},
            )
            self._check_response(response)
        return self._set_policy_order(inputs)

    def _set_policy_order(self, inputs: Inputs) -> Inputs:
        """Move the policy to the requested order, if one is requested.

        Returns a copy of ``inputs``; when an order is requested the copy
        also carries ``requested_order`` and ``obtained_order``.
        """
        order = inputs.get("order")
        if order is None:
            return dict(inputs)

        requested_order = int(order)
        response = self._session(inputs["team_id"]).post(
            f"policies/{inputs['policy_id']}/change-order",
            json={"targetIndex": requested_order},
        )
        self._check_response(response)

        # We don't put order in the `order` key because Opsgenie can set an
        # order that may be different from the one requested in the
        # configuration while being correct relatively to other managed
        # policies, so we need to be able to compare requested and obtained
        # orders independently.
        # Check diff method for the comparison logic.
        return {
            **inputs,
            # The order originally requested in the configuration
            "requested_order": requested_order,
            # The order actually obtained after the change
            "obtained_order": self._get_policy_order(inputs),
        }

    def _policy_extras(self, inputs: Inputs) -> Inputs:
        """Return a copy of ``inputs`` refreshed from the live policy.

        Extra fields are set to their live values (``None`` when missing)
        and ``current_order`` to the live order.
        """
        policy = self._get_policy(inputs)
        live = {field: policy.get(field) for field in POLICY_EXTRA_FIELDS}
        # Order is not part of the policy properties.
        return {**inputs, **live, "current_order": self._get_policy_order(inputs)}

    def create(self, inputs: Inputs) -> CreateResult:
        """Apply the extras; the resource id is the policy id."""
        return CreateResult(inputs["policy_id"], self._set_policy_extras(inputs))

    def update(self, id: str, olds: Inputs, inputs: Inputs) -> UpdateResult:
        """Re-apply the extras from the new inputs."""
        return UpdateResult(self._set_policy_extras(inputs))

    def read(self, id: str, inputs: Inputs) -> ReadResult:
        """Refresh the extra fields and current order from the live policy."""
        return ReadResult(id, self._policy_extras(inputs))

    def diff(self, id: str, olds: Inputs, news: Inputs) -> DiffResult:
        """Report a change when extra fields or the order need reconciling.

        ``requested_order`` / ``obtained_order`` are only stored when an
        order was requested, so they are read with ``get`` to avoid a
        ``KeyError`` for resources created without an order.
        """
        changes = [
            field
            for field in POLICY_EXTRA_FIELDS
            if olds.get(field) != news.get(field)
        ]

        requested = news.get("order")
        # Check if the order requested in the configuration has changed
        order_changed = olds.get("requested_order") != requested
        # Check if the current live order is different from the one obtained
        # after the last change. Only relevant while an order is requested.
        # `current_order` is only set if `read` method has been called.
        order_drifted = (
            requested is not None
            and "current_order" in olds
            and olds["current_order"] != olds.get("obtained_order")
        )
        if order_changed or order_drifted:
            changes.append("order")

        return DiffResult(changes=bool(changes))
class PolicyExtras(Resource):
    """
    A Pulumi custom resource that manages additional configuration for an
    OpsGenie alert or notification policy.

    Args:
        policy (Input[AlertPolicy | NotificationPolicy]): The policy to add
            extras configuration to
        order (Input[int | None]): Requested position of the policy
        details (Input[Mapping[str, str] | None]): Details to set on the policy
        opts (ResourceOptions, optional): Resource options for the extras configuration
    """

    def __init__(
        self,
        policy: Input[AlertPolicy | NotificationPolicy],
        order: Input[int | None] = None,
        details: Input[Mapping[str, str] | None] = None,
        opts: ResourceOptions | None = None,
    ) -> None:
        provider = PolicyExtrasProvider()
        resource_name = f"{policy._name}-extras"
        props = {
            # The policy type is looked up from the exact resource class.
            "policy_type": POLICY_TYPES[type(policy)],
            "policy_id": policy.id,
            "team_id": policy.team_id,
            "order": order,
            "details": details,
        }
        super().__init__(provider, resource_name, props, opts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment