从零开始学python | 使用 gRPC 的 Python 微服务 III
从零开始学python | 使用 gRPC 的 Python 微服务 II
优雅地关闭
在开发机器上运行微服务时,可以按Ctrl+C停止它。这将导致 Python 解释器引发KeyboardInterrupt
异常。
当 Kubernetes 正在运行您的微服务并需要停止它以推出更新时,它会向您的微服务发送信号。具体来说,它会发送一个SIGTERM
信号并等待三十秒。如果到那时您的微服务还没有退出,它将发送一个SIGKILL
信号。
您可以并且应该捕获并处理 ,SIGTERM
以便您可以完成处理当前请求但拒绝新请求。您可以通过将以下代码放入serve()
:
1from signal import signal, SIGTERM
2
3...
4
5def serve():
6 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
7 ...
8 server.add_insecure_port("[::]:50051")
9 server.start()
10
11 def handle_sigterm(*_):
12 print("Received shutdown signal")
13 all_rpcs_done_event = server.stop(30)
14 all_rpcs_done_event.wait(30)
15 print("Shut down gracefully")
16
17 signal(SIGTERM, handle_sigterm)
18 server.wait_for_termination()
这是一个细分:
- 第 1 行导入
signal
,它允许您捕获和处理来自 Kubernetes 或几乎任何其他进程的信号。 - 第 11 行定义了一个函数来处理
SIGTERM
。当 Python 收到SIGTERM
信号时将调用该函数,Python 将向其传递两个参数。但是,您不需要参数,因此请使用*_
忽略它们。 - 第 13 行调用
server.stop(30)
优雅地关闭服务器。它将拒绝新请求并等待30
当前请求完成几秒钟。它立即返回,但它返回一个threading.Event
您可以等待的对象。 - 第 14 行等待
Event
对象,因此 Python 不会过早退出。 - 第 17 行注册您的处理程序。
当您部署新版本的微服务时,Kubernetes 将发送信号以关闭现有的微服务。处理这些以正常关闭将确保不会丢弃请求。
保护通道
到目前为止,您一直在使用不安全的 gRPC 通道。这意味着几件事:
-
客户端无法确认它正在向目标服务器发送请求。有人可以创建一个冒名顶替的微服务并将其注入客户端可能会向其发送请求的某个地方。例如,他们可能能够将微服务注入到负载均衡器将向其发送请求的 Pod 中。
-
服务器无法确认客户端向其发送请求。只要有人可以连接到服务器,他们就可以向其发送任意 gRPC 请求。
-
流量未加密,因此任何路由流量的节点也可以查看它。
本节将介绍如何添加TLS身份验证和加密。
注意:这不涉及认证用户,仅涉及微服务进程。
您将学习两种设置 TLS 的方法:
- 直接的方式,客户端可以验证服务器,但服务器不验证客户端。
- 更复杂的方式,使用双向 TLS,客户端和服务器相互验证。
在这两种情况下,流量都是加密的。
TLS 基础知识
在深入研究之前,这里是 TLS 的简要概述: 通常,客户端验证服务器。例如,当您访问 Amazon.com 时,您的浏览器会验证它确实是 Amazon.com 而不是冒名顶替者。要做到这一点,客户必须从值得信赖的第三方那里得到某种保证,就像只有当你有一个共同的朋友为他们提供担保时,你才可能信任一个新人。
使用 TLS,客户端必须信任证书颁发机构 (CA)。CA 将对服务器持有的某些内容进行签名,以便客户端可以对其进行验证。这有点像你共同的朋友签了一张便条,然后你认出了他们的笔迹。欲了解更多信息,请参阅互联网安全的工作原理:TLS,SSL和CA。
您的浏览器隐式信任某些 CA,这些 CA 通常是 GoDaddy、DigiCert 或 Verisign 等公司。其他公司,如亚马逊,支付 CA 为他们签署数字证书,以便您的浏览器信任他们。通常,CA 会在签署证书之前验证 Amazon 是否拥有 Amazon.com。这样,冒名顶替者不会在 Amazon.com 的证书上签名,您的浏览器就会阻止该站点。
使用微服务,您不能真正要求 CA 签署证书,因为您的微服务在内部机器上运行。CA 可能很乐意签署证书并向您收费,但关键是这不切实际。在这种情况下,您的公司可以充当自己的 CA。gRPC 客户端将信任服务器,如果它具有由您的公司或您签署的证书(如果您正在执行个人项目)。
服务器认证
以下命令将创建可用于签署服务器证书的 CA 证书:
$ openssl req -x509 -nodes -newkey rsa:4096 -keyout ca.key -out ca.pem \
-subj /O=me
这将输出两个文件:
ca.key
是私钥。ca.pem
是公共证书。
然后,您可以为您的服务器创建一个证书并使用您的 CA 证书对其进行签名:
$ openssl req -nodes -newkey rsa:4096 -keyout server.key -out server.csr \
-subj /CN=recommendations
$ openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -set_serial 1 \
-out server.pem
这将产生三个新文件:
server.key
是服务器的私钥。server.csr
是一个中间文件。server.pem
是服务器的公共证书。
注意:这些命令仅用于示例目的。私钥未加密。如果您想为您的公司生成证书,请咨询您的安全团队。他们可能会制定您应该遵循的创建、存储和撤销证书的策略。
您可以将其添加到 Recommendations 微服务中Dockerfile
。将机密安全地添加到 Docker 映像非常困难,但是有一种方法可以使用最新版本的 Docker 来做到这一点,如下所示:
1# syntax = docker/dockerfile:1.0-experimental
2# DOCKER_BUILDKIT=1 docker build . -f recommendations/Dockerfile \
3# -t recommendations --secret id=ca.key,src=ca.key
4
5FROM python
6
7RUN mkdir /service
8COPY infra/ /service/infra/
9COPY protobufs/ /service/protobufs/
10COPY recommendations/ /service/recommendations/
11COPY ca.pem /service/recommendations/
12
13WORKDIR /service/recommendations
14RUN python -m pip install --upgrade pip
15RUN python -m pip install -r requirements.txt
16RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \
17 --grpc_python_out=. ../protobufs/recommendations.proto
18RUN openssl req -nodes -newkey rsa:4096 -subj /CN=recommendations \
19 -keyout server.key -out server.csr
20RUN --mount=type=secret,id=ca.key \
21 openssl x509 -req -in server.csr -CA ca.pem -CAkey /run/secrets/ca.key \
22 -set_serial 1 -out server.pem
23
24EXPOSE 50051
25ENTRYPOINT [ "python", "recommendations.py" ]
新行突出显示。这是一个解释:
- 需要第 1 行来启用机密。
- 第 2 行和第 3 行显示了如何构建 Docker 映像的命令。
- 第 11行将 CA 公共证书复制到映像中。
- 第 18 行和第 19 行生成新的服务器私钥和证书。
- 第 20 到 22 行临时加载 CA 私钥,以便您可以用它签署服务器的证书。但是,它不会保留在图像中。
注意:有关 的更多信息‑‑mount=type=secret
,请参阅Docker 文档。将来,此功能可能会升级为稳定版本,届时您将无需syntax = docker/dockerfile:1.0-experimental
在Dockerfile
.
根据版本控制策略,实验性语法不会消失,因此您可以无限期地继续使用它。
您的图像现在将包含以下文件:
ca.pem
server.csr
server.key
server.pem
现在,您可以更新serve()
中recommendations.py
所强调的:
1def serve():
2 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
3 recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
4 RecommendationService(), server
5 )
6
7 with open("server.key", "rb") as fp:
8 server_key = fp.read()
9 with open("server.pem", "rb") as fp:
10 server_cert = fp.read()
11
12 creds = grpc.ssl_server_credentials([(server_key, server_cert)])
13 server.add_secure_port("[::]:443", creds)
14 server.start()
15 server.wait_for_termination()
以下是变化:
- 第 7 到 10 行加载服务器的私钥和证书。
- 第 12 行和第 13 行使用 TLS 运行服务器。它现在只接受 TLS 加密的连接。
您需要更新marketplace.py
以加载 CA 证书。您现在只需要客户端中的公共证书,如下所示:
1recommendations_host = os.getenv("RECOMMENDATIONS_HOST", "localhost")
2with open("ca.pem", "rb") as fp:
3 ca_cert = fp.read()
4creds = grpc.ssl_channel_credentials(ca_cert)
5recommendations_channel = grpc.secure_channel(
6 f"{recommendations_host}:443", creds
7)
8recommendations_client = RecommendationsStub(recommendations_channel)
您还需要添加COPY ca.pem /service/marketplace/
到 Marketplace Dockerfile
。
您现在可以加密运行客户端和服务器,客户端将验证服务器。为了让一切运行变得简单,您可以使用docker-compose
. 但是,在撰写本文时,docker-compose
不支持 build secrets。您必须手动构建 Docker 镜像,而不是使用docker-compose build
.
docker-compose up
但是,您仍然可以运行。更新docker-compose.yaml
文件以删除build
部分:
1version: "3.8"
2services:
3
4 marketplace:
5 environment:
6 RECOMMENDATIONS_HOST: recommendations
7 # DOCKER_BUILDKIT=1 docker build . -f marketplace/Dockerfile \
8 # -t marketplace --secret id=ca.key,src=ca.key
9 image: marketplace
10 networks:
11 - microservices
12 ports:
13 - 5000:5000
14
15 recommendations:
16 # DOCKER_BUILDKIT=1 docker build . -f recommendations/Dockerfile \
17 # -t recommendations --secret id=ca.key,src=ca.key
18 image: recommendations
19 networks:
20 - microservices
21
22networks:
23 microservices:
您现在正在加密流量并验证您正在连接到正确的服务器。
相互认证
服务器现在证明它是可信的,但客户端不可信。幸运的是,TLS 允许对双方进行验证。Dockerfile
按照突出显示更新市场:
1# syntax = docker/dockerfile:1.0-experimental
2# DOCKER_BUILDKIT=1 docker build . -f marketplace/Dockerfile \
3# -t marketplace --secret id=ca.key,src=ca.key
4
5FROM python
6
7RUN mkdir /service
8COPY protobufs/ /service/protobufs/
9COPY marketplace/ /service/marketplace/
10COPY ca.pem /service/marketplace/
11
12WORKDIR /service/marketplace
13RUN python -m pip install -r requirements.txt
14RUN python -m grpc_tools.protoc -I ../protobufs --python_out=. \
15 --grpc_python_out=. ../protobufs/recommendations.proto
16RUN openssl req -nodes -newkey rsa:4096 -subj /CN=marketplace \
17 -keyout client.key -out client.csr
18RUN --mount=type=secret,id=ca.key \
19 openssl x509 -req -in client.csr -CA ca.pem -CAkey /run/secrets/ca.key \
20 -set_serial 1 -out client.pem
21
22EXPOSE 5000
23ENV FLASK_APP=marketplace.py
24ENTRYPOINT [ "flask", "run", "--host=0.0.0.0"]
这些更改类似于您在上一节中为 Recommendations 微服务所做的更改。
注意:如果您将私钥放在 Dockerfile 中,则不要将它们托管在公共存储库中。最好在运行时通过网络从只能通过 VPN 访问的服务器加载私钥。
更新serve()
在recommendations.py
验证客户端所强调的:
1def serve():
2 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
3 recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
4 RecommendationService(), server
5 )
6
7 with open("server.key", "rb") as fp:
8 server_key = fp.read()
9 with open("server.pem", "rb") as fp:
10 server_cert = fp.read()
11 with open("ca.pem", "rb") as fp:
12 ca_cert = fp.read()
13
14 creds = grpc.ssl_server_credentials(
15 [(server_key, server_cert)],
16 root_certificates=ca_cert,
17 require_client_auth=True,
18 )
19 server.add_secure_port("[::]:443", creds)
20 server.start()
21 server.wait_for_termination()
这将加载 CA 证书并需要客户端身份验证。
最后,更新marketplace.py
以将其证书发送到服务器,如突出显示:
1recommendations_host = os.getenv("RECOMMENDATIONS_HOST", "localhost")
2with open("client.key", "rb") as fp:
3 client_key = fp.read()
4with open("client.pem", "rb") as fp:
5 client_cert = fp.read()
6with open("ca.pem", "rb") as fp:
7 ca_cert = fp.read()
8creds = grpc.ssl_channel_credentials(ca_cert, client_key, client_cert)
9recommendations_channel = grpc.secure_channel(
10 f"{recommendations_host}:443", creds
11)
12recommendations_client = RecommendationsStub(recommendations_channel)
这会加载证书并将它们发送到服务器进行验证。
现在,如果您尝试使用另一个客户端连接到服务器,即使是使用 TLS 但证书未知的客户端,服务器将拒绝它并显示错误PEER_DID_NOT_RETURN_A_CERTIFICATE
。
重要提示:虽然可以通过这种方式管理双向 TLS,但这样做并不容易。如果您只想启用某些微服务向其他微服务发出请求,这将变得特别困难。
如果您需要像这样提高安全性,最好使用服务网格并让它为您管理证书和授权。除了拦截器部分提到的流量监控之外,Istio还可以管理双向TLS 和每服务授权。它也更安全,因为它将为您管理机密并更频繁地重新颁发证书。
这样就完成了微服务之间的安全通信。接下来,您将了解如何将 AsyncIO 与微服务结合使用。
异步IO和gRPC
官方 gRPC 包中的 AsyncIO 支持长期以来一直缺乏,但最近添加了。它仍处于试验阶段并正在积极开发中,但如果您真的想在微服务中尝试 AsyncIO,那么它可能是一个不错的选择。您可以查看gRPC AsyncIO 文档以了解更多详细信息。
还有一个名为的第三方包grpclib
,它实现了对 gRPC 的 AsyncIO 支持,并且已经存在的时间更长了。
在服务器端使用 AsyncIO 时要格外小心。很容易不小心编写阻塞代码,这会使您的微服务瘫痪。作为演示,以下是您如何使用 AsyncIO 编写 Recommendations 微服务并删除所有逻辑:
1import time
2
3import asyncio
4import grpc
5import grpc.experimental.aio
6
7from recommendations_pb2 import (
8 BookCategory,
9 BookRecommendation,
10 RecommendationResponse,
11)
12import recommendations_pb2_grpc
13
14class AsyncRecommendations(recommendations_pb2_grpc.RecommendationsServicer):
15 async def Recommend(self, request, context):
16 print("Handling request")
17 time.sleep(5) # Oops, blocking!
18 print("Done")
19 return RecommendationResponse(recommendations=[])
20
21async def main():
22 grpc.experimental.aio.init_grpc_aio()
23 server = grpc.experimental.aio.server()
24 server.add_insecure_port("[::]:50051")
25 recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
26 AsyncRecommendations(), server
27 )
28 await server.start()
29 await server.wait_for_termination()
30
31asyncio.run(main())
这段代码有错误。在第 17 行,您不小心在async
函数内部进行了阻塞调用,这是一个很大的禁忌。由于 AsyncIO 服务器是单线程的,这会阻塞整个服务器,因此它一次只能处理一个请求。这比线程服务器要糟糕得多。
您可以通过发出多个并发请求来证明这一点:
1from concurrent.futures import ThreadPoolExecutor
2
3import grpc
4
5from recommendations_pb2 import BookCategory, RecommendationRequest
6from recommendations_pb2_grpc import RecommendationsStub
7
8request = RecommendationRequest(user_id=1, category=BookCategory.MYSTERY)
9channel = grpc.insecure_channel("localhost:50051")
10client = RecommendationsStub(channel)
11
12executor = ThreadPoolExecutor(max_workers=5)
13a = executor.submit(client.Recommend, request)
14b = executor.submit(client.Recommend, request)
15c = executor.submit(client.Recommend, request)
16d = executor.submit(client.Recommend, request)
17e = executor.submit(client.Recommend, request)
这将发出五个并发请求,但在服务器端,您会看到:
Handling request
Done
Handling request
Done
Handling request
Done
Handling request
Done
Handling request
Done
请求是按顺序处理的,这不是您想要的!
在服务器端有 AsyncIO 的用例,但你必须非常小心不要阻塞。这意味着你不能像requests
其他微服务一样使用标准包,甚至不能对其他微服务进行 RPC,除非你在另一个线程中使用run_in_executor
.
您还必须小心数据库查询。您开始使用的许多出色的 Python 包可能还不支持 AsyncIO,因此请小心检查它们是否支持。除非您对服务器端的 AsyncIO 有非常强烈的需求,否则等到有更多包支持时可能会更安全。阻止错误可能很难找到。
如果您想了解更多关于ASYNCIO,那么你就可以检查出入门异步特性在Python和异步IO在Python:一个完整的演练。
结论
微服务是管理复杂系统的一种方式。随着组织的发展,它们成为组织代码的自然方式。了解如何在 Python 中有效实现微服务可以让您在公司成长过程中变得更有价值。
在本教程中,您学习了:
- 如何使用gRPC有效实现 Python 微服务
- 如何将微服务部署到Kubernetes
- 如何在您的微服务中加入集成测试、拦截器、TLS和AsyncIO 等功能
- 什么最佳实践创建Python的微服务时要遵循
您现在可以开始将较大的 Python 应用程序分解为较小的微服务,从而使您的代码更有条理和更易于维护。
- 点赞
- 收藏
- 关注作者
评论(0)