Python入门Web应用开发 Tornado

举报
Gere 发表于 2022/07/16 21:43:43 2022/07/16
【摘要】 网络需要同时支持数千个客户,Tornado 是一个 Python Web 框架和一个异步网络库,它依赖于非阻塞网络 I/O 来为 Web 应用程序提供服务。因此,它可以处理数千个活动服务器连接。它是维护长轮询和大量活动连接的应用程序的救星。一个函数阻塞, 执行它的计算,一旦完成就返回。一个函数可能会因为多种原因而阻塞:网络 I/O、磁盘 I/O、互斥锁等。应用程序性能取决于应用程序使用 CP...

网络需要同时支持数千个客户,Tornado 是一个 Python Web 框架和一个异步网络库,它依赖于非阻塞网络 I/O 来为 Web 应用程序提供服务。因此,它可以处理数千个活动服务器连接。它是维护长轮询和大量活动连接的应用程序的救星。

一个函数阻塞, 执行它的计算,一旦完成就返回。一个函数可能会因为多种原因而阻塞:网络 I/O、磁盘 I/O、互斥锁等。

应用程序性能取决于应用程序使用 CPU 周期的效率,这就是为什么必须认真对待阻塞语句/调用的原因。考虑像bcrypt这样的密码散列函数,它在设计上使用数百毫秒的 CPU 时间,远远超过典型的网络或磁盘访问。由于 CPU 不是空闲的,因此无需使用异步功能。 一个函数可以在一个函数中是阻塞的,而在其他函数中是非阻塞的。在 Tornado 的上下文中,我们通常会考虑由于网络 I/O 和磁盘而导致的阻塞,尽管需要尽量减少各种阻塞。

异步程序

1)单线程架构:

    意味着,它不能并行执行以计算为中心的任务。

2)I/O并发:

    它可以将IO任务交给操作系统,继续下一个任务,实现并行。

3)epoll/kqueue:

    下划线与系统相关的构造,它允许应用程序获取文件描述符或 I/O 特定任务的事件。

4)事件循环:

    它使用 epoll 或 kqueue 来检查是否发生了任何事件,并执行等待这些网络事件的回调。

异步与同步 Web 框架:

在同步模型的情况下,每个请求或任务都被转移到线程或路由中,并在完成时将结果移交给调用者。在这里,管理事情很容易,但创建新线程的开销太大。

另一方面,在异步框架中,如 Node.js,有一个单线程模型,因此开销非常小,但它具有复杂性。

让我们假设有数千个请求通过,服务器使用事件循环和回调。现在,在请求被处理之前,它必须有效地存储和管理该请求的状态,以将回调结果映射到实际的客户端。

协程是在 Tornado 中编写异步代码的推荐方式。协程使用 Pythonyield关键字来挂起和恢复执行,而不是一系列回调(在gevent等框架中看到的协作轻量级线程有时也称为协程,但在 Tornado 中,所有协程都使用显式上下文切换并被称为异步函数)。 协程几乎和同步代码一样简单,但没有线程的开销。它们还通过减少可能发生上下文切换的位置数量,使并发更容易推理。 

安装pip install tornado

Tornado Web 应用程序通常由一个或多 RequestHandler个子类、一个Application将传入请求路由到处理程序的对象和一个main()启动服务器的函数组成。  

官网:Tornado Web Server — Tornado 6.1 documentation

在本地运行时,可以通过127.0.0.1:8888 从浏览器访问来访问服务器。
服务器将返回“Hello World”。
make_app() 函数中,根/ 映射到MainHandler. 这意味着对根 IP 的请求127.0.0.1:8888 将被映射到该MainHandler 函数。 

Application:Application对象负责全局配置,包括将请求映射到处理程序的路由表。

​RequestHandler:Tornado Web 应用程序的大部分工作都是在RequestHandler. 处理程序子类的主要入口点是一个以正在处理的 HTTP 方法命名的方法:get()、 post()等。每个处理程序可以定义一个或多个这些方法来处理不同的 HTTP 操作。

处理请求输入:请求处理程序可以使用 访问表示当前请求的对象self.request。

错误处理:如果处理程序引发异常,Tornado 将调用 RequestHandler.write_error以生成错误页面。 tornado.web.HTTPError可用于生成指定的状态码;所有其他异常都返回 500 状态。

在 Tornado 中有两种主要的方式可以重定向请求: RequestHandler.redirect和使用RedirectHandler.

异步处理程序:Tornado 处理程序默认是同步的:当 get()/post()方法返回时,请求被认为已完成并发送响应。由于在一个处理程序运行时所有其他请求都被阻塞,因此任何长时间运行的处理程序都应设为异步,以便它可以以非阻塞方式调用其慢速操作。​ 使处理程序异步的最简单方法是使用 coroutine装饰器。这允许使用关键字执行非阻塞 I/O yield,并且在协程返回之前不会发送任何响应。 ​

 Tornado 的一个简单的“Hello, world”示例 Web 应用程序: ​

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

使用AsyncHTTPClient模块,我们可以异步进行 REST 调用。 `yield http_client.fetch(url)` 将作为协程运行。

from tornado.httpclient import AsyncHTTPClient
from tornado import gen

@gen.coroutine
def async_fetch_gen(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

使用 Tornado 的 WebSockets:

Tornado 有内置的 WebSockets 包,可以很容易地与协程一起使用来实现并发

import logging
import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
from tornado.options import define, options
from tornado.httpserver import HTTPServer

define("port", default=8888, help="run on the given port", type=int)


# queue_size = 1
# producer_num_items = 5
# q = queues.Queue(queue_size)

def isPrime(num):
    """
    Simple worker but mostly IO/network call
    """
    if num > 1:
        for i in range(2, num // 2):
            if (num % i) == 0:
                return ("is not a prime number")
        else:
            return("is a prime number")
    else:
        return ("is not a prime number")

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [(r"/chatsocket", TornadoWebSocket)]
        super(Application, self).__init__(handlers)

class TornadoWebSocket(tornado.websocket.WebSocketHandler):
    clients = set()

    # enable cross domain origin
    def check_origin(self, origin):
        return True

    def open(self):
        TornadoWebSocket.clients.add(self)

    # when client closes connection
    def on_close(self):
        TornadoWebSocket.clients.remove(self)

    @classmethod
    def send_updates(cls, producer, result):

        for client in cls.clients:

            # check if result is mapped to correct sender
            if client == producer:
                try:
                    client.write_message(result)
                except:
                    logging.error("Error sending message", exc_info=True)

    def on_message(self, message):
        try:
            num = int(message)
        except ValueError:
            TornadoWebSocket.send_updates(self, "Invalid input")
            return
        TornadoWebSocket.send_updates(self, isPrime(num))

def start_websockets():
    tornado.options.parse_command_line()
    app = Application()
    server = HTTPServer(app)
    server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()



if __name__ == "__main__":
    start_websockets()

一个可以使用 WebSocket 客户端应用程序连接到服务器,消息可以是任何整数。处理后,客户端会收到整数是否为素数的结果。  

启动 worker(s),他们会监听 ' tornado.queue '。这个队列是异步的,与 asyncio 包非常相似。

# Example 1
from tornado import gen, queues
from tornado.ioloop import IOLoop

@gen.coroutine
def consumer(queue, num_expected):
    for _ in range(num_expected):
        # heavy I/O or network task
        print('got: %s' % (yield queue.get()))


@gen.coroutine
def producer(queue, num_items):
    for i in range(num_items):
        print('putting %s' % i)
        yield queue.put(i)

@gen.coroutine
def main():
    """
    Starts producer and consumer and wait till they finish
    """
    yield [producer(q, producer_num_items), consumer(q, producer_num_items)]

queue_size = 1
producer_num_items = 5
q = queues.Queue(queue_size)

results = IOLoop.current().run_sync(main)


# Output:
# putting 0
# putting 1
# got: 0
# got: 1
# putting 2
# putting 3
# putting 4
# got: 2
# got: 3
# got: 4


# Example 2

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition

my_condition = Condition()

@gen.coroutine
def waiter():
    print("I'll wait right here")
    yield my_condition.wait()
    print("Received notification now doing my things")

@gen.coroutine
def notifier():
    yield gen.sleep(60)
    print("About to notify")
    my_condition.notify()
    print("Done notifying")

@gen.coroutine
def runner():
    # Wait for waiter() and notifier() in parallel
    yield([waiter(), notifier()])

results = IOLoop.current().run_sync(runner)

# output:

# I'll wait right here
# About to notify
# Done notifying
# Received notification now doing my things

运行和部署

由于 Tornado 提供了自己的 HTTPServer,因此运行和部署它与其他 Python Web 框架略有不同。无需配置 WSGI 容器来查找您的应用程序,而是编写一个 main()启动服务器的函数。

def main():
    app = make_app()
    app.listen(8888)
    IOLoop.current().start()

if __name__ == '__main__':
    main()

配置操作系统或进程管理器以运行此程序来启动服务器。可能需要增加每个进程的打开文件数(以避免“打开文件过多”-错误)。要提高此限制(例如将其设置为 50000),可以使用 ulimit 命令,修改 /etc/security/limits.conf 或minfds在您的 supervisord 配置中设置。

由于 Python GIL(全局解释器锁),需要运行多个 Python 进程才能充分利用多 CPU 机器。通常最好每个 CPU 运行一个进程。 Tornado 包含一个内置的多进程模式,可以同时启动多个进程。

def main():
    app = make_app()
    server = tornado.httpserver.HTTPServer(app)
    server.bind(8888)
    server.start(0)  # forks one process per cpu
    IOLoop.current().start()

在像 nginx 这样的负载均衡器后面运行时,建议传递xheaders=True给HTTPServer构造函数。这将告诉 Tornado 使用诸如X-Real-IP获取用户 IP 地址之类的标头,而不是将所有流量归因于平衡器的 IP 地址。 这是一个准系统 nginx 配置文件,其结构类似于我们在 FriendFeed 使用的配置文件。它假设 nginx 和 Tornado 服务器运行在同一台机器上,并且四个 Tornado 服务器运行在端口 8000 - 8003 上:

user nginx;
worker_processes 1;

error_log /var/log/nginx/error.log;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
    use epoll;
}

http {
    # Enumerate all the Tornado servers here
    upstream frontends {
        server 127.0.0.1:8000;
        server 127.0.0.1:8001;
        server 127.0.0.1:8002;
        server 127.0.0.1:8003;
    }

    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    access_log /var/log/nginx/access.log;

    keepalive_timeout 65;
    proxy_read_timeout 200;
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    gzip on;
    gzip_min_length 1000;
    gzip_proxied any;
    gzip_types text/plain text/html text/css text/xml
               application/x-javascript application/xml
               application/atom+xml text/javascript;

    # Only retry if there was a communication error, not a timeout
    # on the Tornado server (to avoid propagating "queries of death"
    # to all frontends)
    proxy_next_upstream error;

    server {
        listen 80;

        # Allow file uploads
        client_max_body_size 50M;

        location ^~ /static/ {
            root /var/www;
            if ($query_string) {
                expires max;
            }
        }
        location = /favicon.ico {
            rewrite (.*) /static/favicon.ico;
        }
        location = /robots.txt {
            rewrite (.*) /static/robots.txt;
        }

        location / {
            proxy_pass_header Server;
            proxy_set_header Host $http_host;
            proxy_redirect off;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Scheme $scheme;
            proxy_pass http://frontends;
        }
    }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。