Joblib package - why is Parallel() giving *incorrect* output? Changing n_jobs=1 to n_jobs=3 changes the output
I emphasized "incorrect" because the problem isn't that Parallel() is giving a slower or same-speed run as my code, it's that the output result is different.
I am working in Google Colab, and I am trying to parallelize a for-loop (specifically, I'm trying to parallelize the outer loop of a nested cross-validation loop, if that helps).
Basically, the code is running an outer for-loop which is supposed to shift a window along a data table, where models are trained with different parameters (inner for loop), best parameters determined, and then one model is trained on all the "preceding" data and tested on an untouched fold of the data.
Here is roughly my code:
from joblib import Parallel, delayed

def NestedCV(outer):
    traindataforthisiteration = data[0:(1000+outer*500),:]
    testdataforthisiteration = data[(1000+outer*500):(1000+(outer+1)*500),:]
    bestresult = 0
    bestparam = 0
    paramoptions = [1,2,3]
    for i in paramoptions:
        # Testing every parameter option, recording result, updating "bestparam" and "bestresult"
        # if a better result is encountered on iteration i
        innertrain = traindataforthisiteration[0:(1000+outer*500)//2,:]   # // so the slice index stays an integer
        innertest = traindataforthisiteration[(1000+outer*500)//2:(1000+outer*500),:]
        innermodel = model(i).fit(innertrain)
        # Next line runs the trained model on innertest, obtaining some performance metric as output
        performance = accuracy(innermodel.predict(innertest))  # predict with innermodel, the fitted instance
        if performance > bestresult:
            bestresult = performance
            bestparam = i
    finalmodel = model(bestparam).fit(traindataforthisiteration)
    results = finalmodel.predict(testdataforthisiteration)
    return results, bestparam
I wanted to parallelize this for-loop: instead of running the 3 outer folds sequentially, I would run them all at the same time, since they are independent of one another (parameters are selected for each fold individually).
Here is what I did to parallelize:
inputs = range(3)
results = Parallel(n_jobs=3)(delayed(NestedCV)(i) for i in inputs)
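For what it's worth, I believe joblib's default backend here is "loky", which runs each call in a separate worker process. As a sketch (assuming the backend matters at all), this is the variant I was going to try in order to pin the backend down explicitly:

# Sketch, assuming the default backend is "loky" (separate worker processes);
# "threading" would keep all three calls inside this one Colab process.
results_loky = Parallel(n_jobs=3, backend="loky")(delayed(NestedCV)(i) for i in inputs)
results_threaded = Parallel(n_jobs=3, backend="threading")(delayed(NestedCV)(i) for i in inputs)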
Now here's what I don't understand. When I run this code, the output is different (and presumably wrong) compared to when I run the code sequentially or ANY other way. Here are the ways I have tried running the code:
- Copying and pasting the code from inside "def NestedCV" and running it straight, without a function: just setting "outer=0" and running the code.
- Running "NestedCV(0)", "NestedCV(1)" and "NestedCV(2)" separately.
- Running "inputs = range(3)" and then "results = Parallel(n_jobs=1)(delayed(NestedCV)(i) for i in inputs)". Notice that I changed to n_jobs=1, so presumably it's not REALLY running in parallel anymore?
All of the above gave the same result, and that result was different from the one I get when I run "results = Parallel(n_jobs=3)(delayed(NestedCV)(i) for i in inputs)".
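Concretely, here is a sketch of the comparison I did (the np.array_equal check is my own ad-hoc way of comparing the two outputs of each fold):

import numpy as np
from joblib import Parallel, delayed

sequential = [NestedCV(i) for i in range(3)]
parallel = Parallel(n_jobs=3)(delayed(NestedCV)(i) for i in range(3))
for (seqres, seqparam), (parres, parparam) in zip(sequential, parallel):
    # First output is an array of predictions, second is the chosen parameter
    print(np.array_equal(seqres, parres), seqparam == parparam)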
I would greatly appreciate any insight as to what is going on here and what my mistake is.
EDIT: I'm not a frequent user of Google Colab, and I don't often run things in parallel (I literally grabbed that Parallel(n_jobs=3)(delayed(... code from a blog). That is to say, the mistake is probably something super basic on my part, but I cannot figure out what it could possibly be. I thought "maybe the loop iterations are somehow dependent on each other, so running in parallel doesn't work", but then running "NestedCV(n)" in random orders should give inconsistent results, right? But it doesn't.
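This is the kind of order check I had in mind (a sketch; just calling the function in a shuffled order):

import random

order = [0, 1, 2]
random.shuffle(order)
# If the folds secretly depended on each other, a shuffled order should change the outputs
print({i: NestedCV(i) for i in order})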
EDIT 2: My function has multiple outputs. Is this relevant? Can I somehow be capturing the output incorrectly? I changed my function above to reflect the multiple outputs.
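In case the mistake is in how I capture them, this is roughly what I do with the list that Parallel returns (one (results, bestparam) tuple per fold, in input order):

out = Parallel(n_jobs=3)(delayed(NestedCV)(i) for i in inputs)
allresults = [r for r, _ in out]       # first output of each fold
allbestparams = [p for _, p in out]    # second output of each fold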
EDIT 3: In case it helps, here is my actual code (still a bit simplified). It's for a Reinforcement Learning agent, coded using Stable-Baselines3.
def NestedCV(outer):
    lrparams = [0.0003, 0.0001]
    batchsizes = [256]
    gammaoptions = [0.9]
    starttimestep = 250000
    increment = 100000
    # Remember that it's in range(inctimes), where timesteps=starttimestep when inctime==0
    inctimes = 1
    outertrainltc = ltctrain.iloc[0:(outercvstart+outer*outerfoldsize),:]
    outertrainpriceltc = ltcpricestrain[0:(outercvstart+outer*outerfoldsize)]
    outertest = train.iloc[((outercvstart+outer*outerfoldsize)+1):(outercvstart+(outer+1)*outerfoldsize),:]
    outertestprice = pricestrain[((outercvstart+outer*outerfoldsize)+1):(outercvstart+(outer+1)*outerfoldsize)]
    bestlr = 0
    bestf = 0
    bestr = 0
    bestg = 0
    bestrew = -np.inf
    for lr in lrparams:
        for r in batchsizes:
            for g in gammaoptions:
                innerres1 = []
                innerres2 = []
                for inner in range(2):
                    # Assuming outertrainltc here; "outertrain" is not defined above
                    lastind = (len(outertrainltc.index)-3*innerfoldsize+inner*innerfoldsize)
                    # Train first stage (rule learning stage) of model
                    env = StockPredEnv(df1=outertrainltc.iloc[0:lastind,:],
                                       priceseries1=list(outertrainpriceltc[0:lastind]),
                                       episodelen=2225,
                                       totaltimesteps=starttimestep)
                    env = ActionMasker(env, mask_fn)
                    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=0,
                                        learning_rate=lr, gamma=g, batch_size=r, seed=10)
                    for rep in range(inctimes):
                        if rep==0:
                            model.learn(starttimestep)
                        else:
                            model.learn(increment)
                        # Testing out on test data
                        # Changing env to be based on test data
                        # (assuming outertrainpriceltc for episodelen; "outertrainprice" is not defined above)
                        env = StockPredEnv(df1=outertrainltc.iloc[lastind:(lastind+innerfoldsize),:],
                                           priceseries1=list(outertrainpriceltc[lastind:(lastind+innerfoldsize)]),
                                           episodelen=len(list(outertrainpriceltc[lastind:(lastind+innerfoldsize)]))-1,
                                           totaltimesteps=starttimestep+rep*increment)
                        env = ActionMasker(env, mask_fn)
                        obs = env.reset().astype('float')
                        done = False
                        score = 0
                        while not done:
                            action_masks = get_action_masks(env)
                            action, _ = model.predict(obs, action_masks=action_masks, deterministic=True)
                            obs, reward, done, info = env.step(action[0])
                            score += reward
                        if inner==0:
                            innerres1.append(score)
                        else:
                            innerres2.append(score)
                # Calculating mean score for each timestep
                meanscores = []
                for m in range(len(innerres1)):
                    meanscores.append(np.mean([innerres1[m], innerres2[m]]))
                # Calculating best result and corresponding timesteps
                topmean = -np.inf
                toptimesteps = 0
                for m in range(len(meanscores)):
                    if meanscores[m] > topmean:
                        topmean = meanscores[m]
                        toptimesteps = starttimestep + m*increment
                # Checking result vs. previous results, recording if hyperparams are better
                if topmean > bestrew:
                    bestrew = topmean
                    bestlr = lr
                    bestf = toptimesteps
                    bestr = r
                    bestg = g
    # Recording best params
    bestparams = (bestlr, bestf, bestr, bestg)
    cvbestrew = bestrew
    # Training model on all inner data to run on outer test fold
    # Train first stage (rule learning stage) of model
    env = StockPredEnv(df1=outertrainltc,
                       priceseries1=list(outertrainpriceltc),
                       episodelen=2225,
                       totaltimesteps=bestf)
    env = ActionMasker(env, mask_fn)
    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=0,
                        learning_rate=bestlr, gamma=bestg, batch_size=bestr, seed=10)
    model.learn(bestf)
    # Testing out on test data
    # Changing env to include test data
    env = StockPredEnv(df1=outertrainltc,
                       priceseries1=list(outertrainpriceltc),
                       episodelen=len(list(outertestprice))-1,
                       totaltimesteps=len(list(outertestprice)))
    env = ActionMasker(env, mask_fn)
    lastepisodeactions = []
    obs = env.reset().astype('float')
    done = False
    score = 0
    while not done:
        action_masks = get_action_masks(env)
        action, _ = model.predict(obs, action_masks=action_masks, deterministic=True)
        obs, reward, done, info = env.step(action[0])
        score += reward
        lastepisodeactions.append(action)
    outerfoldrew = score
    tradeactions = lastepisodeactions
    return bestparams, outerfoldrew, tradeactions
EDIT 4: I tried running 3 instances of the function in parallel with the exact same input. And interestingly, the outputs were all exactly the same, **but still different from when I run it sequentially**. This suggests that the problem isn't a "race condition", right? Otherwise the 3 parallelized outputs would all be different? What could this mean?
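One thing I could still check (a sketch with a hypothetical probe helper, not part of my pipeline) is whether the n_jobs=3 workers really are separate processes with their own global random state; that would at least be consistent with "identical to each other, but different from sequential":

import os
import numpy as np
from joblib import Parallel, delayed

def probe(i):
    # Each call reports its process id and a draw from the global NumPy RNG
    return i, os.getpid(), np.random.rand()

print(Parallel(n_jobs=1)(delayed(probe)(i) for i in range(3)))  # runs in this (parent) process
print(Parallel(n_jobs=3)(delayed(probe)(i) for i in range(3)))  # separate worker processes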