使用 Knative 和 Restate 构建有状态应用程序 ¶
发布时间:2024-09-02,修订时间:2024-09-04
使用 Knative 和 Restate 构建有状态应用程序¶
作者:Francesco Guardiani,Restate 高级软件工程师,Giselle van Dongen,Restate 开发者布道师
Knative 彻底改变了在 Kubernetes 上开发和运行无服务器应用程序的方式,但在其之上构建有状态应用程序仍然非常具有挑战性。
例如,假设您想构建一个需要持久化某些状态的应用程序。为此,您可能需要将您的服务连接到数据库,并且在这样做时,您需要处理重试、重复事件、双重写入以及各种其他分布式系统问题。
再举一个例子,假设您想构建一个服务编排器,该编排器需要调用不同的服务,并在其中一个服务失败时最终补偿某些操作。理想情况下,您只需要编写一些按顺序执行一个操作后执行另一个操作的代码,并在其中一个操作失败时执行回滚。但在实践中,这并不容易,因为您需要解决诸如调用下游服务时的重试、编排器服务的故障,甚至更棘手的调用下游服务时的长时间等待等问题。
如果您可以在 Knative 服务中嵌入应用程序状态并执行复杂的服务协调,而无需处理上述任何问题,那该怎么办?
进入 Restate¶
Restate 是一个开源的持久执行引擎,用于构建有状态的无服务器应用程序。换句话说,您构建的代码看起来像普通的 RPC 服务,并且代码会持久执行,即引擎会存储执行进度。发生崩溃后,引擎会透明地将应用程序恢复到先前状态,并从中断的地方恢复执行。
记录执行进度的另一个方面是,如果出现长时间等待,例如由于服务响应缓慢,引擎会自动挂起执行,以避免浪费计算资源。在实践中,这意味着在“等待时间”期间,应用程序可以缩减到零!
通过将 Restate 和 Knative 结合使用,您可以开发有状态实体、编排微服务、实现 Saga 模式、对事件进行去重,同时能够在不需要工作时缩减到零。Restate 将负责处理诸如状态一致性、跨服务通信、故障恢复等复杂的分布式系统问题。
使用 Restate,您可以使用可用的 Restate SDK 之一构建应用程序,然后将其部署为无服务器/无状态 HTTP 服务器,例如使用 Knative 服务。目前,Restate 支持 Golang、Java、Kotlin、Typescript、Rust 和 Python。要调用您的服务,您向 Restate 发送请求,而不是直接向您的服务发送请求,这样 Restate 充当客户端和服务之间的“代理”。
要部署 Restate 引擎,有不同的策略:您可以将其作为有状态部署部署到您的 k8s 集群中,类似于您部署数据库的方式,或者您可以使用 Restate Cloud 托管服务。有关更多信息,请查看如何部署 Restate。
注册流程示例¶
为了让您了解其工作原理,我将向您展示一个如何使用 Knative 和 Restate 结合构建注册流程的示例。示例应用程序的组成如下
- 用户服务,我们将在其中存储用户信息。
- 注册服务,它封装了注册新用户、发送确认电子邮件并在之后激活用户的流程。
用户服务¶
让我们从用户服务开始。
要构建它,我们将创建一个 Restate 的虚拟对象,它是一种抽象,用于封装一组 RPC 处理程序及其关联的 K/V 存储。虚拟对象可以通过密钥寻址,您在调用其处理程序之一时提供该密钥。此外,虚拟对象每个密钥都有一个内在锁,这意味着 Restate 将确保对于给定密钥,**最多只有一个请求**可以同时运行,任何其他请求都将被排队到一个**每个密钥**的队列中。
让我们首先定义获取用户数据的处理程序
// Struct to encapsulate the user service logic
type userObject struct{}
// User struct definition, ser/deserializeable with json
type User struct {
Name string `json:"name"`
Surname string `json:"surname"`
Password string `json:"password"`
}
func (t *userObject) Get(ctx restate.ObjectSharedContext) (User, error) {
return restate.Get[User](ctx, "user")
}
每个 Restate 处理程序都使用Context
调用,Context
是一个接口,封装了 Restate 向开发者公开的各种功能。此上下文根据处理程序的类型而有所不同。
在本例中,我们使用restate.Get
,它从 Restate 的虚拟对象 K/V 存储中读取一个值。
然后,我们可以定义处理程序来初始化
用户
// Initialize will initialize the user object
func (t *userObject) Initialize(ctx restate.ObjectContext, user User) error {
// Check if the user doesn't exist first
usr, err := restate.Get[*User](ctx, "user")
if err != nil {
return err
}
if usr != nil {
return restate.TerminalError(fmt.Errorf("the user was already initialized"))
}
// Store the user
restate.Set(ctx, "user", user)
// Store the unactivated status
restate.Set(ctx, "activated", false)
return nil
}
与restate.Get
类似,使用restate.Set
,我们可以写入虚拟对象 K/V 存储。
最后,在用户初始化后处理程序来激活
用户
// Activate will signal the user is activated
func (t *userObject) Activate(ctx restate.ObjectContext) error {
// Check if the user exists first
usr, err := restate.Get[*User](ctx, "user")
if err != nil {
return err
}
if usr == nil {
return restate.TerminalError(fmt.Errorf("the user doesn't exist"))
}
// Store the activated status
restate.Set(ctx, "activated", false)
return nil
}
现在我们准备实现注册服务了。
注册服务¶
注册服务有一个处理程序来编排注册
func (t *signupService) Signup(ctx restate.Context, newUser NewUser) (string, error) {
// Initialize the newUser first
user := User{
Name: newUser.Name,
Surname: newUser.Surname,
Password: newUser.Password,
}
_, err := restate.Object[restate.Void](ctx, "User", newUser.Username, "Initialize").Request(user)
if err != nil {
return "", err
}
// Prepare an awakeable to await the email activation
awakeable := restate.Awakeable[restate.Void](ctx)
// Send the activation email
_, err = restate.Run[restate.Void](ctx, func(ctx restate.RunContext) (restate.Void, error) {
return restate.Void{}, sendEmail(newUser.Username, awakeable.Id())
})
if err != nil {
return "", err
}
// Await the activation
_, err = awakeable.Result()
if err != nil {
return "", err
}
// Activate the user
_, err = restate.Object[restate.Void](ctx, "User", newUser.Username, "Activate").Request(user)
if err != nil {
return "", err
}
return fmt.Sprintf("The new user %s is signed up and activated", newUser.Username), nil
}
使用restate.Call
,我们可以调用其他 Restate 服务。保证这些请求只执行一次。
使用restate.Awakeable
,我们可以等待任意事件发生。您可以简单地向 Restate 发送 HTTP 请求来完成请求,并提供 Awakeable ID。在我们的示例中,电子邮件将嵌入一个包含 Awakeable ID 的链接,一旦用户点击验证按钮,该链接将被完成。
使用restate.Run
,我们可以执行任何任意代码段并记忆结果,这样,如果发生崩溃,Restate 不会重新执行该代码块,而是会加载存储的结果并将其用于后续操作。
启动 HTTP 服务并使用 Knative 部署¶
要使用 HTTP 公开服务
func main() {
// Read PORT env injected by Knative Serving
port := os.Getenv("PORT")
if port == "" {
port = "9080"
}
bindAddress := fmt.Sprintf(":%s", port)
// Bind services to the Restate HTTP/2 server
srv := server.NewRestate().
Bind(restate.Reflect(&userObject{})).
Bind(restate.Reflect(&signupService{}))
// Start HTTP/2 server
if err := srv.Start(context.Background(), bindAddress); err != nil {
slog.Error("application exited unexpectedly", "err", err.Error())
os.Exit(1)
}
}
您现在可以使用您的工具构建容器镜像,例如使用ko
$ ko build main.go -B
并使用kn
部署它
$ kn service create signup \
--image $MY_IMAGE_REGISTRY/main.go \
--port h2c:8080
在发送请求之前,您需要告诉 Restate 您新的服务部署
$ restate deployments register http://signup.default.svc
就是这样!您现在可以发送请求了
$ curl https://#:8080/Signup/Signup --json '{"username": "slinkydeveloper", "name": "Francesco", "surname": "Guardiani", "password": "Pizza-without-pineapple"}'
请注意:出于简洁起见,代码示例中省略了一些部分,请查看完整示例以获取更多详细信息以及如何使用kind
在本地运行此示例。
我们为您提供支持¶
假设一下,在Signup
流程中,第一次尝试执行sendEmail
函数时失败了,会发生什么情况?
如果没有 Restate,您需要在循环中重试执行sendEmail
几次。但是,如果在重试执行sendEmail
时,**signup 服务崩溃或消失**了呢?在这种情况下,您将无法跟踪 signup 的进度,下次用户按下 F5 时,您将需要一些逻辑来重建先前 signup 的状态和/或将其丢弃。
使用 Restate,如果sendEmail
失败,它将自动重试,并且之前执行的所有操作(在本例中为对User/Initialize
处理程序的调用)将不会再次执行,但其结果值将简单地恢复。
这得益于 Restate 的持久执行引擎,它会记录应用程序的进度,并在发生崩溃时从上次中断的地方重新启动。更重要的是,Restate 能够在无法取得进展时暂停执行,例如长时间休眠时,或在等待来自其他服务的响应时,所有这些都无需将您的业务逻辑拆分为一系列不同的处理程序。是的,您猜对了,**在等待时,您的 Knative 服务可以缩减到零!**
下一步¶
在这篇文章中,我们了解了如何使用 Restate 构建有状态实体和简单的编排流程,并在 Knative 上部署它。
通过将 Restate 和 Knative 结合在一起,您可以获得两全其美,因为您可以轻松地构建有状态应用程序的无服务器应用程序。
使用 Restate 和 Knative,您可以构建更多内容:工作流、saga、有状态事件处理(也结合 Knative Eventing!)仅举几例。查看 Restate 示例以了解可以构建的内容:https://github.com/restatedev/examples