使用 BigQuery 和 TensorFlow 进行需求预测
在本 notebook 中,我们将开发一个机器学习模型,用于预测对纽约的出租汽车需求。
要开发该模型,我们将需要获得出租汽车使用的历史数据。该数据存在于 BigQuery 中。让我们通过查看模式开始吧。
In [25]:
import gcp.bigquery as bq
import pandas as pd
import numpy as np
In [26]:
%%bigquery schema --table "nyc-tlc:green.trips_2015"
分析出租车请求
让我们拉取在 2015 年数据集中的每天出游数我们可以使用 BigQuery 的内建 Date-and-time 函数 DAYOFYEAR (见 BigQuery 查询参考 以获得这样的函数的完整列表)。
In [27]:
%%sql
SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_2015] LIMIT 10
Out[27]:
daynumber
120
120
120
120
149
150
150
150
89
89
(rows: 10, time: 1.6s, cached, job: job_awmH7VUQHbnwGhY2Tsa3ElMwsvk)
模块化查询和 Pandas dataframe
让我们使用出游数的总数作为出租车请求的代理(其他合理的替代方案是总 trip_distance 或者总 fare_amount)。可以使用 Tensorflow 预测多个变量,但是为了简单起见,我们将坚持只预测出游数。
我们将把我们的查询命名为'taxiquery',并让他使用一个输入变量'$YEAR'。然后,可以通过提供一个 YEAR 来调用'taxiquery'。to_dataframe() 将 BigQuery 结果转换成一个 Pandas dataframe。
In [28]:
%%sql --module taxiquery
SELECT daynumber, COUNT(*) AS numtrips FROM
(SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_$YEAR])
GROUP BY daynumber ORDER BY daynumber
In [29]:
trips = bq.Query(taxiquery, YEAR=2015).to_dataframe()
trips[:5]
Out[29]:
daynumber | numtrips | |
---|---|---|
0 | 1 | 62943 |
1 | 2 | 43410 |
2 | 3 | 53866 |
3 | 4 | 41602 |
4 | 5 | 41923 |
基准
通常情况下,某事物的合理估计是其历史平均水平。因此,我们可以将我们的机器学习模型与历史平均水平进行比较。
In [30]:
avg = np.mean(trips['numtrips'])
print 'Just using average={0} has RMSE of {1}'.format(avg, np.sqrt(np.mean((trips['numtrips'] - avg)**2)))
Just using average=54674.0994475 has RMSE of 10163.4654442
这里的平均值约为 55,000,而在这种情况下的根均方误差(RMSE) 约为 10,000。换句话说,如果我们估计在任意一天中都有 55,000 次出租车出游,那么在任一方向将平均有约为 10,000 的误差。
让我们看看是否可以做的比这更好 —— 我们的目的是进行出租车请求预测,使得 RMSE 少于 10,000。
什么样的因素影响了人们对出租车的使用?
天气数据
我们怀疑天气影响人们使用出租车的频率。也许在极冷或阴雨的情况下,那些通常走路去上班会乘坐出租车。
Google 用户 Felipe Hoffa 已经公开他从美国国家海洋和大气管理局做出的气象观测到 BigQuery 中。让我们使用该数据集,并找到对应纽约 La Guardia 机场的站号。
In [31]:
%%sql
SELECT * FROM [fh-bigquery:weather_gsod.stations]
WHERE state = 'NY' AND wban != '99999' AND name contains 'LA GUARDIA'
Out[31]:
usaf | wban | name | country | fips | state | call | lat | lon | elev | begin | end |
---|---|---|---|---|---|---|---|---|---|---|---|
725030 | 14732 | NEW YORK/LA GUARDIA | US | US | NY | KLGA | 40779.0 | -73880.0 | 94 | ||
19350222 | 20120619 |
(rows: 1, time: 0.9s, cached, job: job_UpFXBnE1mCNSKcjG9VzAGXRsuZc)
变量
让我们拉取 La Guardia 机场的最低和最高气温(华氏温度),以及降雨量(英寸)。
In [32]:
%%sql --module wxquery
SELECT DAYOFYEAR(TIMESTAMP('$YEAR'+mo+da)) daynumber,
FIRST(DAYOFWEEK(TIMESTAMP('$YEAR'+mo+da))) dayofweek,
MIN(min) mintemp, MAX(max) maxtemp, MAX(IF(prcp=99.99,0,prcp)) rain
FROM [fh-bigquery:weather_gsod.gsod$YEAR]
WHERE stn='725030' GROUP BY 1 ORDER BY daynumber DESC
In [33]:
weather = bq.Query(wxquery, YEAR=2015).to_dataframe()
weather[:5]
Out[33]:
daynumber | dayofweek | mintemp | maxtemp | rain | |
---|---|---|---|---|---|
0 | 365 | 5 | 46.0 | 48.2 | 0.17 |
1 | 364 | 4 | 34.0 | 48.0 | 0.13 |
2 | 363 | 3 | 33.8 | 46.9 | 0.37 |
3 | 362 | 2 | 39.0 | 62.1 | 0.02 |
4 | 361 | 1 | 46.0 | 62.6 | 0.14 |
合并数据集
使用 Pandas,按天合并出租车和气象数据集。
In [34]:
data = pd.merge(weather, trips, on='daynumber')
data[:5]
Out[34]:
daynumber | dayofweek | mintemp | maxtemp | rain | numtrips | |
---|---|---|---|---|---|---|
0 | 181 | 3 | 64.0 | 82.0 | 0.00 | 42978 |
1 | 180 | 2 | 62.1 | 78.1 | 0.26 | 41397 |
2 | 179 | 1 | 60.1 | 73.9 | 1.32 | 54632 |
3 | 178 | 7 | 62.1 | 80.1 | 0.00 | 74883 |
4 | 177 | 6 | 69.1 | 84.9 | 0.00 | 58371 |
探索性分析
在最高气温和出游数之间是否存在关系呢?
In [35]:
j = data.plot(kind='scatter', x='maxtemp', y='numtrips')
上面的散点图看上去并不十分乐观。
一周内的某天与出游数是否存在关系呢?
In [36]:
j = data.plot(kind='scatter', x='dayofweek', y='numtrips')
哇,我们似乎已经找到了一个预测点。看起来,一周中越往后使用出租车越频繁。也许纽约人每周做出决定要多走路,再后来逐渐失去决心,或者它反映了纽约市旅游动态。
也许如果我们拿出一周内天的混杂影响,最高气温将会开始发挥效果。让我们看看是否是这样:
In [37]:
j = data[data['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')
移除混杂因素似乎反映了关于温度的基本趋势。但是……你不觉得数据有点稀疏吗?有些事情你必须铭记在心 —— 你开始考虑越多的预测点(这里我们使用两个:一周中的天以及最高气温),则需要越多的行,从而避免过度拟合模型。
添加 2014 年数据
让我们添加 2014 年的数据到 Pandas dataframe 中。注意,对我们来说,对 YEAR 模块化查询是多么有用。
In [38]:
trips = bq.Query(taxiquery, YEAR=2014).to_dataframe()
weather = bq.Query(wxquery, YEAR=2014).to_dataframe()
data2014 = pd.merge(weather, trips, on='daynumber')
data2014[:5]
Out[38]:
daynumber | dayofweek | mintemp | maxtemp | rain | numtrips | |
---|---|---|---|---|---|---|
0 | 365 | 4 | 27.0 | 35.1 | 0.0 | 67518 |
1 | 364 | 3 | 28.9 | 44.1 | 0.0 | 43173 |
2 | 363 | 2 | 37.9 | 52.0 | 0.1 | 37979 |
3 | 362 | 1 | 42.1 | 55.0 | 0.0 | 43055 |
4 | 361 | 7 | 39.9 | 55.0 | 0.0 | 46961 |
In [39]:
data2 = pd.concat([data, data2014])
data2.describe()
Out[39]:
daynumber | dayofweek | mintemp | maxtemp | rain | numtrips | |
---|---|---|---|---|---|---|
count | 546.000000 | 546.000000 | 546.000000 | 546.000000 | 546.000000 |
546.000000
mean | 152.501832 | 4.000000 | 43.511355 | 61.806410 | 0.130568 | 47130.009158
std | 101.099356 | 2.001834 | 18.598263 | 19.169493 | 0.360620 | 11613.865885
min | 1.000000 | 1.000000 | 3.000000 | 21.000000 | 0.000000 | 13436.000000
25% | 69.000000 | 2.000000 | 28.225000 | 44.100000 | 0.000000 | 39150.250000
50% | 137.000000 | 4.000000 | 44.100000 | 64.000000 | 0.000000 | 46720.500000
75% | 228.750000 | 6.000000 | 61.000000 | 79.000000 | 0.070000 | 53855.500000
max | 365.000000 | 7.000000 | 75.900000 | 93.000000 | 4.880000 | 81574.000000
In [40]:
j = data2[data2['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')
这些数据似乎更稳健一点。如果我们有更多的数据,当然会更好。但是,在这种情况下,我们只有 2014 年和 2015 年的数据,所以这就是我们将处理的。
使用 Tensorflow 进行机器学习
我们将使用训练数据集的 80%和 20%的数据对我们已经训练的模型进行测试。让我们打乱 Pandas dataframe 的行,以使划分是随机的。预测(或输入)列将是数据库中的每一列,而不是出游数(这是我们的目标,或者我们所期望的预测)。
我们将使用的机器学习模型 —— 线性回归和神经网络 —— 都要求输入变量在本质上是数字。
然而,一周中的天是一个分类变量(例如,周二并不是真的大于周一)。因此,我们应该为它是否是周一(值为 0 或 1)、周二等创建单独的列。
在这种情况下,我们确实有有限的数据(记住:你使用作为输入特性的列越多,在训练数据集中需要的行越多),并且对于一周的天,似乎有清晰的线性趋势。所以这里,为了简单起见,我们将选择它,并使用数据原样。如果你对这种简化的影响感到好奇,那么尝试取消为了对一周的天创建单独的列的代码的注释,然后重新运行 notebook。
In [41]:
import tensorflow as tf
shuffled = data2.sample(frac=1)
# It would be a good idea, if we had more data, to treat the days as categorical variables
# with the small amount of data, we have though, the model tends to overfit
#predictors = shuffled.iloc[:,2:5]
#for day in xrange(1,8):
# matching = shuffled['dayofweek'] == day
# predictors.loc[matching, 'day_' + str(day)] = 1
# predictors.loc[~matching, 'day_' + str(day)] = 0
predictors = shuffled.iloc[:,1:5]
predictors[:5]
Out[41]:
dayofweek | mintemp | maxtemp | rain | |
---|---|---|---|---|
118 | 5 | 71.1 | 86.0 | 0.00 |
15 | 2 | 62.6 | 79.0 | 0.79 |
360 | 1 | 26.1 | 35.6 | 0.00 |
114 | 2 | 64.9 | 80.1 | 0.00 |
354 | 7 | 27.0 | 55.0 | 0.22 |
In [42]:
shuffled[:5]
Out[42]:
daynumber | dayofweek | mintemp | maxtemp | rain | numtrips | |
---|---|---|---|---|---|---|
118 | 247 | 5 | 71.1 | 86.0 | 0.00 | 43408 |
15 | 166 | 2 | 62.6 | 79.0 | 0.79 | 46807 |
360 | 5 | 1 | 26.1 | 35.6 | 0.00 | 20560 |
114 | 251 | 2 | 64.9 | 80.1 | 0.00 | 37058 |
354 | 11 | 7 | 27.0 | 55.0 | 0.22 | 29799 |
In [43]:
targets = shuffled.iloc[:,5]
targets[:5]
Out[43]:
118 43408
15 46807
360 20560
114 37058
354 29799
Name: numtrips, dtype: int64
让我们基于 80-20 划分和更大的数据集来更新我们的基准。
In [44]:
trainsize = int(len(shuffled['numtrips']) * 0.8)
avg = np.mean(shuffled['numtrips'][:trainsize])
rmse = np.sqrt(np.mean((shuffled['numtrips'][trainsize:] - avg)**2))
print 'Just using average={0} has RMSE of {1}'.format(avg, rmse)
Just using average=47147.5206422 has RMSE of 12732.1343487
使用 Tensorflow 进行线性回归
这里的关键语句是我们正在创建的模型:
model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS
这是一种线性模型和我们最小化的误差测量:
cost = tf.nn.l2_loss(model - target_data)
这是方误差(所以我们正在做的是最小二乘线性回归)。
剩下的部分就相当样板式了。缩放使得优化好得多(当权重是小数字时,更容易收敛,也就是说,不大可能放大)。我们保存权重到/tmp/trained_model,并且早测试数据集上展示均方根误差。
In [45]:
SCALE_NUM_TRIPS = 100000
trainsize = int(len(shuffled['numtrips']) * 0.8)
testsize = len(shuffled['numtrips']) - trainsize
npredictors = len(predictors.columns)
noutputs = 1
numiter = 10000
modelfile = '/tmp/trained_model'
with tf.Session() as sess:
feature_data = tf.placeholder("float", [None, npredictors])
target_data = tf.placeholder("float", [None, noutputs])
weights = tf.Variable(tf.truncated_normal([npredictors, noutputs], stddev=0.01))
biases = tf.Variable(tf.ones([noutputs]))
model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS
cost = tf.nn.l2_loss(model - target_data)
training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
init = tf.initialize_all_variables()
sess.run(init)
saver = tf.train.Saver({'weights' : weights, 'biases' : biases})
for iter in xrange(0, numiter):
sess.run(training_step, feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
})
if iter%1000 == 0:
print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
}) / trainsize))
filename = saver.save(sess, modelfile, global_step=numiter)
print 'Model written to {0}'.format(filename)
print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[trainsize:].values,
target_data : targets[trainsize:].values.reshape(testsize, noutputs)
}) / testsize))
0 error=37226.769573
1000 error=11397.6734661
2000 error=10759.6089285
3000 error=10037.6474454
4000 error=9306.57428869
5000 error=8646.24931995
6000 error=8103.11729799
7000 error=7699.99690218
8000 error=7440.12137614
9000 error=7305.52026543
Model written to /tmp/trained_model-10000
testerror=8386.6236006
大约 8400 的测试误差表明,较之只是用历史平均值(基准),使用机器学习模型会得到更好的结果。
使用 Tensorflow 的神经网络
要从线性回归变成神经网络回归,修改模型以拥有更多层次,并且插入线性组合的功能性转变(这里,我们进行热鲁(relu),但我们也可以进行正切(tanh) 或者 sigmoid):
model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
还有跟多的权重,需要保存和检索,但是样板代码几乎跟线性回归的例子相同。
In [46]:
SCALE_NUM_TRIPS = 100000
trainsize = int(len(shuffled['numtrips']) * 0.8)
testsize = len(shuffled['numtrips']) - trainsize
npredictors = len(predictors.columns)
noutputs = 1
nhidden = 5
numiter = 10000
modelfile = '/tmp/trained_model'
with tf.Session() as sess:
feature_data = tf.placeholder("float", [None, npredictors])
target_data = tf.placeholder("float", [None, noutputs])
weights1 = tf.Variable(tf.truncated_normal([npredictors, nhidden], stddev=0.01))
weights2 = tf.Variable(tf.truncated_normal([nhidden, noutputs], stddev=0.01))
biases1 = tf.Variable(tf.ones([nhidden]))
biases2 = tf.Variable(tf.ones([noutputs]))
model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
cost = tf.nn.l2_loss(model - target_data)
training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
init = tf.initialize_all_variables()
sess.run(init)
saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
for iter in xrange(0, numiter):
sess.run(training_step, feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
})
if iter%1000 == 0:
print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
}) / trainsize))
filename = saver.save(sess, modelfile, global_step=numiter)
print 'Model written to {0}'.format(filename)
print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[trainsize:].values,
target_data : targets[trainsize:].values.reshape(testsize, noutputs)
}) / testsize))
0 error=38225.6868073
1000 error=10581.3908453
2000 error=8598.12454908
3000 error=7775.54995489
4000 error=7289.19077071
5000 error=7166.95208592
6000 error=7115.87436393
7000 error=7073.102492
8000 error=7059.94672122
9000 error=7048.97842816
Model written to /tmp/trained_model-10000
testerror=8181.71200578
使用神经网络似乎让我们在性能上有了额外的提升(当我运行它时,是从 8400 到 8200 —— 由于随机种子,你实际的结果可能有所不同)。
运行一个训练模型
所以,我们训练了一个模型,将其保存到一个文件中。给定三天的预期天气,让我们使用这个模型来预测出租车需求。
这里,根据那些输入,我们创建了一个 Dataframe,加载保存的模型(注意,我们必须知道模型方程 —— 它并不保存在模型文件中),并用它来预测出租车需求。
In [47]:
input = pd.DataFrame.from_dict(data =
{'dayofweek' : [4, 5, 6],
'mintemp' : [50, 36, 54],
'maxtemp' : [80, 40, 75],
'rain' : [0, 0.8, 0.3]})
with tf.Session() as sess:
filename = modelfile + '-' + str(numiter)
saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
saver.restore(sess, filename)
feature_data = tf.placeholder("float", [None, npredictors])
predict_operation = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
predicted = sess.run(predict_operation, feed_dict = {
feature_data : input.values
})
print predicted
[[ 32465.81054688]
[ 49777.13671875]
[ 41207.85546875]]
看起来,我们应该告诉我们的一些出租车司机在周三 (day=4) 放一天假,并且在周四 (day=5) 全体出动。不足为奇 —— 预报显示周四有冷的要死的雨!
注意,周四通常是 慢 日子(周末出现出租车需求高峰),但是机器学习模型告诉我们,由于天气,这个特殊的周四会出现大量的需求。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: 使用 JavaScript 将 Blob 转换为文件对象
下一篇: Go 内存分配详解
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论