从 CloudEvents 到 Apache Kafka 记录,第二部分 ¶
发布时间:2023-04-03,修订时间:2024-01-17
从 CloudEvents 到 Apache Kafka 记录,第二部分¶
作者:Daniele Zonca,红帽高级首席软件工程师,Matthias Weßendorf,红帽高级首席软件工程师
在本博文中,您将学习如何轻松地将传入的 CloudEvents 存储到 Apache Kafka 主题,以及如何使用 Knative 代理和触发器 API 进行基于内容的事件路由。
这篇文章的第一部分解释了 Knative 如何帮助将 CloudEvents 接收到 Apache Kafka 主题以供进一步处理。文章展示了如何使用 Apache Kafka 生态系统中的标准工具(例如kcat
CLI)处理 CloudEvents Kafka 记录。此外,文章还解释了 CloudEvents 的二进制内容模式(Knative 的默认模式)的**优势**。现在,在这篇文章中,我们将展示一种不同的处理已接入 CloudEvents 的方法,通过利用 Knative 代理和触发器 API 进行事件路由。
设置 Apache Kafka 和 Knative 代理¶
为了使用 Apache Kafka 的 Knative 代理,您需要先安装 Apache Kafka。对于本文,我们使用本地 Apache Kafka 安装,由Strimzi提供支持,如此处所述。文章还讨论了如何为本地开发环境安装 Apache Kafka 的 Knative 代理。
注意
有关 Apache Kafka 的 Knative 代理的生产就绪配置,请参阅此博客。
设置 Knative 代理组件¶
上面提到的文章将所有 Knative Broker
配置为 Kafka
类,因此创建新的 Apache Kafka 代理非常简单
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: my-demo-kafka-broker
annotations:
eventing.knative.dev/broker.class: Kafka
spec: {}
Broker
是一种可寻址
类型,它可以通过 HTTP 将传入的 CloudEvents 发送到其 status.address.url
字段中定义的地址
kubectl get brokers.eventing.knative.dev
NAME URL AGE READY REASON
my-demo-kafka-broker http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker 7s True
但我们看不到任何关于 Apache Kafka 主题的信息。原因是代理实现使用的主题被认为是实现细节。让我们看看实际的代理对象
kubectl get brokers.eventing.knative.dev my-demo-kafka-broker -o yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: my-demo-kafka-broker
namespace: default
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
status:
address:
url: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker
annotations:
bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
default.topic: knative-broker-default-my-demo-kafka-broker
default.topic.partitions: "10"
default.topic.replication.factor: "1"
以上给出了 YAML 表示的简化版本,但请注意 spec.config
:它指向集群中所有启用了 Kafka 的 Knative 代理的默认配置。kafka-broker-config
ConfigMap 通过定义诸如 partition
或 replication factor
之类的旋钮来配置底层主题的概念。但是,在代理的 status
中,您可以看到主题的名称:knative-broker-default-my-demo-kafka-broker
。名称遵循约定 knative-broker-<namespace>-<broker-name>
。
注意
默认情况下,Knative Kafka 代理会创建自己的内部主题,但是此操作在某些环境中可能会受到限制。对于这种情况和任何其他类似的用例,可以自带主题。
设置消费者应用程序¶
现在我们有了 Broker
,它将充当接收 CloudEvents 的 HTTP 端点,现在是时候定义一个接收和处理 CloudEvents 的应用程序了
apiVersion: v1
kind: Pod
metadata:
name: log-receiver
labels:
app: log-receiver
spec:
containers:
- name: log-receiver
image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
imagePullPolicy: Always
ports:
- containerPort: 8080
protocol: TCP
name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
name: log-receiver
spec:
selector:
app: log-receiver
ports:
- port: 80
protocol: TCP
targetPort: log-receiver
name: http
在这里,我们定义了一个简单的 Pod
及其 Service
,它指向一个 HTTP 服务器,该服务器接收 CloudEvents。如您所见,这不是 Kafka 特定的消费者,任何 HTTP Web 服务器,无论使用哪种语言,都可以用于处理来自 Apache Kafka 主题的 CloudEvents。
消费者应用程序的开发人员不需要了解如何编写 Kafka 消费者应用程序的任何细节。Knative 及其 Apache Kafka 的代理实现通过充当消费者应用程序的 HTTP 代理来抽象化这一点。这极大地简化了这些专注且独立的消费者应用程序的工程工作。
定义使用 Apache Kafka 的消息路由规则¶
Apache Kafka 中的主题通常用于包含可能引用同一边界上下文(如果您正在应用领域驱动设计原则)的不同类型的事件。这意味着每个消费者都将接收所有事件,只是为了过滤和处理其中的一部分。
这是 Apache Kafka 协议的缺点之一:没有用于路由记录的直接过滤器 API。为了处理或过滤事件并将它们路由到不同的目标或其他 Kafka 主题,需要实现一个完整的 Kafka 消费者客户端。或者需要使用其他库,例如Kafka Streams。
如您所想,这是一种非常常见的模式,Knative 事件将其作为 API 的一部分。Trigger
API 定义了一套强大的过滤器,用于根据 CloudEvents 的元数据路由 CloudEvents
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: log-trigger
spec:
broker: my-demo-kafka-broker
filter:
attributes:
type: <cloud-event-type>
<ce-extension>: <ce-extension-value>
subscriber:
ref:
apiVersion: v1
kind: Service
name: log-receiver
我们看到一个触发器定义了一组 filter
规则,如果这些规则匹配,则来自 Kafka 主题的 CloudEvent 将使用 HTTP 路由到我们引用的 Web 服务器应用程序。在 Knative 中还有一个实验性功能,它可以使用 Trigger
API 上的 filters
字段启用新的类似 SQL 的过滤,该字段实现了CloudEvents 订阅 API。
注意
强烈建议在 Trigger
的 CloudEvents 元数据属性和扩展中应用过滤器属性。如果没有提供过滤器,则所有发生的 CloudEvents 都会路由到引用的订阅者,这是一种不好的应用程序设计,除非您明确希望为代理中的所有事件创建日志记录器。
对于由 Apache Kafka 的 Knative 代理执行的 Trigger
,还可以使用 Trigger
上的 kafka.eventing.knative.dev/delivery.order
注释来配置传递事件的顺序。
Kn 事件插件¶
为了发送事件,我们也不需要使用 Apache Kafka Producer API,因为我们正在使用 HTTP 将 CloudEvents 接入 Broker。作为一种选择,我们可以在 Kubernetes 集群中使用一个安装了 curl
程序的 Pod
,并将事件发送到 Broker
的 URL
。但是,我们使用的是 kn
客户端 CLI 及其 事件插件 从命令行管理云事件。
kn event send \
--to Broker:eventing.knative.dev/v1:my-demo-kafka-broker \
--type=dev.knative.blog.post \
-f message="Hello"
通过上述命令,我们将 message
作为具有 dev.knative.blog.post
类型的 CloudEvents 发送到我们的 my-demo-kafka-broker
对象。kn event
插件根据此调用生成有效的 CloudEvent,并将其直接发送到所引用组件的可寻址 URL,在本例中是我们的 Broker
。
结论¶
此示例展示了从发送事件到接收事件的简单流程。消息会持久存储在 Knative Broker 后面的 Kafka 主题上。从那里,任何标准的 Apache Kafka API 也都可以使用它。但是,Knative 提供的抽象简化了事件驱动应用程序的开发过程。无需太多额外的配置,还可以根据事件的元数据对其进行过滤和路由。
此外,采用 Trigger
/Filter
不仅仅是避免在所有消费者中重新实现相同模式的一种方式,它还使整个消息处理更加高效,因为只有在必要时才会调用消费者,如果它是一个 Knative 服务,它甚至可以缩放到零!