跳到内容

使用比特币交易数据开始使用 Knative Eventing

发布日期:2020-06-22 ,  修订日期:2024-01-17

使用比特币交易数据开始使用 Knative Eventing

作者:Johana Saladas,IBM 软件工程师

我一直在探索 Knative Eventing,这是一个系统,它使云原生事件生态系统能够通过使用 **事件生产者** 和 **事件消费者** 来轻松部署。此演示的大部分工作都是在 0.11 版中完成的,我也在 0.13 版中运行了它,现在它也适用于 0.15 版。

此演示是在第一个 Knative 社区会议上展示的,因此您也可以在这里观看视频版本:{{< youtube sGi_LuAaaT0 >}}

我创建了一个简单的演示,以探索事件驱动架构的一些关键优势,例如

  • 基于推送的消息传递。
  • 生产者和消费者的解耦。
  • 在数据移动时应用业务逻辑。
  • 用于数据科学的实时事件流 - 毫秒级决策,例如欺诈检测。

在这篇文章中,我将向您展示如何使用一些基本的 Knative 事件组件来启动并运行一个示例事件场景;**代理、触发器、生产者** 和 **消费者**。此演示展示了实时流式事件、流内转换和基于推送的前端在实际中的作用。您可以以此为基础,进一步探索可能性。

此场景使用比特币交易事件作为事件流的示例。这些事件将通过 UI 前端实时显示,并根据其总交易价值由另一项服务分类成不同的规模。

在下图中,您可以看到我们即将部署的计划。

Diagram of this example Knative Eventing scenario

有一个 github 存储库可以配合此演示 这里。所有单个服务的源代码都可以在 github 存储库中找到。如果您想使用 Docker Hub 上预构建的镜像,那么您只需要 yaml 目录中的文件即可。

步骤 1:创建命名空间并创建代理

kubectl apply -f 001-namespace.yaml

第一步是部署 001-namespace.yaml,它将创建一个 Kubernetes 命名空间并自动添加标签 knative-eventing-injection=enabled。这将创建一个 Knative Eventing **代理**。

**代理** 是事件从事件源或 **生产者** 发送到的位置。它可以作为消息传递通道,默认情况下是内存中的,但也可以是其他东西(例如 Kafka 通道)。从这里,它们可以被感兴趣的服务消费。

步骤 2:部署比特币事件源

kubectl apply -f 010-deployment.yaml
我们的 **事件源** 是一个应用程序,它从 blockchain.info websocket 获取(未确认的比特币交易)消息,然后创建一个新的 CloudEvent。这是我们的事件生产者,它 *产生* 其他服务 *可能或可能不会* 订阅的事件。

在本例中,我们告诉它将事件发送到哪里,使用 **sink** 变量。sink 作为环境变量传递到部署中,在本例中,它是我们的代理地址。要获取代理 URL,您可以使用以下命令

kubectl get broker -n knative-eventing-websocket-source
部署事件源后,您可以通过获取日志来验证它是否正在运行
kubectl —namespace knative-eventing-websocket-source logs -l app=wseventsource — tail=100 -f

步骤 3:订阅 event-display 到比特币事件

kubectl apply -f 040-trigger.yaml

**触发器** 提供一个过滤器,用于选择匹配某些属性的事件,以将其传递给指定的服务。此场景中指定了三个触发器。您不需要先部署订阅服务,即可设置触发器。

步骤 4:部署我们的消费服务

在本例中,我们拥有 *多个* **消费者**。这些是对事件感兴趣(或不感兴趣)的服务。我们有三个消费服务

  • **event-display** - 订阅来自“wss://ws.blockchain.info/inv”的事件的 Kubernetes 部署。此服务获取这些事件,然后通过 UI 前端实时显示它们。此服务是 Kubernetes 服务。
    kubectl apply -f 050-kubernetesdeploy.yaml
    kubectl apply -f 060-kubernetesservice.yaml
    

部署此服务后,您可以使用 Web 浏览器访问 localhost:31234,您应该会看到比特币交易事件在 UI 中实时呈现:您应该看到此 UI 实时更新

  • **classifier** - 订阅来自“wss://ws.blockchain.info/inv”的事件。此服务获取这些事件,然后对每个交易值进行分类。在应用程序代码中,使用新的 **type** 和 **source** 创建了一个新的 CloudEvent。这些新的事件会重新发出到 Knative 事件生态系统中。

逻辑保持简单,只有两个规模类别;*小* 和 *大*。它代表了作为数据科学或 ML 过程的一部分而可能发生的流内转换或建模应用程序的示例。这种架构可以用于欺诈检测、异常检测和其他对速度至关重要的决策。

kubectl apply -f 031-classifier-service.yaml

  • **test-display** - 订阅来自 **type** 和 **source** 为 'classifier' 的事件的 Knative 服务。
    kubectl apply -f 030-test-display-service.yaml
    

此服务订阅包含交易规模的事件,因此当我们查看日志时,我们可以看到这些事件

kubectl logs -l serving.knative.dev/service=test-display -c user-container — tail=100 -n knative-eventing-websocket-source -f
Our test-display service consumes the size reply that is emitted by the classifier

进一步阅读

尝试一下此演示,并 告诉我你的结果!

如果您想了解更多信息,可以查看 Knative 文档 中的其他示例。‘Knative Cookbook’ 是另一个了解 Knative Serving 和 Eventing 的好资源。

我们使用分析和 Cookie 来了解网站流量。为此目的,我们与 Google 共享有关您使用我们网站的信息。 了解更多。