Skip to content

Commit

Permalink
DynamoDB: import_table()/describe_import() (#7672)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers committed May 6, 2024
1 parent 8ab84c1 commit 06d0b2a
Show file tree
Hide file tree
Showing 6 changed files with 683 additions and 5 deletions.
6 changes: 3 additions & 3 deletions IMPLEMENTATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@

## dynamodb
<details>
<summary>56% implemented</summary>
<summary>59% implemented</summary>

- [X] batch_execute_statement
- [X] batch_get_item
Expand All @@ -2043,7 +2043,7 @@
- [ ] describe_export
- [ ] describe_global_table
- [ ] describe_global_table_settings
- [ ] describe_import
- [X] describe_import
- [ ] describe_kinesis_streaming_destination
- [ ] describe_limits
- [X] describe_table
Expand All @@ -2056,7 +2056,7 @@
- [ ] export_table_to_point_in_time
- [X] get_item
- [ ] get_resource_policy
- [ ] import_table
- [X] import_table
- [X] list_backups
- [ ] list_contributor_insights
- [ ] list_exports
Expand Down
10 changes: 8 additions & 2 deletions docs/docs/services/dynamodb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dynamodb
- [ ] describe_export
- [ ] describe_global_table
- [ ] describe_global_table_settings
- [ ] describe_import
- [X] describe_import
- [ ] describe_kinesis_streaming_destination
- [ ] describe_limits
- [X] describe_table
Expand All @@ -58,7 +58,13 @@ dynamodb
- [ ] export_table_to_point_in_time
- [X] get_item
- [ ] get_resource_policy
- [ ] import_table
- [X] import_table

Only InputFormat=DYNAMODB_JSON is supported so far.
InputCompressionType=ZSTD is not supported.
Other parameters that are not supported: InputFormatOptions, CloudWatchLogGroupArn


- [X] list_backups
- [ ] list_contributor_insights
- [ ] list_exports
Expand Down
38 changes: 38 additions & 0 deletions moto/dynamodb/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
RestoredTable,
Table,
)
from moto.dynamodb.models.table_import import TableImport
from moto.dynamodb.parsing import partiql
from moto.dynamodb.parsing.executors import UpdateExpressionExecutor
from moto.dynamodb.parsing.expressions import UpdateExpressionParser # type: ignore
Expand All @@ -44,6 +45,7 @@ def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.tables: Dict[str, Table] = OrderedDict()
self.backups: Dict[str, Backup] = OrderedDict()
self.table_imports: Dict[str, TableImport] = {}

@staticmethod
def default_vpc_endpoint_service(
Expand Down Expand Up @@ -934,5 +936,41 @@ def batch_execute_statement(
responses[idx] = {"Error": {"Code": e.name, "Message": e.message}} # type: ignore
return responses

def import_table(
    self,
    s3_source: Dict[str, str],
    input_format: Optional[str],
    compression_type: Optional[str],
    table_name: str,
    billing_mode: str,
    throughput: Optional[Dict[str, int]],
    key_schema: List[Dict[str, str]],
    global_indexes: Optional[List[Dict[str, Any]]],
    attrs: List[Dict[str, str]],
) -> TableImport:
    """
    Only InputFormat=DYNAMODB_JSON is supported so far.
    InputCompressionType=ZSTD is not supported.
    Other parameters that are not supported: InputFormatOptions, CloudWatchLogGroupArn
    """
    # NOTE(review): input_format is accepted for API compatibility but is not
    # forwarded to TableImport — only DYNAMODB_JSON is implemented.
    new_import = TableImport(
        account_id=self.account_id,
        region_name=self.region_name,
        s3_source=s3_source,
        compression_type=compression_type,
        table_name=table_name,
        billing_mode=billing_mode,
        throughput=throughput,
        key_schema=key_schema,
        global_indexes=global_indexes,
        attrs=attrs,
    )
    # Register the job by ARN so describe_import() can find it later.
    self.table_imports[new_import.arn] = new_import
    # Table creation and the S3 data load run on a background thread.
    new_import.start()
    return new_import

def describe_import(self, import_arn: str) -> TableImport:
    """Return the TableImport job registered under *import_arn*.

    NOTE(review): an unknown ARN raises a plain KeyError here rather than a
    modelled ImportNotFoundException — confirm whether callers expect a
    proper DynamoDB error response for missing imports.
    """
    return self.table_imports[import_arn]


dynamodb_backends = BackendDict(DynamoDBBackend, "dynamodb")
138 changes: 138 additions & 0 deletions moto/dynamodb/models/table_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from threading import Thread
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import uuid4

from moto.core.utils import gzip_decompress

if TYPE_CHECKING:
from moto.dynamodb.models import DynamoDBBackend
from moto.dynamodb.models.table import Table
from moto.s3.models import FakeBucket, S3Backend


class TableImport(Thread):
    """Models DynamoDB's asynchronous ImportTable operation.

    On start(), the job creates the target table and loads every object
    under the configured S3 bucket/prefix into it.  Input is parsed as
    DYNAMODB_JSON (each parsed object must contain an "Item" key);
    GZIP-compressed objects are supported.  Progress counters and the
    terminal status are exposed via response() in the shape of an AWS
    ImportTableDescription.
    """

    def __init__(
        self,
        account_id: str,
        s3_source: Dict[str, str],
        region_name: str,
        table_name: str,
        billing_mode: str,
        throughput: Optional[Dict[str, int]],
        key_schema: List[Dict[str, str]],
        global_indexes: Optional[List[Dict[str, Any]]],
        attrs: List[Dict[str, str]],
        compression_type: Optional[str],
    ):
        super().__init__()
        # Import ARNs append a dash-less UUID to the table's ARN path.
        self.arn = f"arn:aws:dynamodb:{region_name}:{account_id}:table/{table_name}/import/{str(uuid4()).replace('-', '')}"
        self.status = "IN_PROGRESS"
        self.account_id = account_id
        self.s3_source = s3_source
        self.region_name = region_name

        # Table-creation parameters, forwarded verbatim to create_table().
        self.table_name = table_name
        self.billing_mode = billing_mode
        self.throughput = throughput
        self.key_schema = key_schema
        self.global_indexes = global_indexes
        self.attrs = attrs
        self.compression_type = compression_type

        # Failure details and the created table, populated by run().
        self.failure_code: Optional[str] = None
        self.failure_message: Optional[str] = None
        self.table: Optional["Table"] = None
        self.table_arn = (
            f"arn:aws:dynamodb:{self.region_name}:{self.account_id}:table/{table_name}"
        )

        # Progress counters surfaced through response().
        self.processed_count = 0
        self.processed_bytes = 0
        self.error_count = 0
        self.imported_count = 0

    def run(self) -> None:
        """Thread entry point: resolve the source bucket, then import."""
        s3_bucket_name = self.s3_source["S3Bucket"]

        try:
            # Imported here (not at module level) to avoid circular imports.
            from moto.s3.models import s3_backends

            s3_backend = s3_backends[self.account_id]["global"]
            bucket = s3_backend.buckets[s3_bucket_name]
        except KeyError:
            self.status = "FAILED"
            self.failure_code = "S3NoSuchBucket"
            self.failure_message = "The specified bucket does not exist"
            return

        try:
            self._process_s3_files(s3_backend, bucket)
        except Exception as e:
            # e.g. create_table failing because the table already exists
            self.status = "FAILED"
            self.failure_code = "UNKNOWN"
            self.failure_message = str(e)

    def _process_s3_files(self, s3_backend: "S3Backend", bucket: "FakeBucket") -> None:
        """Create the target table and load every matching S3 object into it."""
        import json

        # CREATE TABLE
        from moto.dynamodb.models import dynamodb_backends

        dynamo: "DynamoDBBackend" = dynamodb_backends[self.account_id][
            self.region_name
        ]
        self.table = dynamo.create_table(
            name=self.table_name,
            billing_mode=self.billing_mode,
            throughput=self.throughput,
            schema=self.key_schema,
            global_indexes=self.global_indexes,
            indexes=None,
            attr=self.attrs,
            sse_specification=None,
            streams=None,
            tags=[],
            deletion_protection_enabled=False,
        )

        # Load data from S3
        keys, _, _, _ = s3_backend.list_objects(
            bucket,
            prefix=self.s3_source.get("S3KeyPrefix"),
            delimiter=None,
            marker=None,
            max_keys=None,
        )

        from py_partiql_parser._internal.json_parser import JsonParser

        parser = JsonParser()

        for key in keys:
            if self.compression_type == "GZIP":
                content = gzip_decompress(key.value).decode("utf-8")
            else:
                content = key.value.decode("utf-8")
            result = parser.parse(original=content)
            if not isinstance(result, list):
                result = [result]
            for json_object in result:
                try:
                    self.processed_count += 1
                    # Measure the serialized size of the object; len() on the
                    # parsed dict would only count its top-level keys, not bytes.
                    self.processed_bytes += len(json.dumps(json_object))
                    self.table.put_item(item_attrs=json_object["Item"])
                    self.imported_count += 1
                except Exception as e:
                    # Keep going on bad records; remember the last error seen.
                    self.failure_message = str(e)
                    self.error_count += 1

        # NOTE(review): on the FAILED path failure_code stays None — confirm
        # whether AWS reports a code here (e.g. for item validation errors).
        self.status = "COMPLETED" if self.error_count == 0 else "FAILED"

    def response(self) -> Dict[str, Any]:
        """Return this job as an ImportTableDescription-shaped dict."""
        return {
            "ImportArn": self.arn,
            "ImportStatus": self.status,
            "TableArn": self.table_arn,
            "FailureCode": self.failure_code,
            "FailureMessage": self.failure_message,
            "ProcessedItemCount": self.processed_count,
            "ProcessedSizeBytes": self.processed_bytes,
            "ErrorCount": self.error_count,
            "ImportedItemCount": self.imported_count,
        }
40 changes: 40 additions & 0 deletions moto/dynamodb/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,3 +1182,43 @@ def batch_execute_statement(self) -> str:
stmts = self.body.get("Statements", [])
items = self.dynamodb_backend.batch_execute_statement(stmts)
return dynamo_json_dump({"Responses": items})

def import_table(self) -> str:
    """Handle the ImportTable API call.

    Pulls the S3 source and table-creation parameters out of the request
    body, runs the same validation as CreateTable, then hands off to the
    backend, which performs the import asynchronously.
    """
    body = self.body
    s3_source = body.get("S3BucketSource")
    # Defaults mirror AWS: DYNAMODB_JSON input, uncompressed data.
    input_format = body.get("InputFormat") or "DYNAMODB_JSON"
    compression = body.get("InputCompressionType") or "NONE"

    creation_params = body.get("TableCreationParameters")
    name = creation_params["TableName"]
    attr_definitions = creation_params["AttributeDefinitions"]
    schema = creation_params["KeySchema"]
    billing = creation_params.get("BillingMode")
    provisioned = creation_params.get("ProvisionedThroughput")
    gsis = creation_params.get("GlobalSecondaryIndexes")

    # Re-use the CreateTable parameter validation.
    self._validate_table_creation(
        billing_mode=billing,
        throughput=provisioned,
        key_schema=schema,
        global_indexes=gsis,
        local_secondary_indexes=None,
        attr=attr_definitions,
    )

    table_import = self.dynamodb_backend.import_table(
        s3_source=s3_source,
        input_format=input_format,
        compression_type=compression,
        table_name=name,
        billing_mode=billing or "PROVISIONED",
        throughput=provisioned,
        key_schema=schema,
        global_indexes=gsis,
        attrs=attr_definitions,
    )
    return json.dumps({"ImportTableDescription": table_import.response()})

def describe_import(self) -> str:
    """Handle DescribeImport: look up an import job by its ARN."""
    arn = self.body["ImportArn"]
    job = self.dynamodb_backend.describe_import(arn)
    return json.dumps({"ImportTableDescription": job.response()})

0 comments on commit 06d0b2a

Please sign in to comment.