12.1 用 Flask 和 PostgreSQL 流化数据
前面一节讨论了掌握数据存储系统有多么重要。这里将展示如何用PostgreSQL的一个高级特性构造一个HTTP事件流系统。
这个小应用的目的是将消息存储在一个SQL表中并通过HTTP REST API提供对这些消息的访问。每个消息由一个整数类型的channel、一个字符串类型的source、一个字符串类型的content组成。创建这个表的代码非常简单,如示例12.1所示。
示例 12.1 创建message表
CREATE TABLE message ( id SERIAL PRIMARY KEY, channel INTEGER NOT NULL, source TEXT NOT NULL, content TEXT NOT NULL );
另外还需要做的是序列化这些消息,以便客户端能够实时对它们进行处理。这需要用到PostgreSQL的LISTEN(http://www.postgresql.org/docs/9.2/static/sql-listen.html)和NOTIFY(http://www.postgresql.org/docs/9.2/static/sql-notify.html)功能。这些功能可以监听来自函数的消息,这个函数由用户提供,由PostgreSQL执行,如示例12.2所示。
示例 12.2 notify_on_insert函数
CREATE OR REPLACE FUNCTION notify_on_insert() RETURNS trigger AS $$ BEGIN PERFORM pg_notify('channel_' || NEW.channel, CAST(row_to_json(NEW) AS TEXT)); RETURN NULL; END; $$ LANGUAGE plpgsql;
这会创建一个用pl/pgsql编写的触发器函数,pl/pgsql语言只有PostgreSQL可以理解。需要注意的是,这个函数也可以用其他语言编写,如Python本身,因为PostgreSQL是通过嵌入Python解释器支持pl/python语言的。
函数会执行一个对pg_notify的调用。这是实际发送通知的函数。第一个参数是一个代表一个信道的字符串,第二个参数是携带实际净荷(palyload)的字符串。这里根据channel列在行内的的值来动态定义信道。在这个例子中,净荷是以JSON格式表示的整个行。没错,PostgreSQL原生地就知道如何将行转换为JSON。
我们希望对message表的每一次INSERT操作都发送通知消息,所以需要在这样的事件上出发这个函数,如示例12.3所示。
示例 12.3 notify_on_insert的触发器
CREATE TRIGGER notify_on_message_insert AFTER INSERT ON message FOR EACH ROW EXECUTE PROCEDURE notify_on_insert();
搞定。这个函数已经插入并且在message表每一次INSERT操作成功后都会被 执行。
可以通过psql中的LISTEN操作检查它是否工作正常:
$ psql psql (9.3rc1) SSL connection (cipher: DHE-RSA-AES256-SHA, bits: 256) Type "help" for help. mydatabase=> LISTEN channel_1; LISTEN mydatabase=> INSERT INTO message(channel, source, content) mydatabase-> VALUES(1, 'jd', 'hello world'); INSERT 0 1 Asynchronous notification "channel_1" with payload "{"id":1,"channel":1,"source":"jd","content":"hello world"}" received from server process with PID 26393.
一旦行被插入,通知就被发送,并且可以通过PostgreSQL客户端进行接收。现在需要做的就是构建Python应用对这个事件进行流化(stream),如示例12.4所示。
示例 12.4 在 Python 中接收通知
import psycopg2 import psycopg2.extensions import select conn = psycopg2.connect(database='mydatabase', user='myuser', password='idkfa', host='localhost') conn.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) curs = conn.cursor() curs.execute("LISTEN channel_1;") while True: select.select([conn], [], []) conn.poll() while conn.notifies: notify = conn.notifies.pop() print("Got NOTIFY:", notify.pid, notify.channel, notify.payload)
上面的代码利用库psycopg2连接PostgreSQL。也可以使用一个提供了抽象层的库,如SQLAlchemy,但是它们都无法提供对PostgreSQLLISTEN/NOTIFY功能的访问。通过访问底层数据库连接去执行代码也是可能的,但是在这个例子中没必要那么做,因为这里并不需要任何ORM库提供的其他功能。
这个程序会在channel_1上进行监听。一旦收到通知则将其打印到屏幕上。如果运行这个程序并向message表中插入一行,则会得到如下输出:
$ python3 listen.py Got NOTIFY: 28797 channel_1 {"id":10,"channel":1,"source":"jd","content":"hello world"}
现在我们将使用Flask(http://flask.pocoo.org/),一个简单的HTTP微型框架,去构造应用程序。这里将使用由HTML52中定义的Server-Sent Events(http://www.w3.org/TR/2009/ WD-eventsource-20090423/)消息协议,如示例12.5所示。
示例 12.5 Flask 流化应用程序
import flask import psycopg2 import psycopg2.extensions import select app = flask.Flask(__name__) def stream_messages(channel): conn = psycopg2.connect(database='mydatabase', user='mydatabase', password='mydatabase', host='localhost') conn.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) curs = conn.cursor() curs.execute("LISTEN channel_%d;" % int(channel)) while True: select.select([conn], [], []) conn.poll() while conn.notifies: notify = conn.notifies.pop() yield "data: " + notify.payload + "\n\n" @app.route("/message/<channel>", methods=['GET']) def get_messages(channel): return flask.Response(stream_messages(channel), mimetype='text/event-stream') if __name__ == "__main__": app.run()
这个应用程序非常简单并且只是为这个例子支持了流化。我们使用Flask将请求路由到GET/message/<channel>,一旦代码被调用,它将以mimetype为text/event-stream的格式进行响应,发回一个生成器函数而非一个字符串。Flask接下来将调用这个函数并在每次生成器生成东西时发送结果。
生成器stream_messages重用了之前写的用来监听PostgreSQL通知的代码。它接收信道Id作为参数,监听这个信道,并生成其有效载荷。记住,我们在触发器函数中用的是PostgreSQL的JSON编码函数,所以从PostgreSQL收到的就是JSON格式的数据,因为发送JSON数据给HTTP客户端没有任何问题,所以无需转换编码。
注意
为简单起见,这个示例应用程序被写在了一个单独的文件中。在一本书中描述一个横跨多个模块的例子有点儿困难。如果这是一个真正的应用程序,那么最好是将存储处理的实现放到一个自己的Python模块中。
现在可以运行这个服务器了:
$ python listen+http.py * Running on http://127.0.0.1:5000/
在另一个终端中,可以进行连接并在事件进入时对数据进行抽取。在连接时,不会接收收据并且连接保持开放状态。
$ curl -v http://127.0.0.1:5000/message/1 * About to connect() to 127.0.0.1 port 5000 (#0) * Trying 127.0.0.1... * Adding handle: conn: 0x1d46e90 * Adding handle: send: 0 * Adding handle: recv: 0 * Curl_addHandleToPipeline: length: 1 * - Conn 0 (0x1d46e90) send_pipe: 1, recv_pipe: 0 * Connected to 127.0.0.1 (127.0.0.1) port 5000 (#0) > GET /message/1 HTTP/1.1 > User-Agent: curl/ > Host: 127.0.0.1:5000 > Accept: */* >
但一旦插入一些行到message表中时:
mydatabase=> INSERT INTO message(channel, source, content) mydatabase-> VALUES(1, 'jd', 'hello world'); INSERT 0 1 mydatabase=> INSERT INTO message(channel, source, content) mydatabase-> VALUES(1, 'jd', 'it works'); INSERT 0 1
终端上curl运行的位置就会有数据输出:
data: {"id":71,"channel":1,"source":"jd","content":"hello world"} data: {"id":72,"channel":1,"source":"jd","content":"it works"}
关于这个应用程序的一个朴素的且可以说更轻便的实现3不是通过一个SELECT语句一次次查询是否有新数据插入表内。不过,没有必要在这里展示这样一个推送系统(push system),尽管它比持续地轮询数据库要效率高。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论