跳至内容

从 CloudEvents 到 Apache Kafka 记录,第一部分

发布时间:2023-03-08 ,  修订时间:2024-01-17

从 CloudEvents 到 Apache Kafka 记录,第一部分

作者:Daniele Zonca,红帽高级首席软件工程师,Matthias Weßendorf,红帽首席软件工程师

在这篇博文中,您将学习如何使用 KafkaSink 组件轻松地将传入的 CloudEvents 存储到 Apache Kafka 主题。

Apache Kafka 用于许多不同的用例,但采用 Kafka 协议的必要性可能是一个障碍,尤其是在第三方组件扩展可能性有限的情况下。

有一些事件生产者不支持 Kafka 协议,而 HTTP 可能是一个更灵活的选择。Strimzi 项目有一个 Bridge 组件,它通过 HTTP 公开生产者/消费者 API,但它是特定于 Kafka 的,因此本质上是相同的协议(具有消费者组、偏移量等)。

您认为 CloudEvents 要求可能是一个问题吗?CloudEvents 定义了一个 HTTP 格式绑定,特别是在二进制模式下,大多数 HTTP 负载可能已经是有效的 CloudEvents!

The Knative Sink for Apache Kafka 是一个 Kafka 本地实现,用于 CloudEvent 入站并将其作为 Apache Kafka 记录持久化到可配置的主题。

设置 Apache Kafka 主题

为了使用 KafkaSink 组件,您需要有一个 Apache Kafka 主题,并且对其具有适当的访问权限。对于这篇文章,我们使用的是 Strimzi 提供的本地 Apache Kafka 安装,如 此处 所述。Apache Kafka 集群在您的 Kubernetes 环境中运行后,就可以创建主题了。为此,我们使用 Strimzi 的 KafkaTopic CRD 以标准的声明式 Kubernetes 方式创建主题

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1

这将创建一个简单的主题,partitionsreplicas 都设置为 1,这在生产环境中推荐。

注意:有关 Knative Kafka Broker 的生产就绪配置,请参阅 这篇博客

将清单应用于 Kubernetes 集群后,可以像下面这样查询它

kubectl get kafkatopics.kafka.strimzi.io -n kafka
NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic   my-cluster   1            1                    True

设置 KafkaSink 组件

Knative Sink for Apache Kafka 的安装说明 在此处

接下来,我们将创建一个 KafkaSink 实例,将其绑定到我们基于本地 Strimzi 的 Apache Kafka 集群上的 my-topic 主题

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: my-topic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

KafkaSink 是一种 Addressable 类型,它可以通过 HTTP 接收传入的 CloudEvents,这些事件的地址定义在它们的 status.address.url 字段中

kubectl get kafkasinks.eventing.knative.dev
NAME            URL                                                                                  AGE   READY   REASON
my-kafka-sink   http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-sink   13s   True

Kn 事件插件

此时,我们只需要在 Kubernetes 集群中使用一个安装了 curl 程序的 pod,并将事件发送到 KafkaSinkURL

但是,我们将使用 kn 客户端 CLI 及其 事件插件 来管理命令行的云事件

kn event send \
  --to KafkaSink:eventing.knative.dev/v1alpha1:my-kafka-sink \
  --type=dev.knative.blog.post \
  -f message="Hello"

使用上面的命令,我们将 message 作为带有 dev.knative.blog.post 的 CloudEvents 发送到我们的 my-kafka-sink 对象。kn event 插件从这个调用生成一个有效的 CloudEvents,并将其直接发送到引用的 sink 的可寻址 URL。

使用 kcat 处理事件

kcat 是以前称为 kafkacat 的项目,它提供命令行模式用于从 Apache Kafka 生产和消费记录。

这使我们可以消费存储在 Apache Kafka 集群的 my-topic 主题中的 Apache Kafka 记录(作为 CloudEvent)

kubectl run kcat -ti --image=docker.io/edenhill/kcat:1.7.1 --rm=true --restart=Never -- -C -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 -t my-topic -f '\nHeaders: %h\nMessage value: %s\n\n '
上面的命令将打印 Kafka 记录的所有标头及其值,例如

Headers: ce_specversion=1.0,ce_id=ce5026d0-234e-4997-975a-c005f515fedf,ce_source=kn-event/v1.9.0,ce_type=ype=dev.knative.blog.post,content-type=application/json,ce_time=2023-02-13T12:52:20.654526321Z
Message value: {"message":"Hello"}

% Reached end of topic my-topic [0] at offset 2

CloudEvents 二进制模式

需要注意的是,KafkaSink 使用 二进制内容模式 将传入的 CloudEvents 存储为 Kafka 记录,因为它在传输或路由方面更有效,因为它对传输或路由进行了优化,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvents 属性和扩展名都映射为 Kafka 记录的标头,而 CloudEvent 的 data 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 相对于 结构化内容模式 的另一个好处,因为它不那么阻塞,因此与不理解 CloudEvents 的系统兼容。

展望

存储在 Knative KafkaSink 组件支持的 Kafka 主题中的消息可以轻松地被 Apache Kafka 社区中更大的生态系统中的任何消费者应用程序消费。本文的下一篇文章将展示如何使用 Knative Broker 的 Apache Kafka 实现来存储传入事件,并利用 Knative Eventing 工具进行基于 CloudEvents 元数据的路由,因为此过滤功能本身并非直接构建到 Apache Kafka 中。

我们使用分析和 Cookie 来了解网站流量。出于此目的,您的网站使用信息将与 Google 共享。 了解更多。