使用 Knative 事件、函数和 Apache Camel K 消费 AWS S3 事件 ¶
发布时间:2024-06-14
使用 Knative 事件、函数和 Apache Camel K 消费 AWS S3 事件¶
作者:Matthias Weßendorf,Red Hat 高级首席软件工程师
在本博文中,您将学习如何使用 Knative 事件和函数轻松地使用 Knative 函数从 AWS S3 存储桶中消费事件。博文基于关于 Apache Camel K 和 Knative 的系列文章的第一篇文章
无服务器函数的典型用例之一是响应由外部事件源提供的事件。一个常见的例子是在无服务器函数中接收来自 AWS S3 存储桶的通知。但是,您如何在本地环境中接收这些事件,在该环境中,函数不是在 AWS 上运行,而是在自定义 Kubernetes 设置上运行?
安装¶
Apache Camel K 的安装 提供了一些选择,例如 CLI、Kustomize、OLM 或 Helm。例如,Helm 安装是
$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k
除了 Camel K,我们还需要安装 Knative 事件,如文档中所述。
创建 Knative 代理实例¶
我们使用 Knative 代理作为我们系统的核心,充当事件生产者和事件消费者的事件网格
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
namespace: default
name: my-broker
现在事件生产者可以向其发送事件,事件消费者可以接收事件。
使用 Kamelets 作为 AWS S3 的事件源¶
为了将 Kamelet 绑定到 Knative 组件(如我们上面创建的代理),我们使用 Pipe
API。管道允许以声明方式将数据从 Kamelet 描述的系统移动到 Knative 资源或从 Knative 资源移动到 Kamelet 描述的另一个(外部)系统。
下面是一个 Pipe
,它使用现成的 Kamelet
,一个 aws-s3-source
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: aws-s3-source-pipe
annotations:
trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
integration:
dependencies:
- "camel:cloudevents"
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: aws-s3-source
properties:
bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
accessKey: "${aws.s3.accessKey}"
secretKey: "${aws.s3.secretKey}"
region: "${aws.s3.region}"
dataTypes:
out:
scheme: aws2-s3
format: application-cloudevents
sink:
dataTypes:
in:
scheme: http
format: application-cloudevents
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: my-broker
aws-s3-source
Kamelet 被引用为 Pipe
的 source
,当引用的存储桶中有活动时,将 CNCF CloudEvents 发送到出站 sink
。这里我们使用 Knative 代理,它接受 CloudEvents。稍后,我们将连接 Knative 函数以从代理接收事件。
注意
AWS S3 属性存储在一个秘密中,该秘密通过 trait.camel.apache.org/mount.config
注解挂载到 Pipe
中。
创建 Knative 函数作为消费者¶
为了使用 Knative 函数从 Knative 代理消费消息,我们需要创建一个简单的 Golang 函数。由于有效负载作为 CloudEvents 发送到函数,因此我们使用内置的 cloudevents
模板,通过执行以下命令
$ func create -l go -t cloudevents s3-logger
这将在您当前目录的 s3-logger
文件夹中创建一个新项目,其中包含一个用于 Knative 函数的 Golang 文件
package function
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)
// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
/*
* YOUR CODE HERE
*
* Try running `go test`. Add more test as you code in `handle_test.go`.
*/
fmt.Println("Received event")
fmt.Println(ce) // echo to local output
return &e, nil // echo to caller
}
现在我们可以修改 Handle
函数以打印出 CloudEvent 的一些属性
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
fmt.Println("Received S3 event notification")
fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
fmt.Println("CloudEvent Source attribute: " + ce.Source())
// Some processing of the payload of the CloudEvent...
return nil, nil
上面的 Handle
函数只是打印文件名,该文件名由 subject
属性表示,以及存储为接收到的 CloudEvent 的 source
属性的存储桶的完全限定名称。
注意
目前,AWS-S3-Source 发送所有数据,因此完整的文件也可以通过 data
属性访问。
将 Knative 函数订阅到 AWS S3 事件¶
为了能够接收 S3 事件通知,我们需要将函数订阅到提供事件的代理。但是,我们如何知道哪些事件可用呢?为此,我们检查系统中有哪些 Knative 事件类型可用
$ kubectl get eventtypes.eventing.knative.dev
NAME TYPE SOURCE SCHEMA REFERENCE NAME REFERENCE KIND DESCRIPTION READY REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263 org.apache.camel.event.aws.s3.getObject aws.s3.bucket.<bucketname> my-broker Broker Event Type auto-created by controller True
现在我们知道,在我们的命名空间中,my-broker
代理上存在 org.apache.camel.event.aws.s3.getObject
事件。
现在我们可以执行函数对该事件类型从给定代理进行的订阅
$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker
这将更新项目的 func.yaml
元数据,当函数被部署时,CLI 将构建并部署消费者,并为其创建一个具有匹配过滤器参数的 Knative Trigger
$ func deploy
部署后,在 Knative 函数的终端上,对于每个 S3 事件都可见以下输出
Received S3 event notification
CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute: aws.s3.bucket.<bucketname>
结论¶
使用 Knative 事件、函数和 Apache Camel K,可以将来自第三方云服务(如 AWS S3)的通知触发到在您自己的本地 Kubernetes 集群上运行的函数。