返回介绍

01. Python 工具

02. Python 基础

03. Numpy

04. Scipy

05. Python 进阶

06. Matplotlib

07. 使用其他语言进行扩展

08. 面向对象编程

09. Theano 基础

10. 有趣的第三方模块

11. 有用的工具

12. Pandas

Theano 循环:scan(详解)

发布于 2022-09-03 20:46:15 字数 28430 浏览 0 评论 0 收藏 0

In [1]:

import theano, time
import theano.tensor as T
import numpy as np

def floatX(X):
    return np.asarray(X, dtype=theano.config.floatX)
Using gpu device 1: Tesla C2075 (CNMeM is disabled)

theano 中可以使用 scan 进行循环,常用的 mapreduce 操作都可以看成是 scan 的特例。

scan 通常作用在一个序列上,每次处理一个输入,并输出一个结果。

sum(x) 函数可以看成是 z + x(i) 函数在给定 z = 0 的情况下,对 x 的一个 scan

通常我们可以将一个 for 循环表示成一个 scan 操作,其好处如下:

  • 迭代次数成为符号图结构的一部分
  • 最小化 GPU 数据传递
  • 序列化梯度计算
  • 速度比 for 稍微快一些
  • 降低内存使用

scan 的使用

函数的用法如下:

theano.scan(fn, 
            sequences=None, 
            outputs_info=None, 
            non_sequences=None, 
            n_steps=None, 
            truncate_gradient=-1, 
            go_backwards=False, 
            mode=None, 
            name=None, 
            profile=False, 
            allow_gc=None, 
            strict=False)

主要参数的含义:

  • fn
    • 一步 scan 所进行的操作
  • sequences
    • 输入的序列
  • outputs_info
    • 前一步输出结果的初始状态
  • non_sequences
    • 非序列参数
  • n_steps
    • 迭代步数
  • go_backwards
    • 是否从后向前遍历

输出为一个元组 (outputs, updates)

  • outputs
    • 从初始状态开始,每一步 fn 的输出结果
  • updates
    • 一个字典,用来记录 scan 过程中用到的共享变量更新规则,构造函数的时候,如果需要更新共享变量,将这个变量当作 updates 的参数传入。

scan 和 map

这里实现一个简单的 map 操作,将向量 $\mathbf x$ 中的所有元素变成原来的两倍:

map(lambda t: t * 2, x)

In [2]:

x = T.vector()

results, _ = theano.scan(fn = lambda t: t * 2,
                         sequences = x)
x_double_scan = theano.function([x], results)

print x_double_scan(range(10))
[  0\.   2\.   4\.   6\.   8\.  10\.  12\.  14\.  16\.  18.]

之前我们说到,theano 中的 mapscan 的一个特例,因此 theano.map 的用法其实跟 theano.scan 十分类似。

由于不需要考虑前一步的输出结果,所以 theano.map 的参数中没有 outputs_info 这一部分。

我们用 theano.map 实现相同的效果:

In [3]:

result, _ = theano.map(fn = lambda t: t * 2,
                       sequences = x)
x_double_map = theano.function([x], result)

print x_double_map(range(10))
[  0\.   2\.   4\.   6\.   8\.  10\.  12\.  14\.  16\.  18.]

scan 和 reduce

这里一个简单的 reduce 操作,求和:

reduce(lambda a, b: a + b, x)

In [4]:

result, _ = theano.scan(fn = lambda t, v: t + v,
                        sequences = x,
                        outputs_info = floatX(0.))

# 因为每一步的输出值都会被记录到最后的 result 中,所以最后的和是 result 的最后一个元素。
x_sum_scan = theano.function([x], result[-1])

# 计算 1 + 2 + ... + 10
print x_sum_scan(range(10))
45.0

theano.reduce 也是 scan 的一个特例,使用 theano.reduce 实现相同的效果:

In [5]:

result, _ = theano.reduce(fn = lambda t, v: t + v,
                          sequences = x,
                          outputs_info = 0.)

x_sum_reduce = theano.function([x], result)

# 计算 1 + 2 + ... + 10
print x_sum_reduce(range(10))
45.0

reducescan 不同的地方在于,result 包含的内容并不是每次输出的结果,而是最后一次输出的结果。

scan 的使用

输入与输出

fn 是一个函数句柄,对于这个函数句柄,它每一步接受的参数是由 sequences, outputs_info, non_sequence 这三个参数所决定的,并且按照以下的顺序排列:

  • sequences 中第一个序列的值
  • ...
  • sequences 中最后一个序列的值
  • outputs_info 中第一个输出之前的值
  • ...
  • outputs_info 中最后一个输出之前的值
  • non_squences 中的参数

这些序列的顺序与在参数 sequences, outputs_info 中指定的顺序相同。

默认情况下,在第 k 次迭代时,如果 sequencesoutputs_info 中给定的值不是字典(dictionary)或者一个字典列表(list of dictionaries),那么

  • sequences 中的序列 seq 传入 fn 的是 seq[k] 的值
  • outputs_info 中的序列 output 传入 fn 的是 output[k-1] 的值

fn 的返回值有两部分 (outputs_list, update_dictionary),第一部分将作为序列,传入 outputs 中,与 outputs_info 中的初始输入值的维度一致(如果没有给定 outputs_info ,输出值可以任意。)

第二部分则是更新规则的字典,告诉我们如何对 scan 中使用到的一些共享的变量进行更新:

return [y1_t, y2_t], {x:x+1}

这两部分可以任意,即顺序既可以是 (outputs_list, update_dictionary), 也可以是 (update_dictionary, outputs_list)theano 会根据类型自动识别。

两部分只需要有一个存在即可,另一个可以为空。

例子分析

例如,在我们的第一个例子中

theano.scan(fn = lambda t: t * 2,
            sequences = x)

在第 k 次迭代的时候,传入参数 t 的值为 x[k]

再如,在我们的第二个例子中:

theano.scan(fn = lambda t, v: t + v,
            sequences = x,
            outputs_info = floatX(0.))

fn 接受了两个参数,初始迭代时,按照规则,t 接受的参数为 x[0]v 接受的参数为我们传入 outputs_info 的第一个初始值即 0 (认为是 outputs[-1]),他们的结果 t+v 将作为 outputs[0] 的值传入下一次迭代以及最终 scan 输出的 outputs 值中。

输入多个序列

我们可以一次输入多个序列,这些序列会按照顺序传入 fn 的参数中,例如计算多项式 $$ \sum_{n=0}^N a_n x^ n $$ 时,我们可以将多项式的系数和幂数两个序列放到一个 list 中作为输入参数:

In [6]:

# 变量 x
x = T.scalar("x")

# 不为 0 的系数
A = T.vectors("A")

# 对应的幂数
N = T.ivectors("N")

# a 对应的是 A, n 对应 N,v 对应 x
components, _ = theano.scan(fn = lambda a, n, v: a * (v ** n),
                            sequences = [A, N],
                            non_sequences = x)

result = components.sum()

polynomial = theano.function([x, A, N], result)

# 计算 1 + 3 * 10 ^ 2 + 2 * 10^3 = 2301
print polynomial(floatX(10), 
                 floatX([1, 3, 2]),
                 [0, 2, 3])
2301.0

使用序列的多个值

默认情况下,我们只能使用输入序列的当前时刻的值,以及前一个输出的输出值。

事实上,theano 会将参数中的序列变成一个有 inputtaps 两个键值的 dict

  • input:输入的序列
  • taps:要传入 fn 的值的列表
    • 对于 sequences 参数中的序列来说,默认值为 [0],表示时间 t 传入 t+0 时刻的序列值,可以为正,可以为负。
    • 对于 outputs_info 参数中的序列来说,默认值为 [-1],表示时间 t 传入 t-1 时刻的序列值,只能为负值,如果值为 None,表示这个输出结果不会作为参数传入 fn 中。

传入 fn 的参数也会按照 taps 中的顺序来排列,我们考虑下面这个例子:

scan(fn, sequences = [ dict(input= Sequence1, taps = [-3,2,-1])
                     , Sequence2
                     , dict(input =  Sequence3, taps = 3) ]
       , outputs_info = [ dict(initial =  Output1, taps = [-3,-5])
                        , dict(initial = Output2, taps = None)
                        , Output3 ]
       , non_sequences = [ Argument1, Argument2])

首先是 Sequence1[-3, 2, -1] 被传入,然后 Sequence2 不是 dict, 所以传入默认值 [0]Sequence3 传入的参数是 3,所以 fn 在第 t 步接受的前几个参数是:

Sequence1[t-3]
Sequence1[t+2]
Sequence1[t-1]
Sequence2[t]
Sequence3[t+3]

然后 Output1 传入的是 [-3, -5]传入的初始值的形状应为 shape (5,)+),Output2 不作为参数传入,Output3 传入的是 [-1],所以接下的参数是:

Output1[t-3]
Output1[t-5]
Output3[t-1]
Argument1
Argument2

总的说来上面的例子中,fn 函数按照以下顺序最多接受这样 10 个参数:

Sequence1[t-3]
Sequence1[t+2]
Sequence1[t-1]
Sequence2[t]
Sequence3[t+3]
Output1[t-3]
Output1[t-5]
Output3[t-1]
Argument1
Argument2

例子,假设 $x$ 是我们的输入,$y$ 是我们的输出,我们需要计算 $y(t) = tanh\left[W{1} y(t-1) + W{2} x(t) + W_{3} x(t-1)\right]$ 的值:

In [7]:

X = T.matrix("X")
Y = T.vector("y")

W_1 = T.matrix("W_1")
W_2 = T.matrix("W_2")
W_3 = T.matrix("W_3")

# W_yy 和 W_xy 作为不变的参数可以直接使用
results, _ = theano.scan(fn = lambda x, x_pre, y: T.tanh(T.dot(W_1, y) + T.dot(W_2, x) + T.dot(W_3, x_pre)), 
                         # 0 对应 x,-1 对应 x_pre
                         sequences = dict(input=X, taps=[0, -1]), 
                         outputs_info = Y)

Y_seq = theano.function(inputs = [X, Y, W_1, W_2, W_3], 
                        outputs = results)

测试小矩阵计算:

In [8]:

# 测试
t = 1001
x_dim = 10
y_dim = 20

x = 2 * floatX(np.random.random([t, x_dim])) - 1
y = 2 * floatX(np.zeros(y_dim)) - 1
w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1
w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1

tic = time.time()

y_res_theano = Y_seq(x, y, w_1, w_2, w_3)

print "theano running time {:.4f} s".format(time.time() - tic)

tic = time.time()
# 与 numpy 的结果进行比较:
y_res_numpy = np.zeros([t, y_dim])
y_res_numpy[0] = y

for i in range(1, t):
    y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1]))

print "numpy  running time {:.4f} s".format(time.time() - tic)

# 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算
print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[0:10] - y_res_numpy[1:11]))
theano running time 0.0537 s
numpy  running time 0.0197 s
the max difference of the first 10 results is 1.25780650354e-06

测试大矩阵运算:

In [9]:

# 测试
t = 1001
x_dim = 100
y_dim = 200

x = 2 * floatX(np.random.random([t, x_dim])) - 1
y = 2 * floatX(np.zeros(y_dim)) - 1
w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1
w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1

tic = time.time()

y_res_theano = Y_seq(x, y, w_1, w_2, w_3)

print "theano running time {:.4f} s".format(time.time() - tic)

tic = time.time()
# 与 numpy 的结果进行比较:
y_res_numpy = np.zeros([t, y_dim])
y_res_numpy[0] = y

for i in range(1, t):
    y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1]))

print "numpy  running time {:.4f} s".format(time.time() - tic)

# 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算
print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[:10] - y_res_numpy[1:11]))
theano running time 0.0754 s
numpy  running time 0.1334 s
the max difference of the first 10 results is 0.000656997077348

值得注意的是,由于 theanonumpy 在某些计算的实现上存在一定的差异,随着序列长度的增加,这些差异将被放大:

In [10]:

for i in xrange(20):
    print "iter {:03d}, max diff:{:.6f}".format(i + 1, 
                                                np.max(np.abs(y_res_numpy[i + 1,:] - y_res_theano[i,:])))
iter 001, max diff:0.000002
iter 002, max diff:0.000005
iter 003, max diff:0.000007
iter 004, max diff:0.000010
iter 005, max diff:0.000024
iter 006, max diff:0.000049
iter 007, max diff:0.000113
iter 008, max diff:0.000145
iter 009, max diff:0.000334
iter 010, max diff:0.000657
iter 011, max diff:0.001195
iter 012, max diff:0.002778
iter 013, max diff:0.004561
iter 014, max diff:0.004748
iter 015, max diff:0.014849
iter 016, max diff:0.012696
iter 017, max diff:0.043639
iter 018, max diff:0.046540
iter 019, max diff:0.083032
iter 020, max diff:0.123678

控制循环次数

假设我们要计算方阵$A$的$A^k$,$k$ 是一个未知变量,我们可以这样通过 n_steps 参数来控制循环计算的次数:

In [11]:

A = T.matrix("A")
k = T.iscalar("k")

results, _ = theano.scan(fn = lambda P, A: P.dot(A),
                         # 初始值设为单位矩阵
                         outputs_info = T.eye(A.shape[0]),
                         # 乘 k 次
                         non_sequences = A,
                         n_steps = k)

A_k = theano.function(inputs = [A, k], outputs = results[-1])

test_a = floatX([[2, -2], [-1, 2]])

print A_k(test_a, 10)

# 使用 numpy 进行验证
a_k = np.eye(2)
for i in range(10):
    a_k = a_k.dot(test_a)

print a_k
[[ 107616\. -152192.]
 [ -76096\.  107616.]]
[[ 107616\. -152192.]
 [ -76096\.  107616.]]

使用共享变量

可以在 scan 中使用并更新共享变量,例如,利用共享变量 n,我们可以实现这样一个迭代 k 步的简单计数器:

In [12]:

n = theano.shared(floatX(0))
k = T.iscalar("k")

# 这里 lambda 的返回值是一个 dict,因此这个值会被传入 updates 中
_, updates = theano.scan(fn = lambda n: {n:n+1},
                         non_sequences = n,
                         n_steps = k)

counter = theano.function(inputs = [k], 
                          outputs = [],
                          updates = updates)

print n.get_value()
counter(10)
print n.get_value()
counter(10)
print n.get_value()
0.0
10.0
20.0

之前说到,fn 函数的返回值应该是 (outputs_list, update_dictionary) 或者 (update_dictionary, outputs_list) 或者两者之一。

这里 fn 函数返回的是一个字典,因此自动被放入了 update_dictionary 中,然后传入 functionupdates 参数中进行迭代。

使用条件语句结束循环

我们可以将 scan 设计为 loop-until 的模式,具体方法是在 scan 中,将 fn 的返回值增加一个参数,使用 theano.scan_module 来设置停止条件。

假设我们要计算所有不小于某个值的 2 的幂,我们可以这样定义:

In [13]:

max_value = T.scalar()

results, _ = theano.scan(fn = lambda v_pre, max_v: (v_pre * 2, theano.scan_module.until(v_pre * 2 > max_v)), 
                         outputs_info = T.constant(1.),
                         non_sequences = max_value,
                         n_steps = 1000)

# 注意,这里不能取 results 的全部
# 例如在输入值为 40 时,最后的输出可以看成 (64, False)
# scan 发现停止条件满足,停止循环,但是不影响 64 被输出到 results 中,因此要将 64 去掉
power_of_2 = theano.function(inputs = [max_value], outputs = results[:-1])

print power_of_2(40)
[  2\.   4\.   8\.  16\.  32.]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文