RxJs Observable 管道内的区块链回溯逻辑

发布于 2025-01-15 05:56:08 字数 3479 浏览 4 评论 0原文

背景

我使用 NestJS 和 Observable 模式以及 HttpModule 来“观察”并最终转发 JSON-RPC 服务器(在本例中为区块链)返回的值节点。

如果您不熟悉区块链,可以将它们视为一个链表,其中列表中的每个新元素都指向前一个元素,如下所示:

let blockchain = [
  [0,{value: 'something', previous: None}], 
  [1,{value: 'somethingelse', previous: 0}], ...
  [N,{value: 'somethingsomething', previous: N-1}]
]

有时区块链系统中可能会发生“分叉”。它看起来像下面的树:

[A] <-- [B]<--- [C]
         |
         --- [C'] <--- [D] <--- [E] <--- ... and so on

问题

如果我的 NestJS 应用程序在时间 0 获取块 [A],在时间 1 获取块 [B] 并在时间 1 获取块 [ C] 在时间 2,但突然在时间 3 我得到了 [E],我不会得到块 [D][C ']。这意味着我将无法检查这两个缺失块中的值。

逻辑

因为所有块都有一个指向前一个块的指针,所以我确实能够通过简单地从块 [E] 传递指向块 [D] 的指针来检索块 [D]。类似地,从块 [D] 我随后可以获得块 [C']。因为 [C'] 有一个指向 [B] 的指针,所以我可以成功检索所有丢失的块。

我对 Observables 还很陌生,所以我不完全确定当我以这种方式使用 NestJS HttpModule 时如何递归回溯:

export class BlockchainService {
    private url: string;
    private top?: number;

    constructor(private httpService: HttpService,
                private config: ConfigService) {
        this.url = this.config.get<string>('my_blockchain_url')
    }

    getBestBlockHash(): Observable<AxiosResponse<any>> {
        return this.httpService.post(this.url, { 
            "method" : "getbestblockhash" 
        })
    }

    getBestBlock(): Observable<AxiosResponse<any>> {
        return this.getBestBlockHash().pipe(
            mergeMap((hash) => this.getBlock(hash.data.result))
        )
    }

    getBlock(hash: string): Observable<AxiosResponse<any>> {
        return this.httpService.post(this.url, {
            "method" : "getblock",
            "params" : {
                "blockhash" : hash
            }
        })
}

尝试 1

因为 Observable 保存块数据,我无法评估是否回溯,或者在没有订阅它或管道它的情况下。

使用 Mrk Sef 下面的建议和 iif() 运算符,似乎让我更进一步,因为我可以将 getBestBlock() Observable 作为参数传递到利用 iif()checkBackTrack 函数,如下所示:

    checkBackTrack(obs: Observable<AxiosResponse<any>>): Observable<AxiosResponse<any>> {
        let diff: number
        let previoushash: string
        console.log(diff, previoushash)

        obs.pipe(tap(block => {
            diff = this.top - block.data.result.height
            previoushash = block.data.result.previoushash
        }))
        console.log(diff, previoushash)

        const backTrackNeeded = iif( 
            () => diff > 0, 
            this.backTrack(diff, previoushash),
            obs 
        )
        return backTrackNeeded;
    }

其中 backTrakc 函数如下:

backTrack(n: number, previoushash: string): Observable<AxiosResponse<any>> {
        return (n < 0) ? EMPTY : this.getBlock(previoushash).pipe(
            switchMap(previousBlock => this.backTrack(n-1, previousBlock.data.result.previousblockhash)),
        )
    }

允许我执行以下操作: this.checkBackTrack(this.getBestBlock())

但是,我无法在 checkBackTrack 函数中定义 diffprevioushash...另外,这会带来副作用,这是我不希望的。

Background

I am using NestJS and the Observable<AxiosResponse> pattern with the HttpModule to "observe" and eventually forward values returned by a JSON-RPC server, in this case a Blockchain node.

In case Blockchains are unfamiliar, they can be thought of as a linked list, where each new element in the list points to the previous element, something like this:

let blockchain = [
  [0,{value: 'something', previous: None}], 
  [1,{value: 'somethingelse', previous: 0}], ...
  [N,{value: 'somethingsomething', previous: N-1}]
]

Sometimes a "fork" can happen in a blockchain system. It would look something like the below tree:

[A] <-- [B]<--- [C]
         |
         --- [C'] <--- [D] <--- [E] <--- ... and so on

Problem

If my NestJS application gets Block [A] at time 0, block [B] at time 1 and block [C] at time 2, but suddenly, at time 3 I get [E], I would not get block [D] and [C']. This means that I would be unable to inspect the values in these two missing blocks.

Logic

Because all blocks have a pointer to a previous block, I do have the ability to retrieve block [D] by simply passing the pointer to it from block [E]. Similarly, from block [D] I could subsequently get block [C']. Because [C'] has a pointer to [B] I would have successfully retrieved all missing blocks.

I am quite new to Observables so I am not entirely sure of how I can backtrack recursively when I use the NestJS HttpModule in this way:

export class BlockchainService {
    private url: string;
    private top?: number;

    constructor(private httpService: HttpService,
                private config: ConfigService) {
        this.url = this.config.get<string>('my_blockchain_url')
    }

    getBestBlockHash(): Observable<AxiosResponse<any>> {
        return this.httpService.post(this.url, { 
            "method" : "getbestblockhash" 
        })
    }

    getBestBlock(): Observable<AxiosResponse<any>> {
        return this.getBestBlockHash().pipe(
            mergeMap((hash) => this.getBlock(hash.data.result))
        )
    }

    getBlock(hash: string): Observable<AxiosResponse<any>> {
        return this.httpService.post(this.url, {
            "method" : "getblock",
            "params" : {
                "blockhash" : hash
            }
        })
}

Attempt 1

Because the Observable holds the block data, I cannot evaluate whether to backtrack, or not without subscribe to it, or pipe it.

Using Mrk Sef's proposal below and the iif() operator, seems to take me further since I can pass the getBestBlock() Observable as a parameter to a checkBackTrack function leveraging iif() as follows:

    checkBackTrack(obs: Observable<AxiosResponse<any>>): Observable<AxiosResponse<any>> {
        let diff: number
        let previoushash: string
        console.log(diff, previoushash)

        obs.pipe(tap(block => {
            diff = this.top - block.data.result.height
            previoushash = block.data.result.previoushash
        }))
        console.log(diff, previoushash)

        const backTrackNeeded = iif( 
            () => diff > 0, 
            this.backTrack(diff, previoushash),
            obs 
        )
        return backTrackNeeded;
    }

where the backTrakc function looks like:

backTrack(n: number, previoushash: string): Observable<AxiosResponse<any>> {
        return (n < 0) ? EMPTY : this.getBlock(previoushash).pipe(
            switchMap(previousBlock => this.backTrack(n-1, previousBlock.data.result.previousblockhash)),
        )
    }

allows me to do the following: this.checkBackTrack(this.getBestBlock()).

However, I am unable to define diff and previoushash in the checkBackTrack function... Also, this introduces side effects, which I do not want.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

静若繁花 2025-01-22 05:56:08

寻求解决方案

“我应该如何确保每个M个空行都被回填?”

这意味着什么还不是很清楚,但我们可能处于您不确定如何进一步澄清的境地。我不确定这会回答你的问题,但也许它会帮助你进一步澄清。

一些代码

以下是我如何确保将“缺失”值合并到流中的方法。为了简单起见,这意味着是一个配对实现(例如,它假设索引单调递增,否则将会失败)。

interface Message {
  index: number,
  value: string
}

// simulate an http response that takes 1 second to return
function httpBackupMessage(i: number): Observable<Message> {
  return timer(1000).pipe(
    map(() => ({ index: i, value: "backup something" }))
  );
}

// An observable of messages, but it's missing messages with
// index 2 & 3
const sparseMessage$: Observable<Message> = of({
  index: 0,
  value: "something"
}, {
  index: 1,
  value: "something"
}, {
  index: 4,
  value: "something"
}, {
  index: 5,
  value: "something"
});

// An observable of messages with missing messages retrieved from
// httpBackupMessage
const denseMessage$ = sparseMessage$.pipe(
  // Dummy value is ignored, but tells out logic below that index 
  // 0 (-1 + 1) is the first index.
  startWith({ index: -1, value: "dummy value" } as Message),
  pairwise(),
  concatMap(([prev, curr]) => 
    range(prev.index + 1, curr.index - (prev.index + 1)).pipe(
      concatMap(httpBackupMessage),
      endWith(curr)
    )
  )
);

// Start the observable and print the messages to the console
denseMessage$.subscribe(console.log);

正如您所看到的,sparseMessage$ 缺少索引为 2 和 2 的消息。 3. 以下是由 denseMessage$ 发出的内容:

{ index: 0, value: 'something' }
{ index: 1, value: 'something' }
{ index: 2, value: 'backup something' }
{ index: 3, value: 'backup something' }
{ index: 4, value: 'something' }
{ index: 5, value: 'something' }

一些注释

  1. 这会创建一个新的可观察量,该可观察量会使用缺失的消息来转换原始可观察量。它不会创建单独的可观察量或数组(两者都是可能的,并且与此处所做的没有太大不同)。

  2. 这是模拟一种中间件,具有一些针对缺失值的业务逻辑。然而...

现在,假设我的 httpService.post() 请求由于某种原因被延迟,并且我收到的 JSON-RPC 响应已返回索引:N + M

这听起来像是您担心源超时或乱序发出。当然,可以在中间件层中临时修复此问题(在旧的遗留系统中,这可能是唯一的方法),但最好尽可能避免这种设计。

  1. 此解决方案是一个没有太多额外内容的解决方案(远未达到生产级别代码)。它不进行错误处理。它不能确保索引单调递增等。它并不意味着是一个强大的解决方案。幸运的是,其中许多问题已在文档中得到解决/在 stackoverflow 上的其他问题中得到解答:)

  2. 请随意修改/澄清您的问题。如果您有单独的问题或想要完全重新构建此问题,最好提出一个新问题。

更新:我在修订版中看到的更多代码

可能您在询问如何导航异步链接列表。这更有趣,因为每个下一个值都取决于前一个值(您无法索引到链接列表,您必须跟踪链接)。

下面是一个示例,说明如何跟踪异步链表返回 4 个节点,然后以正确的顺序发出它们。

// This an be any sort of resource, we'll use numbers
type Link = number

// Entries can be thought of as nodes in a linked list
interface DataEntry {
  prev: Link;
  value: string;
}

// Simulate Data stored on a server, access only allowed via getData
const serverData = new Map<Link, DataEntry>([
  [ 4153, {
    prev: 4121,
    value: 'Something 5',
  }], [ 4273, {
    prev: 4153,
    value: 'Something 6',
  }], [ 4291, {
    prev: 4273,
    value: 'Something 7',
  }], [ 4300, {
    prev: 4291,
    value: 'Something 8',
  }]
]);

// Simulate a call to the server. Each call takes 50ms
function getData(pointer: Link): Observable<DataEntry> {
  return timer(50).pipe(map(() => serverData.get(pointer)));
}

// Navigate backwards from start, n times. Buffer all values and emit
// in reverse order (The value n steps back is emitted first)
function backtrackLinks(start: DataEntry, n: number): Observable<DataEntry> {
  return n < 0 ? EMPTY : getData(start.prev).pipe(
    switchMap(previous => backtrackLinks(previous, n - 1)),
    endWith(start)
  );
}

// Pretend we've aquired a node through some earlier process
const nodeNine = {prev: 4300, value: 'Something 9'}
// An example of running the `backtrackLinks` observable and 
// printing the nodes to the console
backtrackLinks(nodeNine , 4).subscribe(console.log);

输出:

{prev: 4121, value: "Something 5"}
{prev: 4153, value: "Something 6"}
{prev: 4273, value: "Something 7"}
{prev: 4291, value: "Something 8"}
{prev: 4300, value: "Something 9"}

您可以使用所有常见的 RxJS 嫌疑人来创建/将其与其他流合并(我想您将在其中获取一个可以回溯的起始节点)。

在上一节中,我使用了 rangeconcatMap、& endWith 但在这里您可以将其替换为 backtrackLinks 这会将所有逻辑嵌入到一个函数中,因为 backtrackLinks(value, 0) 等于 of (值)

Toward a solution

"How should I make sure each M empty line is backfilled?"

What this means isn't very clear, but we might be in a position where you're not sure how to clarify further. I'm not sure this will answer your question, but perhaps it will help you clarify further.

Some code

Here is how I might ensure "missing" values are merged into a stream. This is meant to be a paired down implementation (for example, it assumes index is monotonically increasing and will fail otherwise) for the sake of simplicity.

interface Message {
  index: number,
  value: string
}

// simulate an http response that takes 1 second to return
function httpBackupMessage(i: number): Observable<Message> {
  return timer(1000).pipe(
    map(() => ({ index: i, value: "backup something" }))
  );
}

// An observable of messages, but it's missing messages with
// index 2 & 3
const sparseMessage$: Observable<Message> = of({
  index: 0,
  value: "something"
}, {
  index: 1,
  value: "something"
}, {
  index: 4,
  value: "something"
}, {
  index: 5,
  value: "something"
});

// An observable of messages with missing messages retrieved from
// httpBackupMessage
const denseMessage$ = sparseMessage$.pipe(
  // Dummy value is ignored, but tells out logic below that index 
  // 0 (-1 + 1) is the first index.
  startWith({ index: -1, value: "dummy value" } as Message),
  pairwise(),
  concatMap(([prev, curr]) => 
    range(prev.index + 1, curr.index - (prev.index + 1)).pipe(
      concatMap(httpBackupMessage),
      endWith(curr)
    )
  )
);

// Start the observable and print the messages to the console
denseMessage$.subscribe(console.log);

As you can see, sparseMessage$ is missing a message with index 2 & 3. Below is what is emitted by denseMessage$:

{ index: 0, value: 'something' }
{ index: 1, value: 'something' }
{ index: 2, value: 'backup something' }
{ index: 3, value: 'backup something' }
{ index: 4, value: 'something' }
{ index: 5, value: 'something' }

Some Notes

  1. This creates a new observable that transforms the original observable with the missing messages. It doesn't create a separate observable or array (both are possible, and not too dissimilar to what's done here).

  2. This is emulating a sort of middle-ware with some business logic for missing values. And yet ...

Now, say my httpService.post() request gets delayed for some reason and the JSON-RPC response I get back has returned index: N + M

this makes it sound like you're worried about the source timing out or emitting out of order. Of course it's possible to band-aid fix this in a middle-ware layer (and in an old legacy system this might be the only way), but its best to avoid this sort of design wherever possible.

  1. This solution is the shape of a solution without much extra (nowhere near production level code). It does no error handling. It doesn't ensure indices are monotonically increasing, etc. It is not meant to be a robust solution. Fortunately, many of those issues are addressed in the documentation/answered in other questions here on stackoverflow :)

  2. Feel free to modify/clarify your question. If you have separate questions or you want to re-frame this question entirely, it's best to just open a new question.

Update: Some more code

I see on revision that perhaps you're asking about how to navigate an asynchronous linked list. This is more interesting because each next value depends on the previous one (You can't index into a linked list, you must follow the links).

Here's an example of how you might follow an async linked list back 4 nodes and then emit them in the correct order.

// This an be any sort of resource, we'll use numbers
type Link = number

// Entries can be thought of as nodes in a linked list
interface DataEntry {
  prev: Link;
  value: string;
}

// Simulate Data stored on a server, access only allowed via getData
const serverData = new Map<Link, DataEntry>([
  [ 4153, {
    prev: 4121,
    value: 'Something 5',
  }], [ 4273, {
    prev: 4153,
    value: 'Something 6',
  }], [ 4291, {
    prev: 4273,
    value: 'Something 7',
  }], [ 4300, {
    prev: 4291,
    value: 'Something 8',
  }]
]);

// Simulate a call to the server. Each call takes 50ms
function getData(pointer: Link): Observable<DataEntry> {
  return timer(50).pipe(map(() => serverData.get(pointer)));
}

// Navigate backwards from start, n times. Buffer all values and emit
// in reverse order (The value n steps back is emitted first)
function backtrackLinks(start: DataEntry, n: number): Observable<DataEntry> {
  return n < 0 ? EMPTY : getData(start.prev).pipe(
    switchMap(previous => backtrackLinks(previous, n - 1)),
    endWith(start)
  );
}

// Pretend we've aquired a node through some earlier process
const nodeNine = {prev: 4300, value: 'Something 9'}
// An example of running the `backtrackLinks` observable and 
// printing the nodes to the console
backtrackLinks(nodeNine , 4).subscribe(console.log);

The output:

{prev: 4121, value: "Something 5"}
{prev: 4153, value: "Something 6"}
{prev: 4273, value: "Something 7"}
{prev: 4291, value: "Something 8"}
{prev: 4300, value: "Something 9"}

You can the use all of the usual RxJS suspects to create/merge this with other streams (That's where I imagine you'll acquire a start node from which to backtrack).

In the previous section, I used range, concatMap, & endWith but here you could replace that with backtrackLinks This embeds all that logic in one function since backtrackLinks(value, 0) equals of(value).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文