Skip to content

Commit

Permalink
Greengrass Implement subscription_definition APIs (#5240)
Browse files Browse the repository at this point in the history
  • Loading branch information
cm-iwata committed Jun 17, 2022
1 parent a44a9df commit 62923af
Show file tree
Hide file tree
Showing 4 changed files with 905 additions and 0 deletions.
195 changes: 195 additions & 0 deletions moto/greengrass/models.py
@@ -1,6 +1,7 @@
import uuid
from collections import OrderedDict
from datetime import datetime
import re

from moto.core import BaseBackend, BaseModel, get_account_id
from moto.core.utils import BackendDict, iso_8601_datetime_with_milliseconds
Expand Down Expand Up @@ -220,6 +221,55 @@ def to_dict(self):
}


class FakeSubscriptionDefinition(BaseModel):
def __init__(self, region_name, name, initial_version):
self.region_name = region_name
self.id = str(uuid.uuid4())
self.arn = f"arn:aws:greengrass:{self.region_name}:{get_account_id()}:/greengrass/definition/subscriptions/{self.id}"
self.created_at_datetime = datetime.utcnow()
self.update_at_datetime = datetime.utcnow()
self.latest_version = ""
self.latest_version_arn = ""
self.name = name
self.initial_version = initial_version

def to_dict(self):
return {
"Arn": self.arn,
"CreationTimestamp": iso_8601_datetime_with_milliseconds(
self.created_at_datetime
),
"Id": self.id,
"LastUpdatedTimestamp": iso_8601_datetime_with_milliseconds(
self.update_at_datetime
),
"LatestVersion": self.latest_version,
"LatestVersionArn": self.latest_version_arn,
"Name": self.name,
}


class FakeSubscriptionDefinitionVersion(BaseModel):
def __init__(self, region_name, subscription_definition_id, subscriptions):
self.region_name = region_name
self.subscription_definition_id = subscription_definition_id
self.subscriptions = subscriptions
self.version = str(uuid.uuid4())
self.arn = f"arn:aws:greengrass:{self.region_name}:{get_account_id()}:/greengrass/definition/subscriptions/{self.subscription_definition_id}/versions/{self.version}"
self.created_at_datetime = datetime.utcnow()

def to_dict(self):
return {
"Arn": self.arn,
"CreationTimestamp": iso_8601_datetime_with_milliseconds(
self.created_at_datetime
),
"Definition": {"Subscriptions": self.subscriptions},
"Id": self.subscription_definition_id,
"Version": self.version,
}


class GreengrassBackend(BaseBackend):
def __init__(self, region_name, account_id):
super().__init__(region_name, account_id)
Expand Down Expand Up @@ -595,5 +645,150 @@ def get_function_definition_version(
function_definition_version_id
]

@staticmethod
def _is_valid_subscription_target_or_source(target_or_source):

if target_or_source in ["cloud", "GGShadowService"]:
return True

if re.match(
r"^arn:aws:iot:[a-zA-Z0-9-]+:[0-9]{12}:thing/[a-zA-Z0-9-]+$",
target_or_source,
):
return True

if re.match(
r"^arn:aws:lambda:[a-zA-Z0-9-]+:[0-9]{12}:function:[a-zA-Z0-9-_]+:[a-zA-Z0-9-_]+$",
target_or_source,
):
return True

return False

@staticmethod
def _validate_subscription_target_or_source(subscriptions):

target_errors = []
source_errors = []

for subscription in subscriptions:
subscription_id = subscription["Id"]
source = subscription["Source"]
target = subscription["Target"]

if not GreengrassBackend._is_valid_subscription_target_or_source(source):
target_errors.append(
f"Subscription source is invalid. ID is '{subscription_id}' and Source is '{source}'"
)

if not GreengrassBackend._is_valid_subscription_target_or_source(target):
target_errors.append(
f"Subscription target is invalid. ID is '{subscription_id}' and Target is '{target}'"
)

if source_errors:
error_msg = ", ".join(source_errors)
raise GreengrassClientError(
"400",
f"The subscriptions definition is invalid or corrupted. (ErrorDetails: [{error_msg}])",
)

if target_errors:
error_msg = ", ".join(target_errors)
raise GreengrassClientError(
"400",
f"The subscriptions definition is invalid or corrupted. (ErrorDetails: [{error_msg}])",
)

def create_subscription_definition(self, name, initial_version):

GreengrassBackend._validate_subscription_target_or_source(
initial_version["Subscriptions"]
)

sub_def = FakeSubscriptionDefinition(self.region_name, name, initial_version)
self.subscription_definitions[sub_def.id] = sub_def
init_ver = sub_def.initial_version
subscriptions = init_ver.get("Subscriptions", {})
sub_def_ver = self.create_subscription_definition_version(
sub_def.id, subscriptions
)

sub_def.latest_version = sub_def_ver.version
sub_def.latest_version_arn = sub_def_ver.arn
return sub_def

def list_subscription_definitions(self):
return self.subscription_definitions.values()

def get_subscription_definition(self, subscription_definition_id):

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException(
"That Subscription List Definition does not exist."
)
return self.subscription_definitions[subscription_definition_id]

def delete_subscription_definition(self, subscription_definition_id):
if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")
del self.subscription_definitions[subscription_definition_id]
del self.subscription_definition_versions[subscription_definition_id]

def update_subscription_definition(self, subscription_definition_id, name):

if name == "":
raise InvalidContainerDefinitionException(
"Input does not contain any attributes to be updated"
)
if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")
self.subscription_definitions[subscription_definition_id].name = name

def create_subscription_definition_version(
self, subscription_definition_id, subscriptions
):

GreengrassBackend._validate_subscription_target_or_source(subscriptions)

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions does not exist.")

sub_def_ver = FakeSubscriptionDefinitionVersion(
self.region_name, subscription_definition_id, subscriptions
)

sub_vers = self.subscription_definition_versions.get(
subscription_definition_id, {}
)
sub_vers[sub_def_ver.version] = sub_def_ver
self.subscription_definition_versions[subscription_definition_id] = sub_vers

return sub_def_ver

def list_subscription_definition_versions(self, subscription_definition_id):
if subscription_definition_id not in self.subscription_definition_versions:
raise IdNotFoundException("That subscriptions definition does not exist.")
return self.subscription_definition_versions[subscription_definition_id]

def get_subscription_definition_version(
self, subscription_definition_id, subscription_definition_version_id
):

if subscription_definition_id not in self.subscription_definitions:
raise IdNotFoundException("That subscriptions definition does not exist.")

if (
subscription_definition_version_id
not in self.subscription_definition_versions[subscription_definition_id]
):
raise VersionNotFoundException(
f"Version {subscription_definition_version_id} of Subscription List Definition {subscription_definition_id} does not exist."
)

return self.subscription_definition_versions[subscription_definition_id][
subscription_definition_version_id
]


greengrass_backends = BackendDict(GreengrassBackend, "greengrass")
110 changes: 110 additions & 0 deletions moto/greengrass/responses.py
Expand Up @@ -448,3 +448,113 @@ def get_function_definition_version(self):
function_definition_version_id=function_definition_version_id,
)
return 200, {"status": 200}, json.dumps(res.to_dict())

def subscription_definitions(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "POST":
return self.create_subscription_definition()

if self.method == "GET":
return self.list_subscription_definitions()

def create_subscription_definition(self):

initial_version = self._get_param("InitialVersion")
name = self._get_param("Name")
res = self.greengrass_backend.create_subscription_definition(
name=name, initial_version=initial_version
)
return 201, {"status": 201}, json.dumps(res.to_dict())

def list_subscription_definitions(self):

res = self.greengrass_backend.list_subscription_definitions()
return (
200,
{"status": 200},
json.dumps(
{
"Definitions": [
subscription_definition.to_dict()
for subscription_definition in res
]
}
),
)

def subscription_definition(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "GET":
return self.get_subscription_definition()

if self.method == "DELETE":
return self.delete_subscription_definition()

if self.method == "PUT":
return self.update_subscription_definition()

def get_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
res = self.greengrass_backend.get_subscription_definition(
subscription_definition_id=subscription_definition_id
)
return 200, {"status": 200}, json.dumps(res.to_dict())

def delete_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
self.greengrass_backend.delete_subscription_definition(
subscription_definition_id=subscription_definition_id
)
return 200, {"status": 200}, json.dumps({})

def update_subscription_definition(self):
subscription_definition_id = self.path.split("/")[-1]
name = self._get_param("Name")
self.greengrass_backend.update_subscription_definition(
subscription_definition_id=subscription_definition_id, name=name
)
return 200, {"status": 200}, json.dumps({})

def subscription_definition_versions(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "POST":
return self.create_subscription_definition_version()

if self.method == "GET":
return self.list_subscription_definition_versions()

def create_subscription_definition_version(self):

subscription_definition_id = self.path.split("/")[-2]
subscriptions = self._get_param("Subscriptions")
res = self.greengrass_backend.create_subscription_definition_version(
subscription_definition_id=subscription_definition_id,
subscriptions=subscriptions,
)
return 201, {"status": 201}, json.dumps(res.to_dict())

def list_subscription_definition_versions(self):
subscription_definition_id = self.path.split("/")[-2]
res = self.greengrass_backend.list_subscription_definition_versions(
subscription_definition_id=subscription_definition_id
)
versions = [i.to_dict() for i in res.values()]
return 200, {"status": 200}, json.dumps({"Versions": versions})

def subscription_definition_version(self, request, full_url, headers):
self.setup_class(request, full_url, headers)

if self.method == "GET":
return self.get_subscription_definition_version()

def get_subscription_definition_version(self):
subscription_definition_id = self.path.split("/")[-3]
subscription_definition_version_id = self.path.split("/")[-1]
res = self.greengrass_backend.get_subscription_definition_version(
subscription_definition_id=subscription_definition_id,
subscription_definition_version_id=subscription_definition_version_id,
)
return 200, {"status": 200}, json.dumps(res.to_dict())
4 changes: 4 additions & 0 deletions moto/greengrass/urls.py
Expand Up @@ -24,5 +24,9 @@
"{0}/greengrass/definition/resources$": response.resource_definitions,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/?$": response.resource_definition,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/versions$": response.resource_definition_versions,
"{0}/greengrass/definition/subscriptions$": response.subscription_definitions,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/?$": response.subscription_definition,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/versions$": response.subscription_definition_versions,
"{0}/greengrass/definition/subscriptions/(?P<definition_id>[^/]+)/versions/(?P<definition_version_id>[^/]+)/?$": response.subscription_definition_version,
"{0}/greengrass/definition/resources/(?P<definition_id>[^/]+)/versions/(?P<definition_version_id>[^/]+)/?$": response.resource_definition_version,
}

0 comments on commit 62923af

Please sign in to comment.