跳至内容

用于 Apache Kafka 的 Knative 源

stage version

KafkaSource 读取存储在现有 Apache Kafka 主题中的消息,并将这些消息作为 CloudEvents 通过 HTTP 发送到其配置的 sinkKafkaSource 保留主题分区中存储的消息顺序。它通过等待从 sink 收到成功响应来实现这一点,然后才会传递同一分区中的下一条消息。

安装 KafkaSource 控制器

  1. 通过输入以下命令来安装 KafkaSource 控制器

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 通过输入以下命令来安装 Kafka Source 数据平面

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
    
  3. 通过输入以下命令来验证 kafka-controllerkafka-source-dispatcher 是否正在运行

    kubectl get deployments.apps,statefulsets.apps -n knative-eventing
    

    示例输出

    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/kafka-controller               1/1     1            1           3s
    
    NAME                                       READY   AGE
    statefulset.apps/kafka-source-dispatcher   1/1     3s
    

可选:创建 Kafka 主题

注意

创建 Kafka 主题部分假设您使用 Strimzi 来操作 Apache Kafka,但是可以使用 Apache Kafka CLI 或任何其他工具来复制等效操作。

如果您使用的是 Strimzi

  1. 创建 KafkaTopic YAML 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  2. 通过运行以下命令来部署 KafkaTopic YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您的 KafkaTopic YAML 文件的名称。

    示例输出

    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    

  3. 通过运行以下命令来确保 KafkaTopic 正在运行

    kubectl -n kafka get kafkatopics.kafka.strimzi.io
    

    示例输出

    NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
    knative-demo-topic   my-cluster   3            1
    

创建服务

  1. 创建 event-display 服务作为 YAML 文件

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    
  2. 通过运行以下命令来应用 YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一步中创建的文件的名称。

    示例输出

    service.serving.knative.dev/event-display created
    

  3. 通过运行以下命令来确保服务 Pod 正在运行

    kubectl get pods
    

    Pod 名称以 event-display 为前缀

    NAME                                            READY     STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2       Running   0          72s
    

Kafka 事件源

  1. 使用引导服务器、主题等相应地修改 source/event-source.yaml

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    
  2. 部署事件源

    kubectl apply -f event-source.yaml
    

    示例输出

    kafkasource.sources.knative.dev/kafka-source created
    

  3. 验证 KafkaSource 是否已准备就绪

    kubectl get kafkasource kafka-source
    

    示例输出

    NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
    kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h
    

扩展

要调度更多或更少的消费者,可以扩展 KafkaSource,并将它们分配到不同的调度程序 Pod 中。kafkasource 状态在状态.placements 键下显示此类分配。

您可以使用 kubectl 按以下符号扩展 KafkaSource

kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions

或者,如果您使用的是 GitOps 方法,则可以添加 consumers 键(如下例所示)并将其提交到您的存储库

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 
      consumers: 12    # Number of replicas
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

使用 KEDA 自动扩展

您可以使用 KEDA 对 KafkaSource 进行自动扩展。有关如何启用和配置此功能的信息,请阅读 此处的说明

验证

  1. 向 Apache Kafka 主题生成一条消息({"msg": "This is a test!"}),如以下示例所示

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    

    提示

    如果您没有看到命令提示符,请尝试按 Enter 键。

  2. 验证服务是否已从事件源接收到消息

    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    

    示例输出

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
        {
          "msg": "This is a test!"
        }
    

处理交付失败

KafkaSource 实施了 Delivery 规范,允许您为其配置事件交付参数,这些参数在事件无法交付的情况下适用

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      delivery:
        deadLetterSink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: example-sink
        backoffDelay: <duration>
        backoffPolicy: <policy-type>
        retry: <integer>
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

delivery API 在 处理交付失败 章节中进行了讨论。

可选:指定键反序列化器

KafkaSource 从 Kafka 接收消息时,它会将键转储到名为 Key 的事件扩展中,并将 Kafka 消息头转储到以 kafkaheader 开头的扩展中。

您可以在四种类型中指定键反序列化器

  • string(默认)表示 UTF-8 编码字符串
  • int 表示 32 位和 64 位有符号整数
  • float 表示 32 位和 64 位浮点数
  • byte-array 表示 Base64 编码的字节数组

要指定键反序列化器,请将标签 kafkasources.sources.knative.dev/key-type 添加到 KafkaSource 定义中,如以下示例所示

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
  kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

可选:指定初始偏移量

默认情况下,KafkaSource 从每个分区中的最新偏移量开始消费。如果您要从最早的偏移量开始消费,请将 initialOffset 字段设置为 earliest,例如

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

注意

initialOffset 的有效值为 earliestlatest。任何其他值都会导致验证错误。此字段仅在该消费者组没有已提交的偏移量时才适用。

连接到支持 TLS 的 Kafka Broker

KafkaSource 支持 TLS 和 SASL 身份验证方法。要启用 TLS 身份验证,您必须拥有以下文件

  • CA 证书
  • 客户端证书和密钥

KafkaSource 期望这些文件采用 PEM 格式。如果它们采用其他格式(例如 JKS),请将其转换为 PEM。

  1. 通过运行以下命令来在 KafkaSource 将要设置的命名空间中创建证书文件作为密钥

    kubectl create secret generic cacert --from-file=caroot.pem
    
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    
  2. 应用 KafkaSource。相应地修改 bootstrapServerstopics 字段。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
     name: kafka-source-with-tls
    spec:
     net:
       tls:
         enable: true
         cert:
           secretKeyRef:
             key: tls.crt
             name: kafka-secret
         key:
           secretKeyRef:
             key: tls.key
             name: kafka-secret
         caCert:
           secretKeyRef:
             key: caroot.pem
             name: cacert
     consumerGroup: knative-group
     bootstrapServers:
     - my-secure-kafka-bootstrap.kafka:443
     topics:
     - knative-demo-topic
     sink:
       ref:
         apiVersion: serving.knative.dev/v1
         kind: Service
         name: event-display
    

为 KafkaSources 启用 SASL

简单身份验证和安全层 (SASL) 由 Apache Kafka 用于身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据以与 Kafka 集群通信,否则无法生成或使用事件。

先决条件

  • 您有权访问具有简单身份验证和安全层 (SASL) 的 Kafka 集群。

步骤

  1. 通过运行以下命令来创建使用 Kafka 集群的 SASL 信息的密钥

    STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
    
    SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
    
    kubectl create secret -n default generic <secret_name> \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="example-user"
    
  2. 创建或修改 KafkaSource,使其包含以下规范选项

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <secret_name>
              key: user
          password:
            secretKeyRef:
              name: <secret_name>
              key: password
          type:
            secretKeyRef:
              name: <secret_name>
              key: saslType
        tls:
          enable: true
          caCert:
            secretKeyRef:
              name: <secret_name>
              key: ca.crt
    ...
    

    其中 <secret_name> 是在上一步中生成的密钥的名称。

清理步骤

  1. 删除 Kafka 事件源

    kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
    

    示例输出

    "kafka-source" deleted
    

  2. 删除 event-display 服务

    kubectl delete -f source/event-display.yaml service.serving.knative.dev
    

    示例输出

    "event-display" deleted
    

  3. 可选:删除 Apache Kafka 主题

    kubectl delete -f kafka-topic.yaml
    

    示例输出

    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    

我们使用分析和 Cookie 来了解网站流量。有关您使用我们网站的信息将与 Google 共享,以用于该目的。 了解更多。