Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Greengrass Implement subscription_definition APIs #5240

Merged
195 changes: 195 additions & 0 deletions moto/greengrass/models.py
@@ -1,6 +1,7 @@
import uuid
from collections import OrderedDict
from datetime import datetime
import re

from moto.core import BaseBackend, BaseModel, get_account_id
from moto.core.utils import BackendDict, iso_8601_datetime_with_milliseconds
Expand Down Expand Up @@ -220,6 +221,55 @@ def to_dict(self):
}


class FakeSubscriptionDefinition(BaseModel):
def __init__(self, region_name, name, initial_version):
self.region_name = region_name
self.id = str(uuid.uuid4())
self.arn = f"arn:aws:greengrass:{self.region_name}:{get_account_id()}:/greengrass/definition/subscriptions/{self.id}"
self.created_at_datetime = datetime.utcnow()
self.update_at_datetime = datetime.utcnow()
self.latest_version = ""
self.latest_version_arn = ""
self.name = name
self.initial_version = initial_version

def to_dict(self):
return {
"Arn": self.arn,
"CreationTimestamp": iso_8601_datetime_with_milliseconds(
self.created_at_datetime
),
"Id": self.id,
"LastUpdatedTimestamp": iso_8601_datetime_with_milliseconds(
self.update_at_datetime
),
"LatestVersion": self.latest_version,
"LatestVersionArn": self.latest_version_arn,
"Name": self.name,
}


class FakeSubscriptionDefinitionVersion(BaseModel):
def __init__(self, region_name, subscription_definition_id, subscriptions):
self.region_name = region_name
self.subscription_definition_id = subscription_definition_id
self.subscriptions = subscriptions
self.version = str(uuid.uuid4())
self.arn = f"arn:aws:greengrass:{self.region_name}:{get_account_id()}:/greengrass/definition/subscriptions/{self.subscription_definition_id}/versions/{self.version}"
self.created_at_datetime = datetime.utcnow()

def to_dict(self):
return {
"Arn": self.arn,
"CreationTimestamp": iso_8601_datetime_with_milliseconds(
self.created_at_datetime
),
"Definition": {"Subscriptions": self.subscriptions},
"Id": self.subscription_definition_id,
"Version": self.version,
}


class GreengrassBackend(BaseBackend):
def __init__(self, region_name, account_id):
super().__init__(region_name, account_id)
Expand Down Expand Up @@ -595,5 +645,150 @@ def get_function_definition_version(
function_definition_version_id
]

@staticmethod
def _is_valid_subscription_target_or_source(target_or_source):

if target_or_source in ["cloud", "GGShadowService"]:
return True

if re.match(
r"^arn:aws:iot:[a-zA-Z0-9-]+:[0-9]{12}:thing/[a-zA-Z0-9-]+$",
target_or_source,
):
return True

if re.match(
r"^arn:aws:lambda:[a-zA-Z0-9-]+:[0-9]{12}:function:[a-zA-Z0-9-_]+:[a-zA-Z0-9-_]+$",
target_or_source,
):
return True

return False

@staticmethod
def _validate_subscription_target_or_source(subscriptions):

target_errors = []
source_errors = []

for subscription in subscriptions:
subscription_id = subscription["Id"]
source = subscription["Source"]
target = subscription["Target"]

if not GreengrassBackend._is_valid_subscription_target_or_source(source):
target_errors.append(
f"Subscription source is invalid. ID is '{subscription_id}' and Source is '{source}'"
)

if not GreengrassBackend._is_valid_subscription_target_or_source(target):
target_errors.append(
f"Subscription target is invalid. ID is '{subscription_id}' and Target is '{target}'"
)

if source_errors:
error_msg = ", ".join(source_errors)
raise GreengrassClientError(
"400",
f"The subscriptions definition is invalid or corrupted. (ErrorDetails: [{error_msg}])",
)

if target_errors:
error_msg = ", ".join(target_errors)
raise GreengrassClientError(
"400",
f"The subscriptions definition is invalid or corrupted. (ErrorDetails: [{error_msg}])",
)

def create_subscription_definition(self, name, initial_version):

GreengrassBackend._validate_subscription_target_or_source(
initial_version["Subscriptions"]
)

sub_def = FakeSubscriptionDefinition(self.region_name, name, initial_version)
self.subscription_definitions[sub_def.id] = sub_def
init_ver = sub_def.initial_version
subscriptions = init_ver.get("Subscriptions", {})
sub_def_ver = self.create_subscription_definition_version(
sub_def.id, subscriptions
)

sub_def.latest_version = sub_def_ver.version
sub_def.latest_version_arn = sub_def_ver.arn
return sub_def

def list_subscription_definitions(self):
return self.subscription_definitions.values()

def get_subscription_definition(self, subscription_definition_id):

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException(
"That Subscription List Definition does not exist."
)
return self.subscription_definitions[subscription_definition_id]

def delete_subscription_definition(self, subscription_definition_id):
if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")
del self.subscription_definitions[subscription_definition_id]
del self.subscription_definition_versions[subscription_definition_id]

def update_subscription_definition(self, subscription_definition_id, name):

if name == "":
raise InvalidContainerDefinitionException(
"Input does not contain any attributes to be updated"
)
if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")
self.subscription_definitions[subscription_definition_id].name = name

def create_subscription_definition_version(
self, subscription_definition_id, subscriptions
):

GreengrassBackend._validate_subscription_target_or_source(subscriptions)

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions does not exist.")

sub_def_ver = FakeSubscriptionDefinitionVersion(
self.region_name, subscription_definition_id, subscriptions
)

sub_vers = self.subscription_definition_versions.get(
subscription_definition_id, {}
)
sub_vers[sub_def_ver.version] = sub_def_ver
self.subscription_definition_versions[subscription_definition_id] = sub_vers

return sub_def_ver

def list_subscription_definition_versions(self, subscription_definition_id):
if subscription_definition_id not in self.subscription_definition_versions:
raise IdNotFoundException("That subscriptions definition does not exist.")
return self.subscription_definition_versions[subscription_definition_id]

def get_subscription_definition_version(
self, subscription_definition_id, subscription_definition_version_id
):

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")

if (
subscription_definition_version_id
not in self.subscription_definition_versions[subscription_definition_id]
):
raise VersionNotFoundException(
f"Version {subscription_definition_version_id} of Subscription List Definition {subscription_definition_id} does not exist."
)

return self.subscription_definition_versions[subscription_definition_id][
subscription_definition_version_id
]


greengrass_backends = BackendDict(GreengrassBackend, "greengrass")
110 changes: 110 additions & 0 deletions moto/greengrass/responses.py
Expand Up @@ -448,3 +448,113 @@ def get_function_definition_version(self):
function_definition_version_id=function_definition_version_id,
)
return 200, {"status": 200}, json.dumps(res.to_dict())

def subscription_definitions(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "POST":
return self.create_subscription_definition()

if self.method == "GET":
return self.list_subscription_definitions()

def create_subscription_definition(self):

initial_version = self._get_param("InitialVersion")
name = self._get_param("Name")
res = self.greengrass_backend.create_subscription_definition(
name=name, initial_version=initial_version
)
return 201, {"status": 201}, json.dumps(res.to_dict())

def list_subscription_definitions(self):

res = self.greengrass_backend.list_subscription_definitions()
return (
200,
{"status": 200},
json.dumps(
{
"Definitions": [
subscription_definition.to_dict()
for subscription_definition in res
]
}
),
)

def subscription_definition(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "GET":
return self.get_subscription_definition()

if self.method == "DELETE":
return self.delete_subscription_definition()

if self.method == "PUT":
return self.update_subscription_definition()

def get_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
res = self.greengrass_backend.get_subscription_definition(
subscription_definition_id=subscription_definition_id
)
return 200, {"status": 200}, json.dumps(res.to_dict())

def delete_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
self.greengrass_backend.delete_subscription_definition(
subscription_definition_id=subscription_definition_id
)
return 200, {"status": 200}, json.dumps({})

def update_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
name = self._get_param("Name")
self.greengrass_backend.update_subscription_definition(
subscription_definition_id=subscription_definition_id, name=name
)
return 200, {"status": 200}, json.dumps({})

def subscription_definition_versions(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "POST":
return self.create_subscription_definition_version()

if self.method == "GET":
return self.list_subscription_definition_versions()

def create_subscription_definition_version(self):

subscription_definition_id = self.path.split("/")[-2]
subscriptions = self._get_param("Subscriptions")
res = self.greengrass_backend.create_subscription_definition_version(
subscription_definition_id=subscription_definition_id,
subscriptions=subscriptions,
)
return 201, {"status": 201}, json.dumps(res.to_dict())

def list_subscription_definition_versions(self):
subscription_definition_id = self.path.split("/")[-2]
res = self.greengrass_backend.list_subscription_definition_versions(
subscription_definition_id=subscription_definition_id
)
versions = [i.to_dict() for i in res.values()]
return 200, {"status": 200}, json.dumps({"Versions": versions})

def subscription_definition_version(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "GET":
return self.get_subscription_definition_version()

def get_subscription_definition_version(self):
subscription_definition_id = self.path.split("/")[-3]
subscription_definition_version_id = self.path.split("/")[-1]
res = self.greengrass_backend.get_subscription_definition_version(
subscription_definition_id=subscription_definition_id,
subscription_definition_version_id=subscription_definition_version_id,
)
return 200, {"status": 200}, json.dumps(res.to_dict())
4 changes: 4 additions & 0 deletions moto/greengrass/urls.py
Expand Up @@ -24,5 +24,9 @@
"{0}/greengrass/definition/resources$": response.resource_definitions,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/?$": response.resource_definition,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/versions$": response.resource_definition_versions,
"{0}/greengrass/definition/subscriptions$": response.subscription_definitions,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/?$": response.subscription_definition,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/versions$": response.subscription_definition_versions,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/versions/(?P<definition_version_id>[^/]+)/?$": response.subscription_definition_version,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/versions/(?P<definition_version_id>[^/]+)/?$": response.resource_definition_version,
}