跳到内容

向 Broker 发送审查评论

Image

我们书店的首页有一个评论区,读者可以在其中查看其他人的评论并通过文本输入区域提交自己的评论。虽然这个过程看起来很简单 - 点击一个按钮,评论就会发布 - 但其背后的机制是由强大的事件驱动架构驱动的。

我们将了解哪些 Knative 功能?

  • Knative 事件 Broker
  • Knative 事件 Sink
  • Knative SinkBinding

可交付成果是什么样的?

Image

简单来说:在用户点击前端的提交按钮后,评论将显示在事件显示服务中。

实现

步骤 0:了解基础知识

Image

在微服务和 REST API 的世界中,我们通常将请求称为服务之间通信的主要方法。然而,在事件驱动架构中,最小的单元是事件。Knative 事件遵循 CloudEvents 规范,因此在继续之前了解此概念至关重要。在继续之前,请从 这里 了解有关 CloudEvents 的更多信息!

步骤 1:了解图书评论服务

Image

图书评论服务是我们的 Node.js API 服务器,在我们的事件驱动架构中起着至关重要的作用。了解它的运作方式至关重要,因为它接收事件并适当地处理它们。

Image

关键概念:Broker 和 SinkBinding

在我们深入研究代码之前,让我们澄清两个重要的概念

  • Broker:在事件驱动架构中充当中心点,将事件路由到正确的目的地。
  • SinkBinding:此 Knative 事件组件会自动将 Broker 的地址注入到环境变量 K_SINK 中,确保 Node.js 服务器始终拥有正确的地址,而无需手动更新。

您将在沿途获得更深入的了解。

Image

让我们检查一下 node-server/index.js 文件,从 /add 函数开始。当用户通过前端提交评论时,它首先被此端点接收。

app.post('/add', async (req, res) => {
  try {
    const receivedEvent = HTTP.toEvent({headers: req.headers, body: req.body});
    const brokerURI = process.env.K_SINK;

    if (receivedEvent.type === 'new-review-comment') {
      const response = await fetch(brokerURI, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/cloudevents+json',
          'ce-specversion': receivedEvent.specversion,
          'ce-type': receivedEvent.type,
          'ce-source': receivedEvent.source,
          'ce-id': receivedEvent.id,
        },
        body: JSON.stringify(receivedEvent.data),
      });
    }
  } catch (err) {
    console.error(err);
  }
});

接收事件/add 端点接收来自前端的事件。它使用 CloudEvents 的 SDK 将传入的请求转换为 CloudEvent 对象

const receivedEvent = HTTP.toEvent({headers: req.headers, body: req.body});

确定 Broker 地址:Broker 的地址是在集群中动态分配的。Node.js 服务器从环境变量 K_SINK 中检索此地址

const brokerURI = process.env.K_SINK;

您可能想知道是谁告诉环境变量该地址?那就是 Knative SinkBinding。

事件过滤:该服务检查事件类型。如果它是 new-review-comment,则它将事件转发到 Broker

if (receivedEvent.type === 'new-review-comment') {
  // logic that forwards the event, see below
}

转发事件逻辑:该事件将使用适当的 CloudEvent 标头转发到 Broker

const response = await fetch(brokerURI, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/cloudevents+json',
    'ce-specversion': receivedEvent.specversion,
    'ce-type': receivedEvent.type,
    'ce-source': receivedEvent.source,
    'ce-id': receivedEvent.id,
  },
  body: JSON.stringify(receivedEvent.data),
});

Image

探索其他函数

Node.js 服务器包含其他遵循类似模式的函数,详细的注释解释了它们的功能

  • /insert:接收 CloudEvents 并将有效负载插入 PostgreSQL 数据库。
  • /comment:创建与前端的 WebSocket 连接以传输来自数据库的评论。

步骤 2:创建 Broker

Image

Broker 在您的事件驱动应用程序中充当路由器,接收事件并将它们路由到正确的目的地。

  • 1:创建一个名为 node-server/config/200-broker.yaml 的新 YAML 文件,并添加以下内容
node-server/config/200-broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
    name: bookstore-broker
  • 2:应用 YAML 文件
kubectl apply -f node-server/config/200-broker.yaml

您将看到以下输出

broker.eventing.knative.dev/bookstore-broker created

或者,使用 Knative CLI kn 来创建 Broker

kn broker create bookstore-broker

您将看到以下输出

Broker 'bookstore-broker' successfully created in namespace 'default'.

验证

运行以下命令来列出 Broker

kubectl get brokers

您应该看到 Broker bookstore-broker,其 READY 状态为 True

NAME               URL                                                                                 AGE     READY   REASON
bookstore-broker   http://broker-ingress.knative-eventing.svc.cluster.local/default/bookstore-broker   7m30s   True    
故障排除

如果出现问题,请使用以下命令进行诊断

kubectl describe broker bookstore-broker

步骤 3:在 Node.js 服务器和 Broker 之间创建 SinkBinding

Image

在您的应用程序中对连接到 Kubernetes 服务的 URL 进行硬编码可能具有局限性和灵活性。SinkBinding 会动态地将 Kubernetes 服务的 URL 注入到您的应用程序中。

这里 了解有关 SinkBinding 的更多信息,以及 规范模式

创建 SinkBinding

  • 1:创建一个名为 node-server/config/300-sinkbinding.yaml 的新 YAML 文件,并添加以下内容
node-server/config/300-sinkbinding.yaml
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: node-sinkbinding
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    selector:
      matchLabels:
        app: node-server
  sink: # In this case, the sink is our Broker, which is the eventing service that will receive the events
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: bookstore-broker
  • 2:应用 YAML 文件
kubectl apply -f node-server/config/300-sinkbinding.yaml

您将看到以下输出

sinkbinding.sources.knative.dev/node-sinkbinding created

验证

运行以下命令来列出 sinkbinding

kubectl get sinkbindings
您应该看到 sinkbinding node-sinkbinding,其 READY 状态为 True

NAME               SINK                                                                                AGE     READY   REASON
node-sinkbinding   http://broker-ingress.knative-eventing.svc.cluster.local/default/bookstore-broker   2m43s   True    

步骤 4:创建事件显示服务

Image

事件显示是 Knative 事件中的一个调试工具,允许您将其用作事件的临时目的地(即接收器)。

创建事件显示服务

  • 1:创建一个名为 node-server/config/100-event-display.yaml 的新 YAML 文件,并添加以下内容
node-server/config/100-event-display.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
spec:
  replicas: 1
  selector:
    matchLabels:
      app: event-display
  template:
    metadata:
      labels:
        app: event-display
    spec:
      containers:
        - name: event-display
          image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
          ports:
            - containerPort: 8080

---
apiVersion: v1
kind: Service
metadata:
  name: event-display
spec:
  selector:
    app: event-display
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
  • 2:应用 YAML 文件
kubectl apply -f node-server/config/100-event-display.yaml

您将看到以下输出

deployment.apps/event-display created
service/event-display created

验证

运行以下命令来列出 Pod

kubectl get pods

您应该在“正在运行”状态下看到 Pod event-display-XXXXXXX-XXXXX

NAME                                  READY   STATUS    RESTARTS   AGE
bookstore-frontend-7b879ffb78-9bln6   1/1     Running   0          91m
event-display-55967c745d-bxrgh        1/1     Running   0          4m44s
node-server-644795d698-r9zlr          1/1     Running   0          4m43s

步骤 5:创建一个将 Broker 和事件显示连接起来的 Trigger

Image

Trigger 能够根据 CloudEvent 的属性 将事件转发到正确的目的地。它是 Broker 和事件目的地的连接器。

Trigger 中的 Filter 将根据过滤器条件过滤事件。您将在 Trigger 的 YAML 文件中指定您的过滤器条件。如果没有指定过滤器,Trigger 将转发 Broker 收到的所有事件。

Image

Knative 中还有一个名为 Channel 的概念,一般来说,您可以将没有过滤器的 Broker & Trigger 与 Channel & Subscription 相同对待。

这里 了解有关 Broker & Trigger 的更多信息!

创建 Trigger

Image

这里我们创建一个 Trigger,它将把所有事件发送到事件显示。

Image

  • 1:创建一个名为 node-server/config/200-log-trigger.yaml 的新 YAML 文件,并添加以下内容
node-server/config/200-log-trigger.yaml
# This Trigger subscribes to the Broker and will forward all the events that it received to event-display.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: log-trigger
spec:
  broker: bookstore-broker
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
  • 2:应用 YAML 文件
kubectl apply -f node-server/config/200-log-trigger.yaml

您将看到以下输出

trigger.eventing.knative.dev/log-trigger created

验证

运行以下命令来列出 Trigger

kubectl get triggers
Trigger log-trigger 应该具有 READY 状态为 True

NAME                BROKER             SUBSCRIBER_URI                                                       AGE     READY   REASON
log-trigger         bookstore-broker   http://event-display.default.svc.cluster.local                       6m2s    True    

验证

Image

使用以下命令打开事件显示的日志

kubectl logs -l=app=event-display -f
验证

在 UI 的评论框中输入一些内容,然后单击提交按钮。评论应该出现在事件显示服务中,并输出以下内容

☁️  cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: new-review-comment
source: bookstore-eda
id: unique-comment-id
datacontenttype: application/json
Extensions,
knativearrivaltime: 2024-05-19T05:27:36.232562628Z
Data,
{
    "reviewText": "test"
}

下一步

Image

请确保您通过验证测试后才能继续。

转到部署 ML 工作流:情感分析

我们使用分析和 Cookie 来了解网站流量。为此目的,有关您使用我们网站的信息将与 Google 共享。 了解更多。