Skip to content

Commit

Permalink
WAFv2: Add support for IP sets and logging configuration (#7680)
Browse files Browse the repository at this point in the history
  • Loading branch information
pinzon committed May 8, 2024
1 parent fd3563a commit b21ae7c
Show file tree
Hide file tree
Showing 5 changed files with 561 additions and 3 deletions.
8 changes: 8 additions & 0 deletions moto/wafv2/exceptions.py
Expand Up @@ -19,3 +19,11 @@ def __init__(self) -> None:
"WAFNonexistentItemException",
"AWS WAF couldn’t perform the operation because your resource doesn’t exist.",
)


class WAFOptimisticLockException(WAFv2ClientError):
def __init__(self) -> None:
super().__init__(
"WAFOptimisticLockException",
"AWS WAF couldn’t save your changes because someone changed the resource after you started to edit it. Reapply your changes.",
)
213 changes: 211 additions & 2 deletions moto/wafv2/models.py
Expand Up @@ -8,8 +8,12 @@
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService

from .exceptions import WAFNonexistentItemException, WAFV2DuplicateItemException
from .utils import make_arn_for_wacl
from .exceptions import (
WAFNonexistentItemException,
WAFOptimisticLockException,
WAFV2DuplicateItemException,
)
from .utils import make_arn_for_ip_set, make_arn_for_wacl

if TYPE_CHECKING:
from moto.apigateway.models import Stage
Expand Down Expand Up @@ -79,6 +83,75 @@ def to_dict(self) -> Dict[str, Any]:
}


class FakeIPSet(BaseModel):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_IPSet.html
"""

def __init__(
self,
arn: str,
ip_set_id: str,
ip_address_version: str,
addresses: List[str],
name: str,
description: str,
scope: str,
):
self.name = name
self.ip_set_id = ip_set_id
self.arn = arn
self.addresses = addresses
self.description = description
self.ip_address_version = ip_address_version
self.scope = scope

self.lock_token = str(mock_random.uuid4())[0:6]

def update(self, description: Optional[str], addresses: List[str]) -> None:
if description is not None:
self.description = description
self.addresses = addresses

self.lock_token = str(mock_random.uuid4())[0:6]

def to_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Id": self.ip_set_id,
"ARN": self.arn,
"Description": self.description,
"IPAddressVersion": self.ip_address_version,
"Addresses": self.addresses,
"LockToken": self.lock_token,
}


class FakeLoggingConfiguration:
def __init__(
self,
arn: str,
log_destination_configs: List[str],
redacted_fields: Optional[Dict[str, Any]],
managed_gy_firewall_manager: Optional[bool],
logging_filter: Optional[Dict[str, Any]],
):
self.arn = arn
self.log_destination_configs = log_destination_configs
self.redacted_fields = redacted_fields
self.managed_by_firewall_manager = managed_gy_firewall_manager or False
self.logging_filter = logging_filter

def to_dict(self) -> Dict[str, Any]:
return {
"ResourceArn": self.arn,
"LogDestinationConfigs": self.log_destination_configs,
"RedactedFields": self.redacted_fields,
"ManagedByFirewallManager": self.managed_by_firewall_manager,
"LoggingFilter": self.logging_filter,
}


class WAFV2Backend(BaseBackend):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_Operations_AWS_WAFV2.html
Expand All @@ -87,6 +160,8 @@ class WAFV2Backend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.wacls: Dict[str, FakeWebACL] = OrderedDict()
self.ip_sets: Dict[str, FakeIPSet] = OrderedDict()
self.logging_configurations: Dict[str, FakeLoggingConfiguration] = OrderedDict()
self.tagging_service = TaggingService()
# TODO: self.load_balancers = OrderedDict()

Expand Down Expand Up @@ -210,6 +285,140 @@ def update_web_acl(
acl.update(default_action, rules, description, visibility_config)
return acl.lock_token

def create_ip_set(
self,
name: str,
scope: str,
description: str,
ip_address_version: str,
addresses: List[str],
tags: List[Dict[str, str]],
) -> FakeIPSet:
ip_set_id = str(mock_random.uuid4())
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=ip_set_id,
scope=scope,
)

new_ip_set = FakeIPSet(
arn,
ip_set_id,
ip_address_version,
addresses,
name,
description,
scope,
)
self.ip_sets[arn] = new_ip_set
self.tag_resource(arn, tags)
return new_ip_set

def delete_ip_set(self, name: str, scope: str, _id: str, lock_token: str) -> None:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)

if arn not in self.ip_sets:
raise WAFNonexistentItemException()

if lock_token != self.ip_sets[arn].lock_token:
raise WAFOptimisticLockException()

self.ip_sets.pop(arn)

def list_ip_sets(self, scope: str) -> List[FakeIPSet]:
ip_sets = [
ip_set for arn, ip_set in self.ip_sets.items() if ip_set.scope == scope
]
return ip_sets

def get_ip_set(self, name: str, scope: str, _id: str) -> FakeIPSet:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)
if arn not in self.ip_sets:
raise WAFNonexistentItemException()

return self.ip_sets[arn]

def update_ip_set(
self,
name: str,
scope: str,
_id: str,
description: Optional[str],
addresses: List[str],
lock_token: str,
) -> FakeIPSet:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)

if not (ip_set := self.ip_sets.get(arn)):
raise WAFNonexistentItemException()

if ip_set.lock_token != lock_token:
raise WAFOptimisticLockException()

ip_set.update(description, addresses)

return ip_set

def put_logging_configuration(
self,
arn: str,
log_destination_configs: List[str],
redacted_fields: Optional[Dict[str, Any]],
managed_gy_firewall_manager: bool,
logging_filter: Dict[str, Any],
) -> FakeLoggingConfiguration:
logging_configuration = FakeLoggingConfiguration(
arn,
log_destination_configs,
redacted_fields,
managed_gy_firewall_manager,
logging_filter,
)
self.logging_configurations[arn] = logging_configuration
return logging_configuration

def delete_logging_configuration(self, arn: str) -> None:
if not self.logging_configurations.get(arn):
raise WAFNonexistentItemException()
self.logging_configurations.pop(arn)

def get_logging_configuration(self, arn: str) -> FakeLoggingConfiguration:
if not (logging_configuration := self.logging_configurations.get(arn)):
raise WAFNonexistentItemException()
return logging_configuration

def list_logging_configurations(self, scope: str) -> List[FakeLoggingConfiguration]:
if scope == "CLOUDFRONT":
scope = "global"
else:
scope = self.region_name

return [
logging_configuration
for arn, logging_configuration in self.logging_configurations.items()
if f":{scope}:" in arn
]


wafv2_backends = BackendDict(
WAFV2Backend, "waf-regional", additional_regions=["global"]
Expand Down

0 comments on commit b21ae7c

Please sign in to comment.