返回介绍

12.1 用 Flask 和 PostgreSQL 流化数据

发布于 2024-01-23 21:41:45 字数 5888 浏览 0 评论 0 收藏 0

前面一节讨论了掌握数据存储系统有多么重要。这里将展示如何用PostgreSQL的一个高级特性构造一个HTTP事件流系统。

这个小应用的目的是将消息存储在一个SQL表中并通过HTTP REST API提供对这些消息的访问。每个消息由一个整数类型的channel、一个字符串类型的source、一个字符串类型的content组成。创建这个表的代码非常简单,如示例12.1所示。

示例 12.1  创建message

CREATE TABLE message (
 id SERIAL PRIMARY KEY,
 channel INTEGER NOT NULL,
 source TEXT NOT NULL,
 content TEXT NOT NULL
);

另外还需要做的是序列化这些消息,以便客户端能够实时对它们进行处理。这需要用到PostgreSQL的LISTEN(http://www.postgresql.org/docs/9.2/static/sql-listen.html)和NOTIFY(http://www.postgresql.org/docs/9.2/static/sql-notify.html)功能。这些功能可以监听来自函数的消息,这个函数由用户提供,由PostgreSQL执行,如示例12.2所示。

示例 12.2 notify_on_insert函数

CREATE OR REPLACE FUNCTION notify_on_insert() RETURNS trigger AS $$
BEGIN
 PERFORM pg_notify('channel_' || NEW.channel,
          CAST(row_to_json(NEW) AS TEXT));
 RETURN NULL;
END;
$$ LANGUAGE plpgsql;

这会创建一个用pl/pgsql编写的触发器函数,pl/pgsql语言只有PostgreSQL可以理解。需要注意的是,这个函数也可以用其他语言编写,如Python本身,因为PostgreSQL是通过嵌入Python解释器支持pl/python语言的。

函数会执行一个对pg_notify的调用。这是实际发送通知的函数。第一个参数是一个代表一个信道的字符串,第二个参数是携带实际净荷(palyload)的字符串。这里根据channel列在行内的的值来动态定义信道。在这个例子中,净荷是以JSON格式表示的整个行。没错,PostgreSQL原生地就知道如何将行转换为JSON。

我们希望对message表的每一次INSERT操作都发送通知消息,所以需要在这样的事件上出发这个函数,如示例12.3所示。

示例 12.3 notify_on_insert的触发器

CREATE TRIGGER notify_on_message_insert AFTER INSERT ON message
FOR EACH ROW EXECUTE PROCEDURE notify_on_insert();

搞定。这个函数已经插入并且在message表每一次INSERT操作成功后都会被 执行。

可以通过psql中的LISTEN操作检查它是否工作正常:

$ psql
psql (9.3rc1)
SSL connection (cipher: DHE-RSA-AES256-SHA, bits: 256)
Type "help" for help.

mydatabase=> LISTEN channel_1;
LISTEN
mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'hello world');
INSERT 0 1
Asynchronous notification "channel_1" with payload
"{"id":1,"channel":1,"source":"jd","content":"hello world"}"
received from server process with PID 26393.

一旦行被插入,通知就被发送,并且可以通过PostgreSQL客户端进行接收。现在需要做的就是构建Python应用对这个事件进行流化(stream),如示例12.4所示。

示例 12.4  在 Python 中接收通知

import psycopg2
import psycopg2.extensions
import select

conn = psycopg2.connect(database='mydatabase', user='myuser',
            password='idkfa', host='localhost')

conn.set_isolation_level(
  psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

curs = conn.cursor()
curs.execute("LISTEN channel_1;")

while True:
  select.select([conn], [], [])
  conn.poll()
  while conn.notifies:
    notify = conn.notifies.pop()
    print("Got NOTIFY:", notify.pid, notify.channel, notify.payload)

上面的代码利用库psycopg2连接PostgreSQL。也可以使用一个提供了抽象层的库,如SQLAlchemy,但是它们都无法提供对PostgreSQLLISTEN/NOTIFY功能的访问。通过访问底层数据库连接去执行代码也是可能的,但是在这个例子中没必要那么做,因为这里并不需要任何ORM库提供的其他功能。

这个程序会在channel_1上进行监听。一旦收到通知则将其打印到屏幕上。如果运行这个程序并向message表中插入一行,则会得到如下输出:

$ python3 listen.py
Got NOTIFY: 28797 channel_1
{"id":10,"channel":1,"source":"jd","content":"hello world"}

现在我们将使用Flask(http://flask.pocoo.org/),一个简单的HTTP微型框架,去构造应用程序。这里将使用由HTML52中定义的Server-Sent Events(http://www.w3.org/TR/2009/ WD-eventsource-20090423/)消息协议,如示例12.5所示。

示例 12.5 Flask 流化应用程序

import flask
import psycopg2
import psycopg2.extensions
import select

app = flask.Flask(__name__)

def stream_messages(channel):
  conn = psycopg2.connect(database='mydatabase', user='mydatabase',
              password='mydatabase', host='localhost')
  conn.set_isolation_level(
    psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

  curs = conn.cursor()
  curs.execute("LISTEN channel_%d;" % int(channel))

  while True:
    select.select([conn], [], [])
    conn.poll()
    while conn.notifies:
      notify = conn.notifies.pop()
      yield "data: " + notify.payload + "\n\n"

@app.route("/message/<channel>", methods=['GET'])
def get_messages(channel):
  return flask.Response(stream_messages(channel),
             mimetype='text/event-stream')

if __name__ == "__main__":
  app.run()

这个应用程序非常简单并且只是为这个例子支持了流化。我们使用Flask将请求路由到GET/message/<channel>,一旦代码被调用,它将以mimetype为text/event-stream的格式进行响应,发回一个生成器函数而非一个字符串。Flask接下来将调用这个函数并在每次生成器生成东西时发送结果。

生成器stream_messages重用了之前写的用来监听PostgreSQL通知的代码。它接收信道Id作为参数,监听这个信道,并生成其有效载荷。记住,我们在触发器函数中用的是PostgreSQL的JSON编码函数,所以从PostgreSQL收到的就是JSON格式的数据,因为发送JSON数据给HTTP客户端没有任何问题,所以无需转换编码。

注意

为简单起见,这个示例应用程序被写在了一个单独的文件中。在一本书中描述一个横跨多个模块的例子有点儿困难。如果这是一个真正的应用程序,那么最好是将存储处理的实现放到一个自己的Python模块中。

现在可以运行这个服务器了:

$ python listen+http.py
 * Running on http://127.0.0.1:5000/

在另一个终端中,可以进行连接并在事件进入时对数据进行抽取。在连接时,不会接收收据并且连接保持开放状态。

$ curl -v http://127.0.0.1:5000/message/1
* About to connect() to 127.0.0.1 port 5000 (#0)
*  Trying 127.0.0.1...
* Adding handle: conn: 0x1d46e90
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x1d46e90) send_pipe: 1, recv_pipe: 0
* Connected to 127.0.0.1 (127.0.0.1) port 5000 (#0)
> GET /message/1 HTTP/1.1
> User-Agent: curl/
> Host: 127.0.0.1:5000
> Accept: */*
>

但一旦插入一些行到message表中时:

mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'hello world');
INSERT 0 1
mydatabase=> INSERT INTO message(channel, source, content)
mydatabase-> VALUES(1, 'jd', 'it works');
INSERT 0 1

终端上curl运行的位置就会有数据输出:

data: {"id":71,"channel":1,"source":"jd","content":"hello world"}

data: {"id":72,"channel":1,"source":"jd","content":"it works"}

关于这个应用程序的一个朴素的且可以说更轻便的实现3不是通过一个SELECT语句一次次查询是否有新数据插入表内。不过,没有必要在这里展示这样一个推送系统(push system),尽管它比持续地轮询数据库要效率高。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文