# Theano 循环:scan(详解) In [1]: ```py import theano, time import theano.tensor as T import numpy as np def floatX(X): return np.asarray(X, dtype=theano.config.floatX) ``` ```py Using gpu device 1: Tesla C2075 (CNMeM is disabled) ``` `theano` 中可以使用 `scan` 进行循环,常用的 `map` 和 `reduce` 操作都可以看成是 `scan` 的特例。 `scan` 通常作用在一个序列上,每次处理一个输入,并输出一个结果。 `sum(x)` 函数可以看成是 `z + x(i)` 函数在给定 `z = 0` 的情况下,对 `x` 的一个 `scan`。 通常我们可以将一个 `for` 循环表示成一个 `scan` 操作,其好处如下: * 迭代次数成为符号图结构的一部分 * 最小化 GPU 数据传递 * 序列化梯度计算 * 速度比 `for` 稍微快一些 * 降低内存使用 ## scan 的使用 函数的用法如下: ```py theano.scan(fn, sequences=None, outputs_info=None, non_sequences=None, n_steps=None, truncate_gradient=-1, go_backwards=False, mode=None, name=None, profile=False, allow_gc=None, strict=False) ``` 主要参数的含义: * `fn` * 一步 `scan` 所进行的操作 * `sequences` * 输入的序列 * `outputs_info` * 前一步输出结果的初始状态 * `non_sequences` * 非序列参数 * `n_steps` * 迭代步数 * `go_backwards` * 是否从后向前遍历 输出为一个元组 `(outputs, updates)`: * `outputs` * 从初始状态开始,每一步 `fn` 的输出结果 * `updates` * 一个字典,用来记录 `scan` 过程中用到的共享变量更新规则,构造函数的时候,如果需要更新共享变量,将这个变量当作 `updates` 的参数传入。 ## scan 和 map 这里实现一个简单的 `map` 操作,将向量 $\mathbf x$ 中的所有元素变成原来的两倍: ```py map(lambda t: t * 2, x) ``` In [2]: ```py x = T.vector() results, _ = theano.scan(fn = lambda t: t * 2, sequences = x) x_double_scan = theano.function([x], results) print x_double_scan(range(10)) ``` ```py [ 0\. 2\. 4\. 6\. 8\. 10\. 12\. 14\. 16\. 18.] ``` 之前我们说到,`theano` 中的 `map` 是 `scan` 的一个特例,因此 `theano.map` 的用法其实跟 `theano.scan` 十分类似。 由于不需要考虑前一步的输出结果,所以 `theano.map` 的参数中没有 `outputs_info` 这一部分。 我们用 `theano.map` 实现相同的效果: In [3]: ```py result, _ = theano.map(fn = lambda t: t * 2, sequences = x) x_double_map = theano.function([x], result) print x_double_map(range(10)) ``` ```py [ 0\. 2\. 4\. 6\. 8\. 10\. 12\. 14\. 16\. 18.] ``` ## scan 和 reduce 这里一个简单的 `reduce` 操作,求和: ```py reduce(lambda a, b: a + b, x) ``` In [4]: ```py result, _ = theano.scan(fn = lambda t, v: t + v, sequences = x, outputs_info = floatX(0.)) # 因为每一步的输出值都会被记录到最后的 result 中,所以最后的和是 result 的最后一个元素。 x_sum_scan = theano.function([x], result[-1]) # 计算 1 + 2 + ... + 10 print x_sum_scan(range(10)) ``` ```py 45.0 ``` `theano.reduce` 也是 `scan` 的一个特例,使用 `theano.reduce` 实现相同的效果: In [5]: ```py result, _ = theano.reduce(fn = lambda t, v: t + v, sequences = x, outputs_info = 0.) x_sum_reduce = theano.function([x], result) # 计算 1 + 2 + ... + 10 print x_sum_reduce(range(10)) ``` ```py 45.0 ``` `reduce` 与 `scan` 不同的地方在于,`result` 包含的内容并不是每次输出的结果,而是最后一次输出的结果。 ## scan 的使用 ### 输入与输出 `fn` 是一个函数句柄,对于这个函数句柄,它每一步接受的参数是由 `sequences, outputs_info, non_sequence` 这三个参数所决定的,并且按照以下的顺序排列: * `sequences` 中第一个序列的值 * ... * `sequences` 中最后一个序列的值 * `outputs_info` 中第一个输出之前的值 * ... * `outputs_info` 中最后一个输出之前的值 * `non_squences` 中的参数 这些序列的顺序与在参数 `sequences, outputs_info` 中指定的顺序相同。 默认情况下,在第 `k` 次迭代时,如果 `sequences` 和 `outputs_info` 中给定的值不是字典(`dictionary`)或者一个字典列表(`list of dictionaries`),那么 * `sequences` 中的序列 `seq` 传入 `fn` 的是 `seq[k]` 的值 * `outputs_info` 中的序列 `output` 传入 `fn` 的是 `output[k-1]` 的值 `fn` 的返回值有两部分 `(outputs_list, update_dictionary)`,第一部分将作为序列,传入 `outputs` 中,与 `outputs_info` 中的**初始输入值的维度一致**(如果没有给定 `outputs_info` ,输出值可以任意。) 第二部分则是更新规则的字典,告诉我们如何对 `scan` 中使用到的一些共享的变量进行更新: ```py return [y1_t, y2_t], {x:x+1} ``` 这两部分可以任意,即顺序既可以是 `(outputs_list, update_dictionary)`, 也可以是 `(update_dictionary, outputs_list)`,`theano` 会根据类型自动识别。 两部分只需要有一个存在即可,另一个可以为空。 ### 例子分析 例如,在我们的第一个例子中 ```py theano.scan(fn = lambda t: t * 2, sequences = x) ``` 在第 `k` 次迭代的时候,传入参数 `t` 的值为 `x[k]`。 再如,在我们的第二个例子中: ```py theano.scan(fn = lambda t, v: t + v, sequences = x, outputs_info = floatX(0.)) ``` `fn` 接受了两个参数,初始迭代时,按照规则,`t` 接受的参数为 `x[0]`,`v` 接受的参数为我们传入 `outputs_info` 的第一个初始值即 `0` (认为是 `outputs[-1]`),他们的结果 `t+v` 将作为 `outputs[0]` 的值传入下一次迭代以及最终 `scan` 输出的 `outputs` 值中。 ### 输入多个序列 我们可以一次输入多个序列,这些序列会按照顺序传入 fn 的参数中,例如计算多项式 $$ \sum_{n=0}^N a_n x^ n $$ 时,我们可以将多项式的系数和幂数两个序列放到一个 `list` 中作为输入参数: In [6]: ```py # 变量 x x = T.scalar("x") # 不为 0 的系数 A = T.vectors("A") # 对应的幂数 N = T.ivectors("N") # a 对应的是 A, n 对应 N,v 对应 x components, _ = theano.scan(fn = lambda a, n, v: a * (v ** n), sequences = [A, N], non_sequences = x) result = components.sum() polynomial = theano.function([x, A, N], result) # 计算 1 + 3 * 10 ^ 2 + 2 * 10^3 = 2301 print polynomial(floatX(10), floatX([1, 3, 2]), [0, 2, 3]) ``` ```py 2301.0 ``` ### 使用序列的多个值 默认情况下,我们只能使用输入序列的当前时刻的值,以及前一个输出的输出值。 事实上,`theano` 会将参数中的序列变成一个有 `input` 和 `taps` 两个键值的 `dict`: * `input`:输入的序列 * `taps`:要传入 `fn` 的值的列表 * 对于 `sequences` 参数中的序列来说,默认值为 [0],表示时间 `t` 传入 `t+0` 时刻的序列值,可以为正,可以为负。 * 对于 `outputs_info` 参数中的序列来说,默认值为 [-1],表示时间 `t` 传入 `t-1` 时刻的序列值,只能为负值,如果值为 `None`,表示这个输出结果不会作为参数传入 `fn` 中。 传入 `fn` 的参数也会按照 `taps` 中的顺序来排列,我们考虑下面这个例子: ```py scan(fn, sequences = [ dict(input= Sequence1, taps = [-3,2,-1]) , Sequence2 , dict(input = Sequence3, taps = 3) ] , outputs_info = [ dict(initial = Output1, taps = [-3,-5]) , dict(initial = Output2, taps = None) , Output3 ] , non_sequences = [ Argument1, Argument2]) ``` 首先是 `Sequence1` 的 `[-3, 2, -1]` 被传入,然后 `Sequence2` 不是 `dict`, 所以传入默认值 `[0]`,`Sequence3` 传入的参数是 `3`,所以 `fn` 在第 `t` 步接受的前几个参数是: ```py Sequence1[t-3] Sequence1[t+2] Sequence1[t-1] Sequence2[t] Sequence3[t+3] ``` 然后 `Output1` 传入的是 `[-3, -5]`(**传入的初始值的形状应为 `shape (5,)+`**),`Output2` 不作为参数传入,`Output3` 传入的是 `[-1]`,所以接下的参数是: ```py Output1[t-3] Output1[t-5] Output3[t-1] Argument1 Argument2 ``` 总的说来上面的例子中,`fn` 函数按照以下顺序最多接受这样 10 个参数: ```py Sequence1[t-3] Sequence1[t+2] Sequence1[t-1] Sequence2[t] Sequence3[t+3] Output1[t-3] Output1[t-5] Output3[t-1] Argument1 Argument2 ``` 例子,假设 $x$ 是我们的输入,$y$ 是我们的输出,我们需要计算 $y(t) = tanh\left[W_{1} y(t-1) + W_{2} x(t) + W_{3} x(t-1)\right]$ 的值: In [7]: ```py X = T.matrix("X") Y = T.vector("y") W_1 = T.matrix("W_1") W_2 = T.matrix("W_2") W_3 = T.matrix("W_3") # W_yy 和 W_xy 作为不变的参数可以直接使用 results, _ = theano.scan(fn = lambda x, x_pre, y: T.tanh(T.dot(W_1, y) + T.dot(W_2, x) + T.dot(W_3, x_pre)), # 0 对应 x,-1 对应 x_pre sequences = dict(input=X, taps=[0, -1]), outputs_info = Y) Y_seq = theano.function(inputs = [X, Y, W_1, W_2, W_3], outputs = results) ``` 测试小矩阵计算: In [8]: ```py # 测试 t = 1001 x_dim = 10 y_dim = 20 x = 2 * floatX(np.random.random([t, x_dim])) - 1 y = 2 * floatX(np.zeros(y_dim)) - 1 w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1 w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 tic = time.time() y_res_theano = Y_seq(x, y, w_1, w_2, w_3) print "theano running time {:.4f} s".format(time.time() - tic) tic = time.time() # 与 numpy 的结果进行比较: y_res_numpy = np.zeros([t, y_dim]) y_res_numpy[0] = y for i in range(1, t): y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1])) print "numpy running time {:.4f} s".format(time.time() - tic) # 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算 print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[0:10] - y_res_numpy[1:11])) ``` ```py theano running time 0.0537 s numpy running time 0.0197 s the max difference of the first 10 results is 1.25780650354e-06 ``` 测试大矩阵运算: In [9]: ```py # 测试 t = 1001 x_dim = 100 y_dim = 200 x = 2 * floatX(np.random.random([t, x_dim])) - 1 y = 2 * floatX(np.zeros(y_dim)) - 1 w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1 w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 tic = time.time() y_res_theano = Y_seq(x, y, w_1, w_2, w_3) print "theano running time {:.4f} s".format(time.time() - tic) tic = time.time() # 与 numpy 的结果进行比较: y_res_numpy = np.zeros([t, y_dim]) y_res_numpy[0] = y for i in range(1, t): y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1])) print "numpy running time {:.4f} s".format(time.time() - tic) # 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算 print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[:10] - y_res_numpy[1:11])) ``` ```py theano running time 0.0754 s numpy running time 0.1334 s the max difference of the first 10 results is 0.000656997077348 ``` 值得注意的是,由于 `theano` 和 `numpy` 在某些计算的实现上存在一定的差异,随着序列长度的增加,这些差异将被放大: In [10]: ```py for i in xrange(20): print "iter {:03d}, max diff:{:.6f}".format(i + 1, np.max(np.abs(y_res_numpy[i + 1,:] - y_res_theano[i,:]))) ``` ```py iter 001, max diff:0.000002 iter 002, max diff:0.000005 iter 003, max diff:0.000007 iter 004, max diff:0.000010 iter 005, max diff:0.000024 iter 006, max diff:0.000049 iter 007, max diff:0.000113 iter 008, max diff:0.000145 iter 009, max diff:0.000334 iter 010, max diff:0.000657 iter 011, max diff:0.001195 iter 012, max diff:0.002778 iter 013, max diff:0.004561 iter 014, max diff:0.004748 iter 015, max diff:0.014849 iter 016, max diff:0.012696 iter 017, max diff:0.043639 iter 018, max diff:0.046540 iter 019, max diff:0.083032 iter 020, max diff:0.123678 ``` ### 控制循环次数 假设我们要计算方阵$A$的$A^k$,$k$ 是一个未知变量,我们可以这样通过 `n_steps` 参数来控制循环计算的次数: In [11]: ```py A = T.matrix("A") k = T.iscalar("k") results, _ = theano.scan(fn = lambda P, A: P.dot(A), # 初始值设为单位矩阵 outputs_info = T.eye(A.shape[0]), # 乘 k 次 non_sequences = A, n_steps = k) A_k = theano.function(inputs = [A, k], outputs = results[-1]) test_a = floatX([[2, -2], [-1, 2]]) print A_k(test_a, 10) # 使用 numpy 进行验证 a_k = np.eye(2) for i in range(10): a_k = a_k.dot(test_a) print a_k ``` ```py [[ 107616\. -152192.] [ -76096\. 107616.]] [[ 107616\. -152192.] [ -76096\. 107616.]] ``` ### 使用共享变量 可以在 `scan` 中使用并更新共享变量,例如,利用共享变量 `n`,我们可以实现这样一个迭代 `k` 步的简单计数器: In [12]: ```py n = theano.shared(floatX(0)) k = T.iscalar("k") # 这里 lambda 的返回值是一个 dict,因此这个值会被传入 updates 中 _, updates = theano.scan(fn = lambda n: {n:n+1}, non_sequences = n, n_steps = k) counter = theano.function(inputs = [k], outputs = [], updates = updates) print n.get_value() counter(10) print n.get_value() counter(10) print n.get_value() ``` ```py 0.0 10.0 20.0 ``` 之前说到,`fn` 函数的返回值应该是 `(outputs_list, update_dictionary)` 或者 `(update_dictionary, outputs_list)` 或者两者之一。 这里 `fn` 函数返回的是一个字典,因此自动被放入了 `update_dictionary` 中,然后传入 `function` 的 `updates` 参数中进行迭代。 ### 使用条件语句结束循环 我们可以将 `scan` 设计为 `loop-until` 的模式,具体方法是在 `scan` 中,将 `fn` 的返回值增加一个参数,使用 `theano.scan_module` 来设置停止条件。 假设我们要计算所有不小于某个值的 2 的幂,我们可以这样定义: In [13]: ```py max_value = T.scalar() results, _ = theano.scan(fn = lambda v_pre, max_v: (v_pre * 2, theano.scan_module.until(v_pre * 2 > max_v)), outputs_info = T.constant(1.), non_sequences = max_value, n_steps = 1000) # 注意,这里不能取 results 的全部 # 例如在输入值为 40 时,最后的输出可以看成 (64, False) # scan 发现停止条件满足,停止循环,但是不影响 64 被输出到 results 中,因此要将 64 去掉 power_of_2 = theano.function(inputs = [max_value], outputs = results[:-1]) print power_of_2(40) ``` ```py [ 2\. 4\. 8\. 16\. 32.] ```