Knative Apache Kafka Sink¶
KafkaSink
是一个 Apache Kafka 原生的 Sink 实现,它将传入的 CloudEvent 持久化到可配置的 Apache Kafka 主题。本页展示如何安装和配置 Knative KafkaSink
。
先决条件¶
您必须有权访问已安装 Knative Eventing 的 Kubernetes 集群。
安装¶
-
安装 Kafka 控制器
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
安装 KafkaSink 数据平面
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-sink.yaml
-
验证
kafka-controller
和kafka-sink-receiver
部署是否正在运行kubectl get deployments.apps -n knative-eventing
示例输出
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-sink-receiver 1/1 1 1 5s
KafkaSink 示例¶
KafkaSink 对象类似于以下内容
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
输出主题内容模式¶
CloudEvent 规范定义了两种传输 CloudEvent 的模式:结构化模式和二进制模式。
"结构化模式消息"是指事件使用独立的事件格式进行完全编码并存储在消息体中的消息。
结构化内容模式将事件元数据和数据保存在有效载荷中,允许在多个路由跳跃和多个协议之间简单地转发同一个事件。
"二进制模式消息"是指将事件数据存储在消息体中,并将事件属性存储为消息元数据的一部分的消息。
二进制内容模式适用于任何形状的事件数据,并允许在没有转码工作的情况下进行高效传输。
具有指定 contentMode
的 KafkaSink 对象类似于以下内容
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
# CloudEvent content mode of Kafka messages sent to the topic.
# Possible values:
# - structured
# - binary
#
# default: binary.
#
# CloudEvent spec references:
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
contentMode: binary # or structured
安全¶
Knative 支持以下 Apache Kafka 安全功能
启用安全功能¶
要启用安全功能,您可以在 KafkaSink spec 中引用一个密钥
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
auth:
secret:
ref:
name: my_secret
注意
密钥 my_secret
必须存在于与 KafkaSink 相同的命名空间中。证书和密钥必须采用 PEM
格式。
使用 SASL 进行身份验证¶
Knative 支持以下 SASL 机制
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
要使用特定的 SASL 机制,请将 <sasl_mechanism>
替换为您选择的机制。
使用 SASL 进行身份验证,不进行加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SASL 进行身份验证,并使用 SSL 进行加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SSL 进行加密,不进行客户端身份验证¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
使用 SSL 进行身份验证和加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注意
可以省略 ca.crt
以启用回退并使用系统的根 CA 集。
Kafka Producer 配置¶
Kafka Producer 是负责将事件发送到 Apache Kafka 集群的组件。您可以通过修改 knative-eventing
命名空间中的 config-kafka-sink-data-plane
ConfigMap 来更改集群中 Kafka Producer 的配置。
此 ConfigMap 中可用设置的文档可在 Apache Kafka 网站 上找到,特别是 Producer 配置。
为数据平面组件启用调试日志记录¶
要为数据平面组件启用调试日志记录,请在 kafka-config-logging
ConfigMap 中将日志级别更改为 DEBUG
。
-
创建一个包含以下内容的 YAML 文件,作为
kafka-config-logging
ConfigMap。apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration>
-
通过运行以下命令应用 YAML 文件
其中kubectl apply -f <filename>.yaml
<filename>
是您在上一步中创建的文件的名称。 -
重新启动
kafka-sink-receiver
kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver