跳至内容

使用 Knative 事件、函数和 Apache Camel K 消费 AWS S3 事件

发布时间:2024-06-14

使用 Knative 事件、函数和 Apache Camel K 消费 AWS S3 事件

作者:Matthias Weßendorf,Red Hat 高级首席软件工程师

在本博文中,您将学习如何使用 Knative 事件和函数轻松地使用 Knative 函数从 AWS S3 存储桶中消费事件。博文基于关于 Apache Camel K 和 Knative 的系列文章的第一篇文章

无服务器函数的典型用例之一是响应由外部事件源提供的事件。一个常见的例子是在无服务器函数中接收来自 AWS S3 存储桶的通知。但是,您如何在本地环境中接收这些事件,在该环境中,函数不是在 AWS 上运行,而是在自定义 Kubernetes 设置上运行?

安装

Apache Camel K 的安装 提供了一些选择,例如 CLI、Kustomize、OLM 或 Helm。例如,Helm 安装是

$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k

除了 Camel K,我们还需要安装 Knative 事件,如文档中所述。

创建 Knative 代理实例

我们使用 Knative 代理作为我们系统的核心,充当事件生产者和事件消费者的事件网格

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker

现在事件生产者可以向其发送事件,事件消费者可以接收事件。

使用 Kamelets 作为 AWS S3 的事件源

为了将 Kamelet 绑定到 Knative 组件(如我们上面创建的代理),我们使用 Pipe API。管道允许以声明方式将数据从 Kamelet 描述的系统移动到 Knative 资源从 Knative 资源移动到 Kamelet 描述的另一个(外部)系统。

下面是一个 Pipe,它使用现成的 Kamelet,一个 aws-s3-source

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: aws-s3-source-pipe
  annotations:
    trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
  integration:
    dependencies:
    - "camel:cloudevents"
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-s3-source
    properties:
      bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
      accessKey: "${aws.s3.accessKey}"
      secretKey: "${aws.s3.secretKey}"
      region: "${aws.s3.region}"
    dataTypes:
      out:
        scheme: aws2-s3
        format: application-cloudevents
  sink:
    dataTypes:
      in:
        scheme: http
        format: application-cloudevents
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: my-broker

aws-s3-source Kamelet 被引用为 Pipesource,当引用的存储桶中有活动时,将 CNCF CloudEvents 发送到出站 sink。这里我们使用 Knative 代理,它接受 CloudEvents。稍后,我们将连接 Knative 函数以从代理接收事件。

注意

AWS S3 属性存储在一个秘密中,该秘密通过 trait.camel.apache.org/mount.config 注解挂载到 Pipe 中。

创建 Knative 函数作为消费者

为了使用 Knative 函数从 Knative 代理消费消息,我们需要创建一个简单的 Golang 函数。由于有效负载作为 CloudEvents 发送到函数,因此我们使用内置的 cloudevents 模板,通过执行以下命令

$ func create -l go -t cloudevents s3-logger

这将在您当前目录的 s3-logger 文件夹中创建一个新项目,其中包含一个用于 Knative 函数的 Golang 文件

package function

import (
    "context"
    "fmt"

    "github.com/cloudevents/sdk-go/v2/event"
)

// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    /*
     * YOUR CODE HERE
     *
     * Try running `go test`.  Add more test as you code in `handle_test.go`.
     */

    fmt.Println("Received event")
    fmt.Println(ce) // echo to local output
    return &e, nil // echo to caller
}

现在我们可以修改 Handle 函数以打印出 CloudEvent 的一些属性

func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    fmt.Println("Received S3 event notification")
    fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
    fmt.Println("CloudEvent Source attribute:  " + ce.Source())

  // Some processing of the payload of the CloudEvent...

    return nil, nil

上面的 Handle 函数只是打印文件名,该文件名由 subject 属性表示,以及存储为接收到的 CloudEvent 的 source 属性的存储桶的完全限定名称。

注意

目前,AWS-S3-Source 发送所有数据,因此完整的文件也可以通过 data 属性访问。

将 Knative 函数订阅到 AWS S3 事件

为了能够接收 S3 事件通知,我们需要将函数订阅到提供事件的代理。但是,我们如何知道哪些事件可用呢?为此,我们检查系统中有哪些 Knative 事件类型可用

$ kubectl get eventtypes.eventing.knative.dev 
NAME                                            TYPE                                      SOURCE                       SCHEMA   REFERENCE NAME   REFERENCE KIND   DESCRIPTION                             READY   REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263   org.apache.camel.event.aws.s3.getObject   aws.s3.bucket.<bucketname>            my-broker        Broker           Event Type auto-created by controller   True

现在我们知道,在我们的命名空间中,my-broker 代理上存在 org.apache.camel.event.aws.s3.getObject 事件。

现在我们可以执行函数对该事件类型从给定代理进行的订阅

$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker

这将更新项目的 func.yaml 元数据,当函数被部署时,CLI 将构建并部署消费者,并为其创建一个具有匹配过滤器参数的 Knative Trigger

$ func deploy

部署后,在 Knative 函数的终端上,对于每个 S3 事件都可见以下输出

Received S3 event notification

CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute:  aws.s3.bucket.<bucketname>

结论

使用 Knative 事件、函数和 Apache Camel K,可以将来自第三方云服务(如 AWS S3)的通知触发到在您自己的本地 Kubernetes 集群上运行的函数。

我们使用分析和 Cookie 来了解网站流量。有关您使用我们网站的信息将与 Google 共享,以用于该目的。了解更多。