使用 Amazon DynamoDB 增量导出更新 Apache Iceberg 表
重点摘要
本文介绍了如何使用 Amazon DynamoDB 的增量导出功能,结合 EMR Serverless 和 Apache Iceberg 进行数据更新及查询。通过增量导出,用户可以避免每次都进行全量导出,仅处理变化的数据,从而提高处理效率。最终,在 Athena 中可直接查询最新的 Iceberg 表内容。
Amazon DynamoDB 是一款完全管理的无服务器键值 NoSQL 数据库,设计用于运行高性能的应用程序。近期,DynamoDB 推出了一个新功能:增量导出到 Amazon S3。借助这个增量导出功能,您可以定期更新下游系统,只需使用已更改的数据,无需每次都执行全量导出。增量导出仅输出在指定时间段内插入、更新或删除的数据项,并且文件格式与全量导出不同,主要用于作为补充数据类似源代码中的补丁,同时附带元数据,如每项数据的上次更新时间及处理前后的数据快照。
在本文中,您将学习如何使用 Amazon EMR Serverless 和 Apache Spark 批量处理一系列全量和增量导出,以生成表示 DynamoDB 表最新状态的 Apache Iceberg 表,然后能够使用 Amazon Athena 查询该表。需要注意的是,从导出到批量处理再到查询的整个过程都是无服务器的。
如果您不熟悉这些技术:
Amazon EMR Serverless 使得无需配置、管理或扩展集群即可简单运行使用 Apache Spark 和 Apache Hive 的开源分析框架的应用。Apache Spark 是一种用于编程集群的接口,具备隐式数据并行性和容错性。Apache Iceberg 是一个针对存储在 S3 的大型数据集的表格式,具备快速查询性能、原子提交和并发写入等特性。它还支持时间旅行,允许用户查询过去的数据。您可以更多了解 Iceberg 的工作原理。Amazon Athena 是一种无服务器的交互式分析服务,建立在开源框架上,支持开放表和文件格式。架构方案
使用 DynamoDB 导出设置分析的典型方法是首先发起一次全量导出以生成新的 Iceberg 表,然后重复执行增量导出每个增量周期可以为 15 分钟到 24 小时以更新 Iceberg 表中的所有更改。数据处理通过在 EMR Serverless 上运行的 Spark 作业进行。
注意:您可以选择使用 AWS EventBridge 或 Amazon 管理的 Apache Airflow 工作流 来自动化管道并定期更新。
下图展示了该管道的过程:
应用流量更新 DynamoDB。新的增量导出到 S3 以原始格式写入更改。EMR Serverless 脚本将更改整合到 Iceberg 表中作为处理层。下游应用如 Athena可以将表作为一个整体进行查询,使分析师或其他用户能够分析最新的 DynamoDB 数据。解决方案步骤
假设您已经拥有希望导出用于分析的 DynamoDB 表。导出过程不会改变表或干扰其他对表的流量。如果您想创建一个仅供实验的小示例表,可以参考 入门指南 设置。
下文说明使用三个 S3 存储桶位置:
用于全量和增量导出的 S3 文件夹“dynamodbexportbucket”用于 Spark 脚本的 S3 文件夹“sparkscriptbucket”用于 Iceberg 表的 S3 文件夹“icebergbucket”以及生成的 schema 定义文件您可以使用同一个 S3 存储桶,设置不同的前缀,或者使用不同的存储桶。当设置 S3 存储桶时,建议遵循 S3 安全最佳实践。
步骤 1:从您的 DynamoDB 表执行全量导出
您将从 DynamoDB 表执行全量导出开始。此步骤仅需执行一次。
进入 DynamoDB 控制台中的 Exports to S3 部分,选择 Export to S3。指定 DynamoDB 表、S3 存储桶ltdynamodbexportbucketgt和可选前缀,选择 Full export 并选择 Export from an earlier point in time。选择一个便于计算的最近的时间值,作为第一个增量导出的启动时间。如果您正在进行每小时的导出,可以选择最近的整点。例如,我们使用 20231001 120000 作为本地时间,选择 DynamoDB JSON 作为输出类型,选择加密密钥,然后选择 Export。有关全量导出的更多细节,请参见 AWS 文档中的 DynamoDB 导出。
全量导出完成后,您将在 S3 存储桶中看到输出结果。
步骤 2:创建 EMR Serverless 应用程序
接下来,配置一个 EMR Serverless 应用程序,作为您所有批量活动的执行环境。EMR Serverless 理想用于这些作业,因为其运行时间较短,而在下次调用之前有较长的延迟。EMR Serverless 会自动确定应用程序所需的资源,分配这些资源来处理作业,并在作业完成后释放资源。因此,当作业未在运行时,您将不会产生额外费用。
在 AWS 控制台中,确保您位于与您的三个存储桶相同的 AWS 区域。然后搜索 Amazon EMR 打开 EMR 控制台。在左侧边栏选择 EMR Serverless。
选择 Get started如果您不是第一次使用 EMR Studio,则选择 Manage applications。现在您可以创建 EMR Serverless 应用。选择类型为 Spark,并从下拉菜单中选择最新版本的 EMR,如下所示:还有其他可选配置可供调整,但默认配置将适用于本次演示。如果您更喜欢 AWS 命令行接口CLI,以下命令将创建一个类似的集群:
bashaws emrserverless createapplication name ddbincrementalexports type SPARK releaselabel emr6140
如果您使用的是 CLI,请注意返回的 EMR Serverless 应用程序 ID。后续步骤将需要用到该 ID。您还可以从 AWS EMR 控制台中检索 EMR Serverless 应用程序 ID,以查看新创建的 EMR Serverless 应用程序。
步骤 3:准备 EMR Serverless 执行角色
EMR Serverless 作业的执行需要一个 IAM 角色,该角色必须具有足够的权限来读取 ltdynamodbexportbucketgt 和 ltsparkscriptbucketgt、读写 lticebergbucketgt,以及访问 AWS Glue Catalog。
您可以下载一套 AWS CLI 命令示例,然后 在脚本内部替换存储桶占位符名称为您实际的存储桶名称。替换存储桶名称后,您可以运行这套 CLI 命令,以在 AWS 账户中创建具有正确权限的 IAM 角色。
步骤 4:使用 EMR Serverless 读取全量导出并动态识别表架构
DynamoDB 不强制执行架构,且不同数据项可以有不同的属性。然而,Iceberg 表需要固定的架构。确保非结构化的 DynamoDB 数据正确映射到结构化的 Iceberg 表的最佳方法是创建一个包含将映射到您全量 DynamoDB 数据集所有唯一属性的列的 Iceberg 表。
您也可以手动定义架构,但运行一个 Spark 作业来分析您的 DynamoDB 全量导出并输出带有所有属性的架构可能更简单。
用于推断架构的脚本可在 detectschemafromfullexportpy 下载。它以 JSON 格式写入生成的架构。将脚本保存并上传至 s3//ltsparkscriptbucketgt/ltoptionalprefixgt/。上述 IAM 角色应授予您的 EMR Serverless 应用读取该存储桶的权限。
您现在将提交一个作业,以使用全量导出推断架构。该作业将提交给您在前一步创建的 EMR Serverless 应用。
前往 EMR 应用中的 Submit job。输入作业名称。在本例中,我们将其命名为 “detectschemafromfullexport”。对于 Runtime role,使用您在步骤 3 中创建的角色 ARN可从下拉列表中选择。对于 Script location,输入您保存 Python 脚本的 S3 路径您可以使用 UI 找到它。对于 Script arguments,提供 DynamoDB 全量导出的 S3 位置来自步骤 1,以及将写入架构文件的输出文件路径。注意,运行您的 EMR Serverless 应用的作业角色需要对架构输出位置有写入权限。您可以使用 lticebergbucketgt,因为您已经设置了该存储桶的写入权限。以下是可供复制粘贴的参数的起始文本:json[ s3//ltdynamodbexportbucketgt/ltoptionalprefixgt/ltfullexportfoldergt/ s3//lticebergbucketgt/ltoptionalprefixgt/schemajson]
真实示例展示脚本参数:
json[ s3//mybucket/exports/AWSDynamoDB/01697091611852ae49471c/ s3//myiceberg/schema/schemajson]
确保所有三个存储桶位于与 EMR Serverless 应用作业相同的区域。如果不在同一区域,作业会在执行时因 ConnectTimeoutException 而失败。
或者,您也可以使用以下 AWS CLI 提交您的作业到 EMR Serverless 应用。
bashaws emrserverless startjobrun applicationid applicationid executionrolearn jobrolearn jobdriver {sparkSubmit {entryPoint s3//ltsparkscriptbucketgt/ltoptionalprefixgt/detectschemafromfullexportpy entryPointArguments [s3//ltdynamodbexportbucketgt/ltoptionalprefixgt/ltfullexportfoldergt/ s3//lticebergbucketgt/ltoptionalprefixgt/schemajson]}}
该作业需要几分钟才能运行,具体取决于您的 DynamoDB 全量导出的大小。您可以在 EMR Serverless 作业控制台中跟踪作业进度。如果显示成功,则作业已完成。否则,请检查控制台错误或驱动程序日志stdout/stderr,并纠正诸如参数输入错误等问题。
成功完成后,您将会在参数指定的位置获得一个 schemajson 文件,包含来自 DynamoDB 全量导出的架构。它的样子应类似如下:
json
一元机场免费示例架构
键为列名,值为数据类型
{ productid S quantity N remainingcount N inventorydate S price S productname S}
步骤 5:创建 AWS Glue 数据目录数据库
建议您使用 AWS Glue 数据目录持久化 Iceberg 表的元存储,这样可以通过 Athena或任何兼容 Hive 元存储的查询引擎查询这些表。本文中使用的脚本假设 dev 为默认目录名,db 为默认数据库名。使用以下 AWS CLI 命令创建 db 数据库:
bashaws glue createdatabase databaseinput {Namedb}
步骤 6:使用 EMR Serverless 根据全量导出和使用生成的架构构建 Iceberg 表
下载 createicebergfromfullexportpy 脚本,并将其存储在 ltsparkscriptbucketgt 中,以将全量导入加载到 Iceberg 表中。该 pyspark 脚本执行以下主要操作:
将 JSON 数据读取到数据框中数据框是一种将数据组织成行和列的二维表格数据结构。数据框是现代数据分析中最常用的数据结构之一,因为它们是一种灵活且直观的数据存储和处理方式。此处的第一个数据框与 S3 存储桶中的 JSON 格式相匹配。
应用架构以制作新的数据框该脚本随后使用架构将基于 JSON 的数据框转换为与提供架构匹配的数据框。

将新数据框写入目标 Iceberg 表然后将数据框批量写入新的 Iceberg 表。
通过控制台向 EMR Serverless 应用提交作业,如下图截图所示。为作业提供一个名称、作业角色、您的 Python 脚本的位置ltsparkscriptbucketgt和脚本参数。
以下是可供复制粘贴的脚本参数的起始文本:
json[ s3//ltdynamodbexportbucketgt/ltoptionalprefixgt/ltfullexportfoldergt/ s3//lticebergbucketgt/ltoptionalprefixgt/schemajson lticebergtablenamegt s3//lticebergbucketgt/ltoptionalprefixgt/]
真实示例展示脚本参数:
json[ s3//mybucket/exports/AWSDynamoDB/01697091611852ae49471c/ s3//myiceberg/schema/schemajson iceberg s3//myiceberg/icechest/]
或者,您也可以使用 AWS CLI 提交您的作业到 EMR Serverless 应用。
bashaws emrserverless startjobrun applicationid applicationid executionrolearn jobrolearn jobdriver {sparkSubmit {entryPoint s3////createicebergfromfullexportpyentryPointArguments [s3/////s3////schemajsonlticebergtablename
发表评论