使用 BigQuery 和 TensorFlow 进行需求预测

发布于 2024-09-14 01:41:09 字数 19409 浏览 21 评论 0

在本 notebook 中,我们将开发一个机器学习模型,用于预测对纽约的出租汽车需求。

要开发该模型,我们将需要获得出租汽车使用的历史数据。该数据存在于 BigQuery 中。让我们通过查看模式开始吧。

In [25]:

    import gcp.bigquery as bq
    import pandas as pd
    import numpy as np

In [26]:

    %%bigquery schema --table "nyc-tlc:green.trips_2015"

分析出租车请求

让我们拉取在 2015 年数据集中的每天出游数我们可以使用 BigQuery 的内建 Date-and-time 函数 DAYOFYEAR (见 BigQuery 查询参考 以获得这样的函数的完整列表)。

In [27]:

    %%sql
    SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_2015] LIMIT 10

Out[27]:

daynumber

120
120
120
120
149
150
150
150
89
89

(rows: 10, time: 1.6s, cached, job: job_awmH7VUQHbnwGhY2Tsa3ElMwsvk)

模块化查询和 Pandas dataframe

让我们使用出游数的总数作为出租车请求的代理(其他合理的替代方案是总 trip_distance 或者总 fare_amount)。可以使用 Tensorflow 预测多个变量,但是为了简单起见,我们将坚持只预测出游数。

我们将把我们的查询命名为'taxiquery',并让他使用一个输入变量'$YEAR'。然后,可以通过提供一个 YEAR 来调用'taxiquery'。to_dataframe() 将 BigQuery 结果转换成一个 Pandas dataframe。

In [28]:

    %%sql --module taxiquery
    SELECT daynumber, COUNT(*) AS numtrips FROM
        (SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_$YEAR])
    GROUP BY daynumber ORDER BY daynumber

In [29]:

    trips = bq.Query(taxiquery, YEAR=2015).to_dataframe()
    trips[:5]

Out[29]:

 daynumbernumtrips
0162943
1243410
2353866
3441602
4541923

基准

通常情况下,某事物的合理估计是其历史平均水平。因此,我们可以将我们的机器学习模型与历史平均水平进行比较。

In [30]:

    avg = np.mean(trips['numtrips'])
    print 'Just using average={0} has RMSE of {1}'.format(avg, np.sqrt(np.mean((trips['numtrips'] - avg)**2)))
    Just using average=54674.0994475 has RMSE of 10163.4654442

这里的平均值约为 55,000,而在这种情况下的根均方误差(RMSE) 约为 10,000。换句话说,如果我们估计在任意一天中都有 55,000 次出租车出游,那么在任一方向将平均有约为 10,000 的误差。

让我们看看是否可以做的比这更好 —— 我们的目的是进行出租车请求预测,使得 RMSE 少于 10,000。

什么样的因素影响了人们对出租车的使用?

天气数据

我们怀疑天气影响人们使用出租车的频率。也许在极冷或阴雨的情况下,那些通常走路去上班会乘坐出租车。

Google 用户 Felipe Hoffa 已经公开他从美国国家海洋和大气管理局做出的气象观测到 BigQuery 中。让我们使用该数据集,并找到对应纽约 La Guardia 机场的站号。

In [31]:

    %%sql
    SELECT * FROM [fh-bigquery:weather_gsod.stations]
    WHERE state = 'NY' AND wban != '99999' AND name contains 'LA GUARDIA'

Out[31]:

usafwbannamecountryfipsstatecalllatlonelevbeginend
72503014732NEW YORK/LA GUARDIAUSUSNYKLGA40779.0-73880.094 
1935022220120619

(rows: 1, time: 0.9s, cached, job: job_UpFXBnE1mCNSKcjG9VzAGXRsuZc)

变量

让我们拉取 La Guardia 机场的最低和最高气温(华氏温度),以及降雨量(英寸)。

In [32]:

    %%sql --module wxquery
    SELECT DAYOFYEAR(TIMESTAMP('$YEAR'+mo+da)) daynumber,
           FIRST(DAYOFWEEK(TIMESTAMP('$YEAR'+mo+da))) dayofweek,
           MIN(min) mintemp, MAX(max) maxtemp, MAX(IF(prcp=99.99,0,prcp)) rain
    FROM [fh-bigquery:weather_gsod.gsod$YEAR]
    WHERE stn='725030' GROUP BY 1 ORDER BY daynumber DESC

In [33]:

    weather = bq.Query(wxquery, YEAR=2015).to_dataframe()
    weather[:5]

Out[33]:

 daynumberdayofweekmintempmaxtemprain
0365546.048.20.17
1364434.048.00.13
2363333.846.90.37
3362239.062.10.02
4361146.062.60.14

合并数据集

使用 Pandas,按天合并出租车和气象数据集。

In [34]:

    data = pd.merge(weather, trips, on='daynumber')
    data[:5]

Out[34]:

 daynumberdayofweekmintempmaxtemprainnumtrips
0181364.082.00.0042978
1180262.178.10.2641397
2179160.173.91.3254632
3178762.180.10.0074883
4177669.184.90.0058371

探索性分析

在最高气温和出游数之间是否存在关系呢?

In [35]:

    j = data.plot(kind='scatter', x='maxtemp', y='numtrips')

上面的散点图看上去并不十分乐观。

一周内的某天与出游数是否存在关系呢?

In [36]:

    j = data.plot(kind='scatter', x='dayofweek', y='numtrips')

哇,我们似乎已经找到了一个预测点。看起来,一周中越往后使用出租车越频繁。也许纽约人每周做出决定要多走路,再后来逐渐失去决心,或者它反映了纽约市旅游动态。

也许如果我们拿出一周内天的混杂影响,最高气温将会开始发挥效果。让我们看看是否是这样:

In [37]:

    j = data[data['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')

移除混杂因素似乎反映了关于温度的基本趋势。但是……你不觉得数据有点稀疏吗?有些事情你必须铭记在心 —— 你开始考虑越多的预测点(这里我们使用两个:一周中的天以及最高气温),则需要越多的行,从而避免过度拟合模型。

添加 2014 年数据

让我们添加 2014 年的数据到 Pandas dataframe 中。注意,对我们来说,对 YEAR 模块化查询是多么有用。

In [38]:

    trips = bq.Query(taxiquery, YEAR=2014).to_dataframe()
    weather = bq.Query(wxquery, YEAR=2014).to_dataframe()
    data2014 = pd.merge(weather, trips, on='daynumber')
    data2014[:5]

Out[38]:

 daynumberdayofweekmintempmaxtemprainnumtrips
0365427.035.10.067518
1364328.944.10.043173
2363237.952.00.137979
3362142.155.00.043055
4361739.955.00.046961

In [39]:

    data2 = pd.concat([data, data2014])
    data2.describe()

Out[39]:

 daynumberdayofweekmintempmaxtemprainnumtrips
count546.000000546.000000546.000000546.000000546.000000 
546.000000
mean | 152.501832 | 4.000000 | 43.511355 | 61.806410 | 0.130568 | 47130.009158
std | 101.099356 | 2.001834 | 18.598263 | 19.169493 | 0.360620 | 11613.865885
min | 1.000000 | 1.000000 | 3.000000 | 21.000000 | 0.000000 | 13436.000000
25% | 69.000000 | 2.000000 | 28.225000 | 44.100000 | 0.000000 | 39150.250000
50% | 137.000000 | 4.000000 | 44.100000 | 64.000000 | 0.000000 | 46720.500000
75% | 228.750000 | 6.000000 | 61.000000 | 79.000000 | 0.070000 | 53855.500000
max | 365.000000 | 7.000000 | 75.900000 | 93.000000 | 4.880000 | 81574.000000

In [40]:

    j = data2[data2['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')

这些数据似乎更稳健一点。如果我们有更多的数据,当然会更好。但是,在这种情况下,我们只有 2014 年和 2015 年的数据,所以这就是我们将处理的。

使用 Tensorflow 进行机器学习

我们将使用训练数据集的 80%和 20%的数据对我们已经训练的模型进行测试。让我们打乱 Pandas dataframe 的行,以使划分是随机的。预测(或输入)列将是数据库中的每一列,而不是出游数(这是我们的目标,或者我们所期望的预测)。

我们将使用的机器学习模型 —— 线性回归和神经网络 —— 都要求输入变量在本质上是数字。

然而,一周中的天是一个分类变量(例如,周二并不是真的大于周一)。因此,我们应该为它是否是周一(值为 0 或 1)、周二等创建单独的列。

在这种情况下,我们确实有有限的数据(记住:你使用作为输入特性的列越多,在训练数据集中需要的行越多),并且对于一周的天,似乎有清晰的线性趋势。所以这里,为了简单起见,我们将选择它,并使用数据原样。如果你对这种简化的影响感到好奇,那么尝试取消为了对一周的天创建单独的列的代码的注释,然后重新运行 notebook。

In [41]:

    import tensorflow as tf
    shuffled = data2.sample(frac=1)
    # It would be a good idea, if we had more data, to treat the days as categorical variables
    # with the small amount of data, we have though, the model tends to overfit
    #predictors = shuffled.iloc[:,2:5]
    #for day in xrange(1,8):
    #  matching = shuffled['dayofweek'] == day
    #  predictors.loc[matching, 'day_' + str(day)] = 1
    #  predictors.loc[~matching, 'day_' + str(day)] = 0
    predictors = shuffled.iloc[:,1:5]
    predictors[:5]

Out[41]:

 dayofweekmintempmaxtemprain
118571.186.00.00
15262.679.00.79
360126.135.60.00
114264.980.10.00
354727.055.00.22

In [42]:

    shuffled[:5]

Out[42]:

 daynumberdayofweekmintempmaxtemprainnumtrips
118247571.186.00.0043408
15166262.679.00.7946807
3605126.135.60.0020560
114251264.980.10.0037058
35411727.055.00.2229799

In [43]:

    targets = shuffled.iloc[:,5]
    targets[:5]

Out[43]:

    118    43408
    15     46807
    360    20560
    114    37058
    354    29799
    Name: numtrips, dtype: int64

让我们基于 80-20 划分和更大的数据集来更新我们的基准。

In [44]:

    trainsize = int(len(shuffled['numtrips']) * 0.8)
    avg = np.mean(shuffled['numtrips'][:trainsize])
    rmse = np.sqrt(np.mean((shuffled['numtrips'][trainsize:] - avg)**2))
    print 'Just using average={0} has RMSE of {1}'.format(avg, rmse)
    Just using average=47147.5206422 has RMSE of 12732.1343487

使用 Tensorflow 进行线性回归

这里的关键语句是我们正在创建的模型:

    model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS

这是一种线性模型和我们最小化的误差测量:

    cost = tf.nn.l2_loss(model - target_data)

这是方误差(所以我们正在做的是最小二乘线性回归)。

剩下的部分就相当样板式了。缩放使得优化好得多(当权重是小数字时,更容易收敛,也就是说,不大可能放大)。我们保存权重到/tmp/trained_model,并且早测试数据集上展示均方根误差。

In [45]:

    SCALE_NUM_TRIPS = 100000
    trainsize = int(len(shuffled['numtrips']) * 0.8)
    testsize = len(shuffled['numtrips']) - trainsize
    npredictors = len(predictors.columns)
    noutputs = 1
    numiter = 10000
    modelfile = '/tmp/trained_model'
    with tf.Session() as sess:
      feature_data = tf.placeholder("float", [None, npredictors])
      target_data = tf.placeholder("float", [None, noutputs])
      weights = tf.Variable(tf.truncated_normal([npredictors, noutputs], stddev=0.01))
      biases = tf.Variable(tf.ones([noutputs]))
      model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS
      cost = tf.nn.l2_loss(model - target_data)
      training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
      init = tf.initialize_all_variables()
      sess.run(init)

      saver = tf.train.Saver({'weights' : weights, 'biases' : biases})
      for iter in xrange(0, numiter):
        sess.run(training_step, feed_dict = {
            feature_data : predictors[:trainsize].values,
            target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
          })

        if iter%1000 == 0:
          print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
              feature_data : predictors[:trainsize].values,
              target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
          }) / trainsize))

      filename = saver.save(sess, modelfile, global_step=numiter)
      print 'Model written to {0}'.format(filename)

      print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
              feature_data : predictors[trainsize:].values,
              target_data : targets[trainsize:].values.reshape(testsize, noutputs)
          }) / testsize))
    0 error=37226.769573
    1000 error=11397.6734661
    2000 error=10759.6089285
    3000 error=10037.6474454
    4000 error=9306.57428869
    5000 error=8646.24931995
    6000 error=8103.11729799
    7000 error=7699.99690218
    8000 error=7440.12137614
    9000 error=7305.52026543
    Model written to /tmp/trained_model-10000
    testerror=8386.6236006

大约 8400 的测试误差表明,较之只是用历史平均值(基准),使用机器学习模型会得到更好的结果。

使用 Tensorflow 的神经网络

要从线性回归变成神经网络回归,修改模型以拥有更多层次,并且插入线性组合的功能性转变(这里,我们进行热鲁(relu),但我们也可以进行正切(tanh) 或者 sigmoid):

    model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS

还有跟多的权重,需要保存和检索,但是样板代码几乎跟线性回归的例子相同。

In [46]:

    SCALE_NUM_TRIPS = 100000
    trainsize = int(len(shuffled['numtrips']) * 0.8)
    testsize = len(shuffled['numtrips']) - trainsize
    npredictors = len(predictors.columns)
    noutputs = 1
    nhidden = 5
    numiter = 10000
    modelfile = '/tmp/trained_model'
    with tf.Session() as sess:
      feature_data = tf.placeholder("float", [None, npredictors])
      target_data = tf.placeholder("float", [None, noutputs])
      weights1 = tf.Variable(tf.truncated_normal([npredictors, nhidden], stddev=0.01))
      weights2 = tf.Variable(tf.truncated_normal([nhidden, noutputs], stddev=0.01))
      biases1 = tf.Variable(tf.ones([nhidden]))
      biases2 = tf.Variable(tf.ones([noutputs]))
      model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
      cost = tf.nn.l2_loss(model - target_data)
      training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
      init = tf.initialize_all_variables()
      sess.run(init)

      saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
      for iter in xrange(0, numiter):
        sess.run(training_step, feed_dict = {
            feature_data : predictors[:trainsize].values,
            target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
          })

        if iter%1000 == 0:
          print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
              feature_data : predictors[:trainsize].values,
              target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
          }) / trainsize))

      filename = saver.save(sess, modelfile, global_step=numiter)
      print 'Model written to {0}'.format(filename)

      print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
              feature_data : predictors[trainsize:].values,
              target_data : targets[trainsize:].values.reshape(testsize, noutputs)
          }) / testsize))
    0 error=38225.6868073
    1000 error=10581.3908453
    2000 error=8598.12454908
    3000 error=7775.54995489
    4000 error=7289.19077071
    5000 error=7166.95208592
    6000 error=7115.87436393
    7000 error=7073.102492
    8000 error=7059.94672122
    9000 error=7048.97842816
    Model written to /tmp/trained_model-10000
    testerror=8181.71200578

使用神经网络似乎让我们在性能上有了额外的提升(当我运行它时,是从 8400 到 8200 —— 由于随机种子,你实际的结果可能有所不同)。

运行一个训练模型

所以,我们训练了一个模型,将其保存到一个文件中。给定三天的预期天气,让我们使用这个模型来预测出租车需求。

这里,根据那些输入,我们创建了一个 Dataframe,加载保存的模型(注意,我们必须知道模型方程 —— 它并不保存在模型文件中),并用它来预测出租车需求。

In [47]:

    input = pd.DataFrame.from_dict(data = 
                                   {'dayofweek' : [4, 5, 6],
                                    'mintemp' : [50, 36, 54],
                                    'maxtemp' : [80, 40, 75],
                                    'rain' : [0, 0.8, 0.3]})
    with tf.Session() as sess:
        filename = modelfile + '-' + str(numiter)
        saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
        saver.restore(sess, filename)
        feature_data = tf.placeholder("float", [None, npredictors])
        predict_operation = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
        predicted = sess.run(predict_operation, feed_dict = {
            feature_data : input.values
          })
        print predicted
    [[ 32465.81054688]
     [ 49777.13671875]
     [ 41207.85546875]]

看起来,我们应该告诉我们的一些出租车司机在周三 (day=4) 放一天假,并且在周四 (day=5) 全体出动。不足为奇 —— 预报显示周四有冷的要死的雨!

注意,周四通常是 慢 日子(周末出现出租车需求高峰),但是机器学习模型告诉我们,由于天气,这个特殊的周四会出现大量的需求。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

我不在是我

暂无简介

文章
评论
25 人气
更多

推荐作者

梦年海沫深

文章 0 评论 0

liaowenxiong

文章 0 评论 0

丢了幸福的猪

文章 0 评论 0

kaipeng

文章 0 评论 0

微信用户

文章 0 评论 0

独享拥抱

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文