将增强型物联网数据导入 Amazon S3,使用 Amazon Kinesis Data Fireh
  • 20

将增强的物联网数据导入 Amazon S3 使用 Amazon Kinesis Data Firehose

关键要点

在本篇文章中,我们将介绍如何通过 Amazon Kinesis Data Firehose 将增强的物联网数据实时导入数据湖,同时涵盖关键的解决方案架构、AWS 服务配置和实际操作步骤。

介绍

在将数据从物联网IoT设备发送至数据湖时,您可能需要在云中用附加的元数据丰富设备数据负载,以便进行进一步的数据处理和可视化。出现这种情况的原因可能有多种,例如在带宽有限的环境中减少设备负载,或者在云中使用业务输入进行修改。例如,工厂车间的一台机器在一天内可能分配给不同的操作员。这个可变的业务数据将存储在数据库中。在您的数据湖中,您可能需要将此信息与负载一起存储。

在本文中,您将学习如何在接近实时的情况下将增强的 IoT 数据导入数据湖。

前提条件

一个 AWS 账户AWS 命令行界面 (AWS CLI)。请参阅 AWS CLI 快速设置 进行配置。

用例定义

假设在您的物流公司中,您有装有传感器的 IoT 设备的集装箱。当集装箱装载进船上时,集装箱 ID 与船 ID 关联。您需要在数据湖中存储带有船 ID 的 IoT 设备负载。

在这种用例中,传感器负载来自附着在集装箱上的 IoT 设备。然而,相关的船 ID 仅存储在元数据存储中。因此,负载必须在放入数据湖之前与船 ID 进行增强。

解决方案架构

在架构图中:

IoT 设备将负载流式传输到 AWS IoT Core 消息代理,并发送至特定的 MQTT 主题 device/data/DEVICEID。AWS IoT Core 消息代理允许设备使用支持的协议发布和订阅消息。当其主题中有负载时,AWS IoT 规则 被触发。此用例配置了一个 Amazon Kinesis Data Firehose 操作。您可以使用 AWS IoT 规则通过在特定的 MQTT 主题中触发消息或直接使用 Basic Ingest 功能 与 AWS 服务交互。Amazon Kinesis Data Firehose 在将设备负载发送到数据存储之前会缓存这些负载,缓存的依据是大小或时间,以先达到者为准。Kinesis Data Firehose 将实时流数据传输至存储或处理目标。一旦缓存达到大小或时间阈值,Kinesis Data Firehose 将调用 AWS Lambda 函数,以批量增强设备负载,并从 Amazon DynamoDB 中检索元数据。AWS Lambda 是一种无服务器计算服务,可以运行任何类型应用程序的代码。Amazon DynamoDB 是一个完全托管的 NoSQL 数据库,提供快速性能。增强后的负载返回至 Kinesis Data Firehose,以交付至目标。增强后的负载作为目标存储到 Amazon Simple Storage Service (Amazon S3) 桶。Amazon S3 是一种对象存储服务,能在多种用例中存储任何数量的数据。

AWS CloudFormation 模板

从 代码库 下载 AWS CloudFormation 模板。

AWS CloudFormation 模板将部署运行此示例用例所需的所有资源。让我们仔细看看 AWS IoT 规则、Kinesis Data Firehose 和 AWS Lambda 函数资源。

AWS IoT 规则资源

yamlIoTToFirehoseRule Type AWSIoTTopicRule Properties TopicRulePayload Actions Firehose RoleArn !GetAtt IoTFirehosePutRecordRoleArn DeliveryStreamName !Ref FirehoseDeliveryStream Separator n AwsIotSqlVersion 20160323 Description 该规则通过在 Kinesis Firehose 中聚合将 IoT 负载记录到 S3 桶。 RuleDisabled false Sql !Ref IotKinesisRuleSQL

AWS IoT 规则采用 SQL 参数,用于定义触发规则的 IoT 主题和从负载中提取的数据。

在该示例中,SQL 参数默认设置为 SELECT topic(3) as containerId FROM device/data/。SELECT 表示整个负载将原封不动地保留,containerId 从 MQTT 主题的第二个项生成并包含在负载中。FROM device/data/ 描述将触发 AWS IoT 规则的 IoT 主题。 是 MQTT 主题的通配符,IoT 设备将数据负载发布到 device/data/DEVICEID 主题以触发此规则。

AWS IoT 规则还定义了动作。在该示例中,您可以看到一个 Kinesis Data Firehose 动作,定义了目标 Kinesis Data Firehose 投递流和将记录放入此投递流所需的 AWS 身份与访问管理 (IAM) 角色。可以选择分隔符来分隔每条记录,在给定的示例中,它是换行符。

Kinesis Data Firehose 投递流资源

yamlFirehoseDeliveryStream Type AWSKinesisFirehoseDeliveryStream Properties ExtendedS3DestinationConfiguration BucketARN !GetAtt IoTLogBucketArn BufferingHints IntervalInSeconds 60 SizeInMBs 1 Prefix devicedata/ RoleARN !GetAtt FirehosePutS3RoleArn ProcessingConfiguration Enabled true Processors Type Lambda Parameters ParameterName LambdaArn ParameterValue !Sub {FirehoseTransformLambdaArn}LATEST ParameterName RoleArn ParameterValue !GetAtt FirehoseLambdaInvokeRoleArn

Kinesis Data Firehose 投递流必须定义一个目的地以存放流数据。它支持多种类型的目的地,您可以在此文档中查看可用的目的地类型及其用法。在本示例中,您将使用 Amazon S3 作为目的地。

示例投递流资源定义了以下属性:

BucketARN:存储聚合数据的目标桶。目标桶由 CloudFormation 堆栈创建。BufferingHints:用于数据缓冲的大小和时间阈值。在此示例中,设置为 1MB 和 60 秒以便更快查看结果,可根据业务需求进行调整。保持这些阈值较低将导致 Lambda 函数更频繁地调用。如果阈值设置过高,将导致数据的入库频率降低,因此在数据存储中查看最新数据将需要一些时间。Prefix:所创建对象的前缀。Kinesis Data Firehose 默认根据时间戳对数据进行分区。在这个示例中,对象将存放在 devicedata/YYYY/MM/dd/HH 文件夹下。Kinesis Data Firehose 具有像动态分区这样的高级数据分区功能。数据的分区在查询数据湖时非常重要。例如,如果您需要基于设备使用 Amazon Athena 查询数据,通过仅扫描相关设备 ID 的分区将显著减少扫描时间和成本。您可以在此文档中查看有关分区的详细信息。RoleARN:该 IAM 角色赋予 Kinesis Data Firehose 向 Amazon S3 桶放入聚合数据的 PutObject 权限。ProcessingConfiguration:如用例所述,一个转换 Lambda 函数将用元数据增强 IoT 数据。处理配置定义了在示例中作为 Lambda 函数的处理器。对于每一批数据,Kinesis Data Firehose 将调用该 Lambda 函数对数据进行转换。您可以在此文档中了解更多关于数据处理的信息。

转换 Lambda 函数

如下 Python 代码所示,Kinesis Data Firehose 返回一批记录,每条记录都是来自 IoT 设备的负载。首先,解码 base64 编码的负载数据。然后,根据集装箱 ID 从 DynamoDB 表中获取相应的船 ID。将船 ID 添加到负载中,并编码回 base64。最后,将记录列表返回给 Kinesis Data Firehose。

pythonimport osimport boto3import jsonimport base64

dynamodb = boto3resource(dynamodb)table = dynamodbTable(osenviron[METADATATABLE])records = []

def functionhandler(event context) for record in event[records] # 获取记录的数据字段json 格式,它是一个 base64 编码的字符串 jsondata = jsonloads(base64b64decode(record[data])) containerid = jsondata[containerId]

    # 从 DynamoDB 表中获取相应的 shipId    res = tablegetitem(Key={containerId containerid})    ddbitem = res[Item]    shipid = ddbitem[shipId]    # 将 shipId 附加到实际记录数据    enricheddata = jsondata    enricheddata[shipId] = shipid    # 将增强的记录编码为 base64    jsonstring = jsondumps(enricheddata)encode(ascii)    b64encodeddata = base64b64encode(jsonstring)decode(ascii)    # 创建一个包含增强数据的记录并返回给 Firehose    rec = {recordId record[recordId] result Ok data b64encodeddata}    recordsappend(rec)return {records records}

部署

在终端中运行以下命令以部署堆栈。

tk加速器下载安装

bashaws cloudformation deploy stackname IoTKinesisDataPath templatefile IoTKinesisDataPathyml parameteroverrides IotKinesisRuleSQL=SELECT topic(3) as containerId FROM device/data/ capabilities CAPABILITYNAMEDIAM

部署完成后,在终端中运行以下命令以查看部署输出。

bashaws cloudformation describestacks stackname IoTKinesisDataPath

请注意 IoTLogS3BucketName 和 MetadataTableName 输出参数。

将增强型物联网数据导入 Amazon S3,使用 Amazon Kinesis Data Fireh

测试

部署完成后,首先您需要创建一个元数据项以进行数据增强。运行以下命令在 DynamoDB 表中创建一个项。它将创建一个 containerId 为 cont1 并且 shipId 为 ship1 的项。请将 IoTKinesisDataPathMetadataTableSAMPLE 参数替换为 CloudFormation 堆栈部署中的 DynamoDB 表输出参数。

bashaws dynamodb putitem tablename IoTKinesisDataPathMetadataTableSAMPLE item {containerId{Scont1}shipId{Sship1}}

在现实情况下,设备会向特定的 MQTT 主题发布负载。在本示例中,您将使用 AWS CLI 向 MQTT 主题发布负载。运行以下命令在 AWS IoT Core 发布示例数据负载。请注意命令的负载字段,其中特殊数据由设备提供。

bashaws iotdata publish topic device/data/cont1 payload {temperature20humidity80latitude0longitude0} clibinaryformat rawinbase64out

现在,从 AWS 管理控制台 导航到 Amazon S3,并选择由 CloudFormation 堆栈创建的桶。您应该在此桶中看到 devicedata 文件夹。由于 Firehose 投递流设置的缓存配置,数据可能需要最多 1 分钟才能出现。如果您导航到 devicedata/YYYY/MM/dd/HH 文件夹,您会看到一个对象已被创建。请打开此文件,您将看到文件内容是带有增强的 shipId 字段的数据负载。

json{temperature 20 humidity 80 latitude 0 longitude 0 containerId cont1 shipId ship1}

故障排除

如系统出现故障,以下资源可能有助于分析问题的来源。

要监控 AWS IoT Core 规则引擎,您需要启用 AWS IoT Core 日志记录。这将提供有关 AWS IoT Core 中发生事件的详细信息。

AWS Lambda 可以通过 Amazon CloudWatch 进行监控。示例 CloudFormation 模板具有创建 Lambda 函数日志组所需的权限。

在故障出现时,Kinesis Data Firehose 将在 AWS IoT 规则引擎动作、转换 Lambda 函数或 Amazon S3 桶下创建一个 processingfailed 文件夹。您可以作为 JSON 对象读取失败的详细信息。有关更多信息,请参见此 文档。

清理

要清理已创建的资源,请首先清空 Amazon S3 桶。通过运行以下命令,将 bucketname 参数替换为 CloudFormation 堆栈部署的桶名称。重要:此命令将不可逆地删除桶内的所有数据。

bashaws s3 rm s3//bucketname recursive

然后,您可以通过在终端中运行以下命令删除 CloudFormation 堆栈。

bashaws cloudformation deletestack stackname IoTKinesisDataPath

结论

在本文中,您了解了通过 AWS IoT 规则引擎和 Amazon Kinesis Data Firehose 投递流以近实时的方式用元数据丰富 IoT 负载并有效地存储在数据湖中的常见模式。所提出的解决方案及其 CloudFormation 模板可作为可扩展的 IoT 数据摄取架构的基准。

您可以进一步阅读有关 AWS IoT Core 规则引擎 和 Amazon Kinesis Data Firehose。使用 AWS IoT 规则引擎中的 MQTT 主题的 最佳实践 将指导您定义主题结构。

Ozan Cihangir

Ozan 是 AWS 的原型工程师。他帮助客户在云中构建创新的解决方案以实现新兴技术项目。 LinkedIn