用于 Apache Kafka 的 Knative 源¶
KafkaSource
读取存储在现有 Apache Kafka 主题中的消息,并将这些消息作为 CloudEvents 通过 HTTP 发送到其配置的 sink
。KafkaSource
保留主题分区中存储的消息顺序。它通过等待从 sink
收到成功响应来实现这一点,然后才会传递同一分区中的下一条消息。
安装 KafkaSource 控制器¶
-
通过输入以下命令来安装
KafkaSource
控制器kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
通过输入以下命令来安装 Kafka Source 数据平面
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
-
通过输入以下命令来验证
kafka-controller
和kafka-source-dispatcher
是否正在运行kubectl get deployments.apps,statefulsets.apps -n knative-eventing
示例输出
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/kafka-controller 1/1 1 1 3s NAME READY AGE statefulset.apps/kafka-source-dispatcher 1/1 3s
可选:创建 Kafka 主题¶
注意
创建 Kafka 主题部分假设您使用 Strimzi 来操作 Apache Kafka,但是可以使用 Apache Kafka CLI 或任何其他工具来复制等效操作。
如果您使用的是 Strimzi
-
创建
KafkaTopic
YAML 文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: knative-demo-topic namespace: kafka labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
通过运行以下命令来部署
KafkaTopic
YAML 文件其中kubectl apply -f <filename>.yaml
<filename>
是您的KafkaTopic
YAML 文件的名称。示例输出
kafkatopic.kafka.strimzi.io/knative-demo-topic created
-
通过运行以下命令来确保
KafkaTopic
正在运行kubectl -n kafka get kafkatopics.kafka.strimzi.io
示例输出
NAME CLUSTER PARTITIONS REPLICATION FACTOR knative-demo-topic my-cluster 3 1
创建服务¶
-
创建
event-display
服务作为 YAML 文件apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - # This corresponds to # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
-
通过运行以下命令来应用 YAML 文件
其中kubectl apply -f <filename>.yaml
<filename>
是您在上一步中创建的文件的名称。示例输出
service.serving.knative.dev/event-display created
-
通过运行以下命令来确保服务 Pod 正在运行
kubectl get pods
Pod 名称以
event-display
为前缀NAME READY STATUS RESTARTS AGE event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s
Kafka 事件源¶
-
使用引导服务器、主题等相应地修改
source/event-source.yaml
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
-
部署事件源
kubectl apply -f event-source.yaml
示例输出
kafkasource.sources.knative.dev/kafka-source created
-
验证 KafkaSource 是否已准备就绪
kubectl get kafkasource kafka-source
示例输出
NAME TOPICS BOOTSTRAPSERVERS READY REASON AGE kafka-source ["knative-demo-topic"] ["my-cluster-kafka-bootstrap.kafka:9092"] True 26h
扩展¶
要调度更多或更少的消费者,可以扩展 KafkaSource,并将它们分配到不同的调度程序 Pod 中。kafkasource 状态在状态.placements 键下显示此类分配。
您可以使用 kubectl 按以下符号扩展 KafkaSource
kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions
或者,如果您使用的是 GitOps 方法,则可以添加 consumers
键(如下例所示)并将其提交到您的存储库
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
consumers: 12 # Number of replicas
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
使用 KEDA 自动扩展¶
您可以使用 KEDA 对 KafkaSource 进行自动扩展。有关如何启用和配置此功能的信息,请阅读 此处的说明。
验证¶
-
向 Apache Kafka 主题生成一条消息(
{"msg": "This is a test!"}
),如以下示例所示kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
提示
如果您没有看到命令提示符,请尝试按 Enter 键。
-
验证服务是否已从事件源接收到消息
kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
示例输出
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic subject: partition:0#564 id: partition:0/offset:564 time: 2020-02-10T18:10:23.861866615Z datacontenttype: application/json Extensions, key: Data, { "msg": "This is a test!" }
处理交付失败¶
KafkaSource
实施了 Delivery
规范,允许您为其配置事件交付参数,这些参数在事件无法交付的情况下适用
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: example-sink
backoffDelay: <duration>
backoffPolicy: <policy-type>
retry: <integer>
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
delivery
API 在 处理交付失败 章节中进行了讨论。
可选:指定键反序列化器¶
当 KafkaSource
从 Kafka 接收消息时,它会将键转储到名为 Key
的事件扩展中,并将 Kafka 消息头转储到以 kafkaheader
开头的扩展中。
您可以在四种类型中指定键反序列化器
string
(默认)表示 UTF-8 编码字符串int
表示 32 位和 64 位有符号整数float
表示 32 位和 64 位浮点数byte-array
表示 Base64 编码的字节数组
要指定键反序列化器,请将标签 kafkasources.sources.knative.dev/key-type
添加到 KafkaSource
定义中,如以下示例所示
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
可选:指定初始偏移量¶
默认情况下,KafkaSource
从每个分区中的最新偏移量开始消费。如果您要从最早的偏移量开始消费,请将 initialOffset 字段设置为 earliest
,例如
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
注意
initialOffset
的有效值为 earliest
和 latest
。任何其他值都会导致验证错误。此字段仅在该消费者组没有已提交的偏移量时才适用。
连接到支持 TLS 的 Kafka Broker¶
KafkaSource 支持 TLS 和 SASL 身份验证方法。要启用 TLS 身份验证,您必须拥有以下文件
- CA 证书
- 客户端证书和密钥
KafkaSource 期望这些文件采用 PEM 格式。如果它们采用其他格式(例如 JKS),请将其转换为 PEM。
-
通过运行以下命令来在 KafkaSource 将要设置的命名空间中创建证书文件作为密钥
kubectl create secret generic cacert --from-file=caroot.pem
kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
-
应用 KafkaSource。相应地修改
bootstrapServers
和topics
字段。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source-with-tls spec: net: tls: enable: true cert: secretKeyRef: key: tls.crt name: kafka-secret key: secretKeyRef: key: tls.key name: kafka-secret caCert: secretKeyRef: key: caroot.pem name: cacert consumerGroup: knative-group bootstrapServers: - my-secure-kafka-bootstrap.kafka:443 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
为 KafkaSources 启用 SASL¶
简单身份验证和安全层 (SASL) 由 Apache Kafka 用于身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据以与 Kafka 集群通信,否则无法生成或使用事件。
先决条件¶
- 您有权访问具有简单身份验证和安全层 (SASL) 的 Kafka 集群。
步骤¶
-
通过运行以下命令来创建使用 Kafka 集群的 SASL 信息的密钥
STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
kubectl create secret -n default generic <secret_name> \ --from-literal=ca.crt="$STRIMZI_CRT" \ --from-literal=password="$SASL_PASSWD" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="example-user"
-
创建或修改 KafkaSource,使其包含以下规范选项
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <secret_name> key: user password: secretKeyRef: name: <secret_name> key: password type: secretKeyRef: name: <secret_name> key: saslType tls: enable: true caCert: secretKeyRef: name: <secret_name> key: ca.crt ...
其中
<secret_name>
是在上一步中生成的密钥的名称。
清理步骤¶
-
删除 Kafka 事件源
kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
示例输出
"kafka-source" deleted
-
删除
event-display
服务kubectl delete -f source/event-display.yaml service.serving.knative.dev
示例输出
"event-display" deleted
-
可选:删除 Apache Kafka 主题
kubectl delete -f kafka-topic.yaml
示例输出
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted