跳到内容

使用 Knative 和 Restate 构建有状态应用程序

发布时间:2024-09-02,修订时间:2024-09-04

使用 Knative 和 Restate 构建有状态应用程序

Building Stateful applications with Knative and Restate

作者:Francesco Guardiani,Restate 高级软件工程师,Giselle van Dongen,Restate 开发者布道师

Knative 彻底改变了在 Kubernetes 上开发和运行无服务器应用程序的方式,但在其之上构建有状态应用程序仍然非常具有挑战性。

例如,假设您想构建一个需要持久化某些状态的应用程序。为此,您可能需要将您的服务连接到数据库,并且在这样做时,您需要处理重试、重复事件、双重写入以及各种其他分布式系统问题。

再举一个例子,假设您想构建一个服务编排器,该编排器需要调用不同的服务,并在其中一个服务失败时最终补偿某些操作。理想情况下,您只需要编写一些按顺序执行一个操作后执行另一个操作的代码,并在其中一个操作失败时执行回滚。但在实践中,这并不容易,因为您需要解决诸如调用下游服务时的重试、编排器服务的故障,甚至更棘手的调用下游服务时的长时间等待等问题。

如果您可以在 Knative 服务中嵌入应用程序状态并执行复杂的服务协调,而无需处理上述任何问题,那该怎么办?

进入 Restate

Restate 是一个开源的持久执行引擎,用于构建有状态的无服务器应用程序。换句话说,您构建的代码看起来像普通的 RPC 服务,并且代码会持久执行,即引擎会存储执行进度。发生崩溃后,引擎会透明地将应用程序恢复到先前状态,并从中断的地方恢复执行。

记录执行进度的另一个方面是,如果出现长时间等待,例如由于服务响应缓慢,引擎会自动挂起执行,以避免浪费计算资源。在实践中,这意味着在“等待时间”期间,应用程序可以缩减到零!

通过将 Restate 和 Knative 结合使用,您可以开发有状态实体、编排微服务、实现 Saga 模式、对事件进行去重,同时能够在不需要工作时缩减到零。Restate 将负责处理诸如状态一致性、跨服务通信、故障恢复等复杂的分布式系统问题。

使用 Restate,您可以使用可用的 Restate SDK 之一构建应用程序,然后将其部署为无服务器/无状态 HTTP 服务器,例如使用 Knative 服务。目前,Restate 支持 Golang、Java、Kotlin、Typescript、Rust 和 Python。要调用您的服务,您向 Restate 发送请求,而不是直接向您的服务发送请求,这样 Restate 充当客户端和服务之间的“代理”。

要部署 Restate 引擎,有不同的策略:您可以将其作为有状态部署部署到您的 k8s 集群中,类似于您部署数据库的方式,或者您可以使用 Restate Cloud 托管服务。有关更多信息,请查看如何部署 Restate

注册流程示例

为了让您了解其工作原理,我将向您展示一个如何使用 Knative 和 Restate 结合构建注册流程的示例。示例应用程序的组成如下

  • 用户服务,我们将在其中存储用户信息。
  • 注册服务,它封装了注册新用户、发送确认电子邮件并在之后激活用户的流程。

用户服务

让我们从用户服务开始。

要构建它,我们将创建一个 Restate 的虚拟对象,它是一种抽象,用于封装一组 RPC 处理程序及其关联的 K/V 存储。虚拟对象可以通过密钥寻址,您在调用其处理程序之一时提供该密钥。此外,虚拟对象每个密钥都有一个内在锁,这意味着 Restate 将确保对于给定密钥,**最多只有一个请求**可以同时运行,任何其他请求都将被排队到一个**每个密钥**的队列中。

让我们首先定义获取用户数据的处理程序

// Struct to encapsulate the user service logic
type userObject struct{}

// User struct definition, ser/deserializeable with json
type User struct {
    Name     string `json:"name"`
    Surname  string `json:"surname"`
    Password string `json:"password"`
}

func (t *userObject) Get(ctx restate.ObjectSharedContext) (User, error) {
    return restate.Get[User](ctx, "user")
}

每个 Restate 处理程序都使用Context调用,Context是一个接口,封装了 Restate 向开发者公开的各种功能。此上下文根据处理程序的类型而有所不同。

在本例中,我们使用restate.Get,它从 Restate 的虚拟对象 K/V 存储中读取一个值。

然后,我们可以定义处理程序来初始化用户

// Initialize will initialize the user object
func (t *userObject) Initialize(ctx restate.ObjectContext, user User) error {
    // Check if the user doesn't exist first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr != nil {
        return restate.TerminalError(fmt.Errorf("the user was already initialized"))
    }

    // Store the user
    restate.Set(ctx, "user", user)

    // Store the unactivated status
    restate.Set(ctx, "activated", false)

    return nil
}

restate.Get类似,使用restate.Set,我们可以写入虚拟对象 K/V 存储。

最后,在用户初始化后处理程序来激活用户

// Activate will signal the user is activated
func (t *userObject) Activate(ctx restate.ObjectContext) error {
    // Check if the user exists first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr == nil {
        return restate.TerminalError(fmt.Errorf("the user doesn't exist"))
    }

    // Store the activated status
    restate.Set(ctx, "activated", false)

    return nil
}

现在我们准备实现注册服务了。

注册服务

注册服务有一个处理程序来编排注册

func (t *signupService) Signup(ctx restate.Context, newUser NewUser) (string, error) {
    // Initialize the newUser first
    user := User{
        Name:     newUser.Name,
        Surname:  newUser.Surname,
        Password: newUser.Password,
    }
    _, err := restate.Object[restate.Void](ctx, "User", newUser.Username, "Initialize").Request(user)
    if err != nil {
        return "", err
    }

    // Prepare an awakeable to await the email activation
    awakeable := restate.Awakeable[restate.Void](ctx)

    // Send the activation email
    _, err = restate.Run[restate.Void](ctx, func(ctx restate.RunContext) (restate.Void, error) {
        return restate.Void{}, sendEmail(newUser.Username, awakeable.Id())
    })
    if err != nil {
        return "", err
    }

    // Await the activation
    _, err = awakeable.Result()
    if err != nil {
        return "", err
    }

    // Activate the user
    _, err = restate.Object[restate.Void](ctx, "User", newUser.Username, "Activate").Request(user)
    if err != nil {
        return "", err
    }

    return fmt.Sprintf("The new user %s is signed up and activated", newUser.Username), nil
}

使用restate.Call,我们可以调用其他 Restate 服务。保证这些请求只执行一次。

使用restate.Awakeable,我们可以等待任意事件发生。您可以简单地向 Restate 发送 HTTP 请求来完成请求,并提供 Awakeable ID。在我们的示例中,电子邮件将嵌入一个包含 Awakeable ID 的链接,一旦用户点击验证按钮,该链接将被完成。

使用restate.Run,我们可以执行任何任意代码段并记忆结果,这样,如果发生崩溃,Restate 不会重新执行该代码块,而是会加载存储的结果并将其用于后续操作。

启动 HTTP 服务并使用 Knative 部署

要使用 HTTP 公开服务

func main() {
    // Read PORT env injected by Knative Serving
    port := os.Getenv("PORT")
    if port == "" {
        port = "9080"
    }
    bindAddress := fmt.Sprintf(":%s", port)

    // Bind services to the Restate HTTP/2 server
    srv := server.NewRestate().
        Bind(restate.Reflect(&userObject{})).
        Bind(restate.Reflect(&signupService{}))

    // Start HTTP/2 server
    if err := srv.Start(context.Background(), bindAddress); err != nil {
        slog.Error("application exited unexpectedly", "err", err.Error())
        os.Exit(1)
    }
}

您现在可以使用您的工具构建容器镜像,例如使用ko

$ ko build main.go -B

并使用kn部署它

$ kn service create signup \
  --image $MY_IMAGE_REGISTRY/main.go \
  --port h2c:8080

在发送请求之前,您需要告诉 Restate 您新的服务部署

$ restate deployments register http://signup.default.svc

就是这样!您现在可以发送请求了

$ curl https://#:8080/Signup/Signup --json '{"username": "slinkydeveloper", "name": "Francesco", "surname": "Guardiani", "password": "Pizza-without-pineapple"}'

请注意:出于简洁起见,代码示例中省略了一些部分,请查看完整示例以获取更多详细信息以及如何使用kind在本地运行此示例。

我们为您提供支持

假设一下,在Signup流程中,第一次尝试执行sendEmail函数时失败了,会发生什么情况?

如果没有 Restate,您需要在循环中重试执行sendEmail几次。但是,如果在重试执行sendEmail时,**signup 服务崩溃或消失**了呢?在这种情况下,您将无法跟踪 signup 的进度,下次用户按下 F5 时,您将需要一些逻辑来重建先前 signup 的状态和/或将其丢弃。

使用 Restate,如果sendEmail失败,它将自动重试,并且之前执行的所有操作(在本例中为对User/Initialize处理程序的调用)将不会再次执行,但其结果值将简单地恢复。

这得益于 Restate 的持久执行引擎,它会记录应用程序的进度,并在发生崩溃时从上次中断的地方重新启动。更重要的是,Restate 能够在无法取得进展时暂停执行,例如长时间休眠时,或在等待来自其他服务的响应时,所有这些都无需将您的业务逻辑拆分为一系列不同的处理程序。是的,您猜对了,**在等待时,您的 Knative 服务可以缩减到零!**

下一步

在这篇文章中,我们了解了如何使用 Restate 构建有状态实体和简单的编排流程,并在 Knative 上部署它。

通过将 Restate 和 Knative 结合在一起,您可以获得两全其美,因为您可以轻松地构建有状态应用程序的无服务器应用程序。

使用 Restate 和 Knative,您可以构建更多内容:工作流saga有状态事件处理(也结合 Knative Eventing!)仅举几例。查看 Restate 示例以了解可以构建的内容:https://github.com/restatedev/examples

我们使用分析和 Cookie 来了解网站流量。出于此目的,您的网站使用信息将与 Google 共享。了解更多。