= aws_s3
:type: input
:status: stable
:categories: ["Services","AWS"]

////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Downloads objects within an Amazon S3 bucket, optionally filtered by a prefix, either by walking the items in the bucket or by streaming upload notifications in real time.

[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
input:
  label: ""
  aws_s3:
    bucket: ""
    prefix: ""
    scanner:
      to_the_end: {}
    sqs:
      url: ""
      key_path: Records.*.s3.object.key
      bucket_path: Records.*.s3.bucket.name
      envelope_path: ""
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
input:
  label: ""
  aws_s3:
    bucket: ""
    prefix: ""
    region: ""
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      from_ec2_role: false
      role: ""
      role_external_id: ""
    force_path_style_urls: false
    delete_objects: false
    scanner:
      to_the_end: {}
    sqs:
      url: ""
      endpoint: ""
      key_path: Records.*.s3.object.key
      bucket_path: Records.*.s3.bucket.name
      envelope_path: ""
      delay_period: ""
      max_messages: 10
      wait_time_seconds: 0
```
--
======

== Stream objects on upload with SQS

A common pattern for consuming S3 objects is to emit upload notification events from the bucket either directly to an SQS queue, or to an SNS topic that is consumed by an SQS queue, and then have your consumer listen for events which prompt it to download the newly uploaded objects. More information about this pattern and how to set it up can be found in the https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html[Amazon S3 docs].

Benthos is able to follow this pattern when you configure an `sqs.url`, where it consumes events from SQS and only downloads object keys received within those events. In order for this to work Benthos needs to know where within the event the key and bucket names can be found, specified as xref:configuration:field_paths.adoc[dot paths] with the fields `sqs.key_path` and `sqs.bucket_path`. The default values for these fields should already be correct when following the guide above.

If your notification events are being routed to SQS via an SNS topic then the events will be enveloped by SNS, in which case you also need to specify the field `sqs.envelope_path`, which in the case of SNS to SQS will usually be `Message`.

When using SQS please make sure you have sensible values for `sqs.max_messages` and also the visibility timeout of the queue itself. When Benthos consumes an S3 object the SQS message that triggered it is not deleted until the S3 object has been sent onwards. This ensures at-least-once crash resiliency, but also means that if the S3 object takes longer to process than the visibility timeout of your queue then the same objects might be processed multiple times.

== Download large files

When downloading large files it's often necessary to process them in streamed parts in order to avoid loading the entire file in memory at once. In order to do this a <<scanner,`scanner`>> can be specified that determines how to break the input into smaller individual messages.

== Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in xref:guides:cloud/aws.adoc[].
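As a rough sketch of the cross-account case, the following reads from a bucket with explicitly configured credentials; the bucket name, role ARN, and environment variables are placeholders, not values required by the component:

```yml
input:
  aws_s3:
    bucket: example-bucket # hypothetical bucket name
    region: us-east-1
    credentials:
      # Resolved from environment variables rather than written into the config
      id: "${AWS_ACCESS_KEY_ID}"
      secret: "${AWS_SECRET_ACCESS_KEY}"
      # Optionally assume a role in the target account
      role: arn:aws:iam::123456789012:role/example-reader
```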
== Metadata

This input adds the following metadata fields to each message:

- s3_key
- s3_bucket
- s3_last_modified_unix
- s3_last_modified (RFC3339)
- s3_content_type
- s3_content_encoding
- s3_version_id
- All user defined metadata

You can access these metadata fields using xref:configuration:interpolation.adoc#bloblang-queries[function interpolation]. Note that user defined metadata is case insensitive within AWS, and it is likely that the keys will be received in a capitalized form. If you wish to make them consistent you can map all metadata keys to lower or uppercase using a Bloblang mapping such as `meta = meta().map_each_key(key -> key.lowercase())`.

== Fields

=== `bucket`

The bucket to consume from. If the field `sqs.url` is specified this field is optional.

*Type*: `string`

*Default*: `""`

=== `prefix`

An optional path prefix, if set only objects with the prefix are consumed when walking a bucket.

*Type*: `string`

*Default*: `""`

=== `region`

The AWS region to target.

*Type*: `string`

*Default*: `""`

=== `endpoint`

Allows you to specify a custom endpoint for the AWS API.

*Type*: `string`

*Default*: `""`

=== `credentials`

Optional manual configuration of AWS credentials to use. More information can be found in xref:guides:cloud/aws.adoc[].

*Type*: `object`

=== `credentials.profile`

A profile from `~/.aws/credentials` to use.

*Type*: `string`

*Default*: `""`

=== `credentials.id`

The ID of credentials to use.

*Type*: `string`

*Default*: `""`

=== `credentials.secret`

The secret for the credentials being used.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page] for more info.
====

*Type*: `string`

*Default*: `""`

=== `credentials.token`

The token for the credentials being used, required when using short term credentials.

*Type*: `string`

*Default*: `""`

=== `credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html[an IAM role associated with the instance^].

*Type*: `bool`

*Default*: `false`

Requires version 4.2.0 or newer

=== `credentials.role`

A role ARN to assume.

*Type*: `string`

*Default*: `""`

=== `credentials.role_external_id`

An external ID to provide when assuming a role.

*Type*: `string`

*Default*: `""`

=== `force_path_style_urls`

Forces the client API to use path style URLs for downloading keys, which is often required when connecting to custom endpoints.

*Type*: `bool`

*Default*: `false`

=== `delete_objects`

Whether to delete downloaded objects from the bucket once they are processed.

*Type*: `bool`

*Default*: `false`

=== `scanner`

The xref:components:scanners/about.adoc[scanner] by which the stream of bytes consumed will be broken out into individual messages. Scanners are useful for processing large sources of data without holding the entirety of it within memory. For example, the `csv` scanner allows you to process individual CSV rows without loading the entire CSV file in memory at once.

*Type*: `scanner`

*Default*: `{"to_the_end":{}}`

Requires version 4.25.0 or newer
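As a minimal sketch of that CSV case (the bucket name and prefix are placeholders), the scanner simply replaces the default `to_the_end`:

```yml
input:
  aws_s3:
    bucket: example-bucket # hypothetical bucket of CSV objects
    prefix: reports/
    scanner:
      # Emit one message per CSV row instead of one per object
      csv: {}
```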
=== `sqs`

Consume SQS messages in order to trigger key downloads.

*Type*: `object`

=== `sqs.url`

An optional SQS URL to connect to. When specified this queue will control which objects are downloaded.

*Type*: `string`

*Default*: `""`

=== `sqs.endpoint`

A custom endpoint to use when connecting to SQS.

*Type*: `string`

*Default*: `""`

=== `sqs.key_path`

A xref:configuration:field_paths.adoc[dot path] whereby object keys are found in SQS messages.

*Type*: `string`

*Default*: `"Records.*.s3.object.key"`

=== `sqs.bucket_path`

A xref:configuration:field_paths.adoc[dot path] whereby the bucket name can be found in SQS messages.

*Type*: `string`

*Default*: `"Records.*.s3.bucket.name"`

=== `sqs.envelope_path`

A xref:configuration:field_paths.adoc[dot path] of a field to extract an enveloped JSON payload for further extracting the key and bucket from SQS messages. This is specifically useful when subscribing an SQS queue to an SNS topic that receives bucket events.

*Type*: `string`

*Default*: `""`

```yml
# Examples

envelope_path: Message
```

=== `sqs.delay_period`

An optional period of time to wait from when a notification was originally sent to when the target key download is attempted.

*Type*: `string`

*Default*: `""`

```yml
# Examples

delay_period: 10s

delay_period: 5m
```

=== `sqs.max_messages`

The maximum number of SQS messages to consume from each request.

*Type*: `int`

*Default*: `10`

=== `sqs.wait_time_seconds`

The number of seconds each receive request waits for messages to arrive; any value greater than 0 activates long-polling. Valid values: 0 to 20.

*Type*: `int`

*Default*: `0`
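Tying these fields together, a sketch of an SQS-triggered consumer whose notifications arrive via an SNS topic (the queue URL is a placeholder, and `bucket` is omitted since it is resolved per event through `sqs.bucket_path`):

```yml
input:
  aws_s3:
    sqs:
      url: https://sqs.us-east-1.amazonaws.com/123456789012/example-queue # placeholder
      envelope_path: Message # unwrap the SNS envelope around each event
      delay_period: 30s      # wait before attempting each download
      max_messages: 10
      wait_time_seconds: 20  # long-poll the queue to reduce empty responses
```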