跳至内容

Knative Apache Kafka Sink

KafkaSink 是一个 Apache Kafka 原生的 Sink 实现,它将传入的 CloudEvent 持久化到可配置的 Apache Kafka 主题。本页展示如何安装和配置 Knative KafkaSink

先决条件

您必须有权访问已安装 Knative Eventing 的 Kubernetes 集群。

安装

  1. 安装 Kafka 控制器

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 安装 KafkaSink 数据平面

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-sink.yaml
    
  3. 验证 kafka-controllerkafka-sink-receiver 部署是否正在运行

    kubectl get deployments.apps -n knative-eventing
    

    示例输出

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-sink-receiver            1/1     1            1           5s
    

KafkaSink 示例

KafkaSink 对象类似于以下内容

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

输出主题内容模式

CloudEvent 规范定义了两种传输 CloudEvent 的模式:结构化模式和二进制模式。

"结构化模式消息"是指事件使用独立的事件格式进行完全编码并存储在消息体中的消息。

结构化内容模式将事件元数据和数据保存在有效载荷中,允许在多个路由跳跃和多个协议之间简单地转发同一个事件。

"二进制模式消息"是指将事件数据存储在消息体中,并将事件属性存储为消息元数据的一部分的消息。

二进制内容模式适用于任何形状的事件数据,并允许在没有转码工作的情况下进行高效传输。

具有指定 contentMode 的 KafkaSink 对象类似于以下内容

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

  # CloudEvent content mode of Kafka messages sent to the topic.
  # Possible values:
  # - structured
  # - binary
  #
  # default: binary.
  #
  # CloudEvent spec references:
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
  contentMode: binary # or structured

安全

Knative 支持以下 Apache Kafka 安全功能

启用安全功能

要启用安全功能,您可以在 KafkaSink spec 中引用一个密钥

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
   name: my-kafka-sink
   namespace: default
spec:
   topic: mytopic
   bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
   auth:
     secret:
       ref:
         name: my_secret

注意

密钥 my_secret 必须存在于与 KafkaSink 相同的命名空间中。证书和密钥必须采用 PEM 格式

使用 SASL 进行身份验证

Knative 支持以下 SASL 机制

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。

使用 SASL 进行身份验证,不进行加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SASL 进行身份验证,并使用 SSL 进行加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SSL 进行加密,不进行客户端身份验证

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

使用 SSL 进行身份验证和加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注意

可以省略 ca.crt 以启用回退并使用系统的根 CA 集。

Kafka Producer 配置

Kafka Producer 是负责将事件发送到 Apache Kafka 集群的组件。您可以通过修改 knative-eventing 命名空间中的 config-kafka-sink-data-plane ConfigMap 来更改集群中 Kafka Producer 的配置。

此 ConfigMap 中可用设置的文档可在 Apache Kafka 网站 上找到,特别是 Producer 配置

为数据平面组件启用调试日志记录

要为数据平面组件启用调试日志记录,请在 kafka-config-logging ConfigMap 中将日志级别更改为 DEBUG

  1. 创建一个包含以下内容的 YAML 文件,作为 kafka-config-logging ConfigMap。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 通过运行以下命令应用 YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一步中创建的文件的名称。

  3. 重新启动 kafka-sink-receiver

    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
    

我们使用分析和 Cookie 来了解网站流量。有关您使用我们网站的信息将与 Google 共享,以用于该目的。 了解更多信息。