RxJs Observable 管道内的区块链回溯逻辑
背景
我使用 NestJS 和 Observable
模式以及 HttpModule
来“观察”并最终转发 JSON-RPC 服务器(在本例中为区块链)返回的值节点。
如果您不熟悉区块链,可以将它们视为一个链表,其中列表中的每个新元素都指向前一个元素,如下所示:
let blockchain = [
[0,{value: 'something', previous: None}],
[1,{value: 'somethingelse', previous: 0}], ...
[N,{value: 'somethingsomething', previous: N-1}]
]
有时区块链系统中可能会发生“分叉”。它看起来像下面的树:
[A] <-- [B]<--- [C]
|
--- [C'] <--- [D] <--- [E] <--- ... and so on
问题
如果我的 NestJS 应用程序在时间 0 获取块 [A]
,在时间 1 获取块 [B]
并在时间 1 获取块 [ C]
在时间 2,但突然在时间 3 我得到了 [E]
,我不会得到块 [D]
和 [C ']
。这意味着我将无法检查这两个缺失块中的值。
逻辑
因为所有块都有一个指向前一个块的指针,所以我确实能够通过简单地从块 [E]
传递指向块 [D]
的指针来检索块 [D]
。类似地,从块 [D]
我随后可以获得块 [C']
。因为 [C']
有一个指向 [B]
的指针,所以我可以成功检索所有丢失的块。
我对 Observables
还很陌生,所以我不完全确定当我以这种方式使用 NestJS HttpModule
时如何递归回溯:
export class BlockchainService {
private url: string;
private top?: number;
constructor(private httpService: HttpService,
private config: ConfigService) {
this.url = this.config.get<string>('my_blockchain_url')
}
getBestBlockHash(): Observable<AxiosResponse<any>> {
return this.httpService.post(this.url, {
"method" : "getbestblockhash"
})
}
getBestBlock(): Observable<AxiosResponse<any>> {
return this.getBestBlockHash().pipe(
mergeMap((hash) => this.getBlock(hash.data.result))
)
}
getBlock(hash: string): Observable<AxiosResponse<any>> {
return this.httpService.post(this.url, {
"method" : "getblock",
"params" : {
"blockhash" : hash
}
})
}
尝试 1
因为 Observable
保存块数据,我无法评估是否回溯,或者在没有订阅
它或管道
它的情况下。
使用 Mrk Sef 下面的建议和 iif()
运算符,似乎让我更进一步,因为我可以将 getBestBlock()
Observable
作为参数传递到利用 iif()
的 checkBackTrack
函数,如下所示:
checkBackTrack(obs: Observable<AxiosResponse<any>>): Observable<AxiosResponse<any>> {
let diff: number
let previoushash: string
console.log(diff, previoushash)
obs.pipe(tap(block => {
diff = this.top - block.data.result.height
previoushash = block.data.result.previoushash
}))
console.log(diff, previoushash)
const backTrackNeeded = iif(
() => diff > 0,
this.backTrack(diff, previoushash),
obs
)
return backTrackNeeded;
}
其中 backTrakc
函数如下:
backTrack(n: number, previoushash: string): Observable<AxiosResponse<any>> {
return (n < 0) ? EMPTY : this.getBlock(previoushash).pipe(
switchMap(previousBlock => this.backTrack(n-1, previousBlock.data.result.previousblockhash)),
)
}
允许我执行以下操作: this.checkBackTrack(this.getBestBlock())
。
但是,我无法在 checkBackTrack
函数中定义 diff
和 previoushash
...另外,这会带来副作用,这是我不希望的。
Background
I am using NestJS and the Observable<AxiosResponse>
pattern with the HttpModule
to "observe" and eventually forward values returned by a JSON-RPC server, in this case a Blockchain node.
In case Blockchains are unfamiliar, they can be thought of as a linked list, where each new element in the list points to the previous element, something like this:
let blockchain = [
[0,{value: 'something', previous: None}],
[1,{value: 'somethingelse', previous: 0}], ...
[N,{value: 'somethingsomething', previous: N-1}]
]
Sometimes a "fork" can happen in a blockchain system. It would look something like the below tree:
[A] <-- [B]<--- [C]
|
--- [C'] <--- [D] <--- [E] <--- ... and so on
Problem
If my NestJS application gets Block [A]
at time 0, block [B]
at time 1 and block [C]
at time 2, but suddenly, at time 3 I get [E]
, I would not get block [D]
and [C']
. This means that I would be unable to inspect the values
in these two missing blocks.
Logic
Because all blocks have a pointer to a previous block, I do have the ability to retrieve block [D]
by simply passing the pointer to it from block [E]
. Similarly, from block [D]
I could subsequently get block [C']
. Because [C']
has a pointer to [B]
I would have successfully retrieved all missing blocks.
I am quite new to Observables
so I am not entirely sure of how I can backtrack recursively when I use the NestJS HttpModule
in this way:
export class BlockchainService {
private url: string;
private top?: number;
constructor(private httpService: HttpService,
private config: ConfigService) {
this.url = this.config.get<string>('my_blockchain_url')
}
getBestBlockHash(): Observable<AxiosResponse<any>> {
return this.httpService.post(this.url, {
"method" : "getbestblockhash"
})
}
getBestBlock(): Observable<AxiosResponse<any>> {
return this.getBestBlockHash().pipe(
mergeMap((hash) => this.getBlock(hash.data.result))
)
}
getBlock(hash: string): Observable<AxiosResponse<any>> {
return this.httpService.post(this.url, {
"method" : "getblock",
"params" : {
"blockhash" : hash
}
})
}
Attempt 1
Because the Observable
holds the block data, I cannot evaluate whether to backtrack, or not without subscribe
to it, or pipe
it.
Using Mrk Sef's proposal below and the iif()
operator, seems to take me further since I can pass the getBestBlock()
Observable
as a parameter to a checkBackTrack
function leveraging iif()
as follows:
checkBackTrack(obs: Observable<AxiosResponse<any>>): Observable<AxiosResponse<any>> {
let diff: number
let previoushash: string
console.log(diff, previoushash)
obs.pipe(tap(block => {
diff = this.top - block.data.result.height
previoushash = block.data.result.previoushash
}))
console.log(diff, previoushash)
const backTrackNeeded = iif(
() => diff > 0,
this.backTrack(diff, previoushash),
obs
)
return backTrackNeeded;
}
where the backTrakc
function looks like:
backTrack(n: number, previoushash: string): Observable<AxiosResponse<any>> {
return (n < 0) ? EMPTY : this.getBlock(previoushash).pipe(
switchMap(previousBlock => this.backTrack(n-1, previousBlock.data.result.previousblockhash)),
)
}
allows me to do the following: this.checkBackTrack(this.getBestBlock())
.
However, I am unable to define diff
and previoushash
in the checkBackTrack
function... Also, this introduces side effects, which I do not want.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
寻求解决方案
这意味着什么还不是很清楚,但我们可能处于您不确定如何进一步澄清的境地。我不确定这会回答你的问题,但也许它会帮助你进一步澄清。
一些代码
以下是我如何确保将“缺失”值合并到流中的方法。为了简单起见,这意味着是一个配对实现(例如,它假设索引单调递增,否则将会失败)。
正如您所看到的,
sparseMessage$
缺少索引为 2 和 2 的消息。 3. 以下是由denseMessage$
发出的内容:一些注释
这会创建一个新的可观察量,该可观察量会使用缺失的消息来转换原始可观察量。它不会创建单独的可观察量或数组(两者都是可能的,并且与此处所做的没有太大不同)。
这是模拟一种中间件,具有一些针对缺失值的业务逻辑。然而...
这听起来像是您担心源超时或乱序发出。当然,可以在中间件层中临时修复此问题(在旧的遗留系统中,这可能是唯一的方法),但最好尽可能避免这种设计。
此解决方案是一个没有太多额外内容的解决方案(远未达到生产级别代码)。它不进行错误处理。它不能确保索引单调递增等。它并不意味着是一个强大的解决方案。幸运的是,其中许多问题已在文档中得到解决/在 stackoverflow 上的其他问题中得到解答:)
请随意修改/澄清您的问题。如果您有单独的问题或想要完全重新构建此问题,最好提出一个新问题。
更新:我在修订版中看到的更多代码
可能您在询问如何导航异步链接列表。这更有趣,因为每个下一个值都取决于前一个值(您无法索引到链接列表,您必须跟踪链接)。
下面是一个示例,说明如何跟踪异步链表返回 4 个节点,然后以正确的顺序发出它们。
输出:
您可以使用所有常见的 RxJS 嫌疑人来创建/将其与其他流合并(我想您将在其中获取一个可以回溯的起始节点)。
在上一节中,我使用了
range
、concatMap
、&endWith
但在这里您可以将其替换为backtrackLinks
这会将所有逻辑嵌入到一个函数中,因为backtrackLinks(value, 0)
等于of (值)
。Toward a solution
What this means isn't very clear, but we might be in a position where you're not sure how to clarify further. I'm not sure this will answer your question, but perhaps it will help you clarify further.
Some code
Here is how I might ensure "missing" values are merged into a stream. This is meant to be a paired down implementation (for example, it assumes index is monotonically increasing and will fail otherwise) for the sake of simplicity.
As you can see,
sparseMessage$
is missing a message with index 2 & 3. Below is what is emitted bydenseMessage$
:Some Notes
This creates a new observable that transforms the original observable with the missing messages. It doesn't create a separate observable or array (both are possible, and not too dissimilar to what's done here).
This is emulating a sort of middle-ware with some business logic for missing values. And yet ...
this makes it sound like you're worried about the source timing out or emitting out of order. Of course it's possible to band-aid fix this in a middle-ware layer (and in an old legacy system this might be the only way), but its best to avoid this sort of design wherever possible.
This solution is the shape of a solution without much extra (nowhere near production level code). It does no error handling. It doesn't ensure indices are monotonically increasing, etc. It is not meant to be a robust solution. Fortunately, many of those issues are addressed in the documentation/answered in other questions here on stackoverflow :)
Feel free to modify/clarify your question. If you have separate questions or you want to re-frame this question entirely, it's best to just open a new question.
Update: Some more code
I see on revision that perhaps you're asking about how to navigate an asynchronous linked list. This is more interesting because each next value depends on the previous one (You can't index into a linked list, you must follow the links).
Here's an example of how you might follow an async linked list back 4 nodes and then emit them in the correct order.
The output:
You can the use all of the usual RxJS suspects to create/merge this with other streams (That's where I imagine you'll acquire a start node from which to backtrack).
In the previous section, I used
range
,concatMap
, &endWith
but here you could replace that withbacktrackLinks
This embeds all that logic in one function sincebacktrackLinks(value, 0)
equalsof(value)
.