跳至内容

使用 Broker 和 Trigger 的 Sequence

我们将创建以下逻辑配置。我们创建一个 PingSource,将事件馈送到 Broker,然后我们创建一个 Filter,将这些事件连接到一个 Sequence,该序列包含 3 个步骤。然后,我们取 Sequence 的末尾,并将新生成的事件重新馈送到 Broker,并创建一个另一个 Trigger,该 Trigger 将显示这些事件。

先决条件

  • Knative Serving
  • InMemoryChannel

注意

这些示例使用 default 命名空间。

如果您想使用不同类型的 Channel,则需要修改 Sequence.Spec.ChannelTemplate 以创建相应的 Channel 资源。

Logical Configuration

这些示例中使用的函数位于 https://github.com/knative/eventing/blob/main/cmd/appender/main.go 中。

设置

创建 Broker

  1. 要创建集群默认 Broker 类型,请将以下 YAML 复制到一个文件中

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
     name: default
    
  2. 通过运行以下命令应用 YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一步中创建的文件的名称。

创建 Knative 服务

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: first
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/appender
          env:
            - name: MESSAGE
              value: " - Handled by 0"

---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: second
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/appender
          env:
            - name: MESSAGE
              value: " - Handled by 1"
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: third
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/appender
          env:
            - name: MESSAGE
              value: " - Handled by 2"
            - name: TYPE
              value: "samples.http.mod3"
---

在以下命令中将 default 更改为创建您已配置 Broker 的命名空间中的服务

kubectl -n default create -f ./steps.yaml

创建 Sequence

sequence.yaml 文件包含创建 Sequence 的规范。如果您使用的是不同类型的 Channel,则需要更改 spec.channelTemplate 以指向您想要的 Channel。

此外,将 spec.reply.name 更改为指向您的 Broker

apiVersion: flows.knative.dev/v1
kind: Sequence
metadata:
  name: sequence
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: first
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: second
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: third
  reply:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default

在以下命令中将 default 更改为创建您已配置 Broker 的命名空间中的 Sequence

kubectl -n default create -f ./sequence.yaml

创建目标 Broker 的 PingSource

这将创建一个 PingSource,每 2 分钟发送一个带有 {"message": "Hello world!"} 作为数据有效负载的 CloudEvent。

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-source
spec:
  schedule: "*/2 * * * *"
  contentType: "application/json"
  data: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

在以下命令中将 default 更改为创建您已配置 Broker 和 Sequence 的命名空间中的 PingSource

kubectl -n default create -f ./ping-source.yaml

创建针对 Sequence 的触发器

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: sequence-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: flows.knative.dev/v1
      kind: Sequence
      name: sequence

在以下命令中将 default 更改为创建您已配置 Broker 和 Sequence 的命名空间中的 Trigger

kubectl -n default create -f ./trigger.yaml

创建服务和触发器,显示序列创建的事件

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sequence-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: display-trigger
spec:
  broker: default
  filter:
    attributes:
      type: samples.http.mod3
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: sequence-display
---

在以下命令中将 default 更改为创建您已配置 Broker 的命名空间中的服务和 Trigger

kubectl -n default create -f ./display-trigger.yaml

检查结果

您现在可以通过检查 sequence-display pod 的日志来查看最终输出。

kubectl -n default get pods

查看 sequence-display pod 的日志

kubectl -n default logs -l serving.knative.dev/service=sequence-display -c user-container --tail=-1
☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: samples.http.mod3
  source: /apis/v1/namespaces/default/pingsources/ping-source
  id: 159bba01-054a-4ae7-b7be-d4e7c5f773d2
  time: 2020-03-03T14:56:00.000652027Z
  datacontenttype: application/json
Extensions,
  knativearrivaltime: 2020-03-03T14:56:00.018390608Z
  knativehistory: default-kne-trigger-kn-channel.default.svc.cluster.local; sequence-kn-sequence-0-kn-channel.default.svc.cluster.local; sequence-kn-sequence-1-kn-channel.default.svc.cluster.local; sequence-kn-sequence-2-kn-channel.default.svc.cluster.local; default-kne-trigger-kn-channel.default.svc.cluster.local
  traceparent: 00-e893412106ff417a90a5695e53ffd9cc-5829ae45a14ed462-00
Data,
  {
    "id": 0,
    "message": "Hello world! - Handled by 0 - Handled by 1 - Handled by 2"
  }

您会看到初始 PingSource 消息 {"Hello World!"} 已被 Sequence 中的每个步骤附加到它。

我们使用分析和 Cookie 来了解网站流量。有关您使用我们网站的信息将与 Google 共享,用于该目的。 了解更多。