在 python 中录制流媒体并保存网络广播

发布于 2024-10-03 09:59:48 字数 157 浏览 9 评论 0原文

我正在寻找一个 python 片段来读取互联网广播流(.asx、.pls 等)并将其保存到文件中。

最终的项目是 cron'ed 脚本,它将录制一两个小时的网络广播,然后将其传输到我的手机上,以便在通勤时播放。 (3g 在我的通勤路上有点不稳定)

欢迎任何片段或指点。

I am looking for a python snippet to read an internet radio stream(.asx, .pls etc) and save it to a file.

The final project is cron'ed script that will record an hour or two of internet radio and then transfer it to my phone for playback during my commute. (3g is kind of spotty along my commute)

any snippits or pointers are welcome.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

陌生 2024-10-10 09:59:48

以下内容对我有用,使用 requests 库来处理 http 请求。

import requests

stream_url = 'http://your-stream-source.com/stream'

r = requests.get(stream_url, stream=True)

with open('stream.mp3', 'wb') as f:
    try:
        for block in r.iter_content(1024):
            f.write(block)
    except KeyboardInterrupt:
        pass

这会将流保存到 stream.mp3 文件中,直到您使用 ctrl+C 中断它。

The following has worked for me using the requests library to handle the http request.

import requests

stream_url = 'http://your-stream-source.com/stream'

r = requests.get(stream_url, stream=True)

with open('stream.mp3', 'wb') as f:
    try:
        for block in r.iter_content(1024):
            f.write(block)
    except KeyboardInterrupt:
        pass

That will save a stream to the stream.mp3 file until you interrupt it with ctrl+C.

深白境迁sunset 2024-10-10 09:59:48

因此,在修补和使用它之后,我发现 Streamripper 工作得最好。这是我使用的命令

streamripper http://yp.shoutcast.com/sbin/tunein-station.pls?id=1377200 -d ./streams -l 10800 -a tb$FNAME

So after tinkering and playing with it Ive found Streamripper to work best. This is the command i use

streamripper http://yp.shoutcast.com/sbin/tunein-station.pls?id=1377200 -d ./streams -l 10800 -a tb$FNAME
总以为 2024-10-10 09:59:48

如果您发现 Python 3 中的 requests 或 urllib.request 调用无法保存流,因为您收到“ICY 200 OK”返回而不是“HTTP/1.0 200 OK”标头,则需要告诉底层函数 ICY 200好啦好啦!

您可以有效地做的是在打开流后、处理标头之前拦截处理读取状态的例程。

只需将这样的例程放在您的流打开代码上方即可。

def NiceToICY(self):
    class InterceptedHTTPResponse():
        pass
    import io
    line = self.fp.readline().replace(b"ICY 200 OK\r\n", b"HTTP/1.0 200 OK\r\n")
    InterceptedSelf = InterceptedHTTPResponse()
    InterceptedSelf.fp = io.BufferedReader(io.BytesIO(line))
    InterceptedSelf.debuglevel = self.debuglevel
    InterceptedSelf._close_conn = self._close_conn
    return ORIGINAL_HTTP_CLIENT_READ_STATUS(InterceptedSelf)

然后在打开 URL 之前将这些行放在主例程的开头。

ORIGINAL_HTTP_CLIENT_READ_STATUS = urllib.request.http.client.HTTPResponse._read_status
urllib.request.http.client.HTTPResponse._read_status = NiceToICY

他们将覆盖标准例程(仅这一次)并在打开流时运行 NiceToICY 函数来代替正常状态检查。 NiceToICY 替换无法识别的状态响应,然后复制“真实”_read_status 函数所需的原始响应的相关位。最后调用原始函数,并将其中的值传递回调用者,其他一切都照常进行。

我发现这是解决状态消息导致错误的问题的最简单方法。希望它对您也有用。

If you find that your requests or urllib.request call in Python 3 fails to save a stream because you receive "ICY 200 OK" in return instead of an "HTTP/1.0 200 OK" header, you need to tell the underlying functions ICY 200 OK is OK!

What you can effectively do is intercept the routine that handles reading the status after opening the stream, just before processing the headers.

Simply put a routine like this above your stream opening code.

def NiceToICY(self):
    class InterceptedHTTPResponse():
        pass
    import io
    line = self.fp.readline().replace(b"ICY 200 OK\r\n", b"HTTP/1.0 200 OK\r\n")
    InterceptedSelf = InterceptedHTTPResponse()
    InterceptedSelf.fp = io.BufferedReader(io.BytesIO(line))
    InterceptedSelf.debuglevel = self.debuglevel
    InterceptedSelf._close_conn = self._close_conn
    return ORIGINAL_HTTP_CLIENT_READ_STATUS(InterceptedSelf)

Then put these lines at the start of your main routine, before you open the URL.

ORIGINAL_HTTP_CLIENT_READ_STATUS = urllib.request.http.client.HTTPResponse._read_status
urllib.request.http.client.HTTPResponse._read_status = NiceToICY

They will override the standard routine (this one time only) and run the NiceToICY function in place of the normal status check when it has opened the stream. NiceToICY replaces the unrecognised status response, then copies across the relevant bits of the original response which are needed by the 'real' _read_status function. Finally the original is called and the values from that are passed back to the caller and everything else continues as normal.

I have found this to be the simplest way to get round the problem of the status message causing an error. Hope it's useful for you, too.

桃扇骨 2024-10-10 09:59:48

我知道这已经是一年前的事了,但这仍然是一个可行的问题,我最近一直在摆弄这个问题。

大多数网络广播电台都会为您提供下载类型的选项,我选择 MP3 版本,然后从原始套接字读取信息并将其写入文件。诀窍是计算出下载与播放歌曲相比的速度有多快,以便您可以在读/写大小上取得平衡。这将在您的缓冲区定义中。

现在您已经有了文件,只需将其保留在驱动器(记录)上就可以了,但是大多数播放器会从文件中删除已播放的块,并在流媒体停止时从驱动器和内存中清除文件。

我使用了文件存档中的一些代码片段,没有压缩应用程序来处理大量文件处理、播放、缓冲魔法。其流程非常相似。如果您编写一些 sudo 代码(我强烈推荐),您可以看到相似之处。

I am aware this is a year old, but this is still a viable question, which I have recently been fiddling with.

Most internet radio stations will give you an option of type of download, I choose the MP3 version, then read the info from a raw socket and write it to a file. The trick is figuring out how fast your download is compared to playing the song so you can create a balance on the read/write size. This would be in your buffer def.

Now that you have the file, it is fine to simply leave it on your drive (record), but most players will delete from file the already played chunk and clear the file out off the drive and ram when streaming is stopped.

I have used some code snippets from a file archive without compression app to handle a lot of the file file handling, playing, buffering magic. It's very similar in how the process flows. If you write up some sudo-code (which I highly recommend) you can see the similarities.

山川志 2024-10-10 09:59:48

我只熟悉shoutcast流的工作原理(这将是您提到的.pls文件):

您下载pls文件,它只是一个播放列表。它的格式相当简单,因为它只是一个指向真实流所在位置的文本文件。

您可以连接到该流,因为它只是 HTTP,可以传输 MP3 或 AAC。供您使用时,只需将获得的每个字节保存到文件中,您将获得可传输到 MP3 播放器的 MP3 或 AAC 文件。

Shoutcast 有一项可选附加功能:元数据。您可以在此处了解其工作原理,但实际上并不需要。

如果您想要一个执行此操作的示例应用程序,请告诉我,我稍后会弥补一些内容。

I'm only familiar with how shoutcast streaming works (which would be the .pls file you mention):

You download the pls file, which is just a playlist. It's format is fairly simple as it's just a text file that points to where the real stream is.

You can connect to that stream as it's just HTTP, that streams either MP3 or AAC. For your use, just save every byte you get to a file and you'll get an MP3 or AAC file you can transfer to your mp3 player.

Shoutcast has one addition that is optional: metadata. You can find how that works here, but is not really needed.

If you want a sample application that does this, let me know and I'll make up something later.

吐个泡泡 2024-10-10 09:59:48

https://stackoverflow.com/users/1543257/dingleshttps://stackoverflow.com/a/41338150),以下是如何使用异步 HTTP 客户端库 - aiohttp:

import functools

import aiohttp
from aiohttp.client_proto import ResponseHandler
from aiohttp.http_parser import HttpResponseParserPy


class ICYHttpResponseParser(HttpResponseParserPy):
    def parse_message(self, lines):
        if lines[0].startswith(b"ICY "):
            lines[0] = b"HTTP/1.0 " + lines[0][4:]
        return super().parse_message(lines)


class ICYResponseHandler(ResponseHandler):
    def set_response_params(
        self,
        *,
        timer = None,
        skip_payload = False,
        read_until_eof = False,
        auto_decompress = True,
        read_timeout = None,
        read_bufsize = 2 ** 16,
        timeout_ceil_threshold = 5,
    ) -> None:
        # this is a copy of the implementation from here:
        # https://github.com/aio-libs/aiohttp/blob/v3.8.1/aiohttp/client_proto.py#L137-L165
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = ICYHttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=aiohttp.ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)


class ICYConnector(aiohttp.TCPConnector):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._factory = functools.partial(ICYResponseHandler, loop=self._loop)

然后可以按如下方式使用:

session = aiohttp.ClientSession(connector=ICYConnector())
async with session.get("url") as resp:
    print(resp.status)

是的,它使用了一些私有类和属性,但这是更改 HTTP 规范一部分的某些内容的处理的唯一解决方案,并且(理论上)永远不需要更改库的用户...

考虑到所有事情,我想说,与猴子修补相比,这仍然相当干净,猴子修补会导致所有请求的行为发生变化(特别是对于 asyncio 来说,在请求之前设置和在请求之后重置并不能保证当向 ICY 发出请求时,其他东西不会发出请求)。这样,您可以将 ClientSession 对象专门用于对使用 ICY 状态行进行响应的服务器的请求。

请注意,这会对使用 ICYConnector 发出的请求带来性能损失 - 为了使其正常工作,我使用 HttpResponseParser 的纯 Python 实现,这会比较慢与 aiohttp 默认使用并用 C 语言编写的那个相比。如果不提供整个库,这实际上无法以不同的方式完成,因为解析状态行的行为深深隐藏在 C 代码中。

In line with the answer from https://stackoverflow.com/users/1543257/dingles (https://stackoverflow.com/a/41338150), here's how you can achieve the same result with the asynchronous HTTP client library - aiohttp:

import functools

import aiohttp
from aiohttp.client_proto import ResponseHandler
from aiohttp.http_parser import HttpResponseParserPy


class ICYHttpResponseParser(HttpResponseParserPy):
    def parse_message(self, lines):
        if lines[0].startswith(b"ICY "):
            lines[0] = b"HTTP/1.0 " + lines[0][4:]
        return super().parse_message(lines)


class ICYResponseHandler(ResponseHandler):
    def set_response_params(
        self,
        *,
        timer = None,
        skip_payload = False,
        read_until_eof = False,
        auto_decompress = True,
        read_timeout = None,
        read_bufsize = 2 ** 16,
        timeout_ceil_threshold = 5,
    ) -> None:
        # this is a copy of the implementation from here:
        # https://github.com/aio-libs/aiohttp/blob/v3.8.1/aiohttp/client_proto.py#L137-L165
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._parser = ICYHttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=aiohttp.ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)


class ICYConnector(aiohttp.TCPConnector):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._factory = functools.partial(ICYResponseHandler, loop=self._loop)

This can then be used as follows:

session = aiohttp.ClientSession(connector=ICYConnector())
async with session.get("url") as resp:
    print(resp.status)

Yes, it's using a few private classes and attributes but this is the only solution to change the handling of something that's part of HTTP spec and (theoretically) should not ever need to be changed by the library's user...

All things considered, I would say this is still rather clean in comparison to monkey patching which would cause the behavior to be changed for all requests (especially true for asyncio where setting before and resetting after a request does not guarantee that something else won't make a request while request to ICY is being made). This way, you can dedicate a ClientSession object specifically for requests to servers that respond with the ICY status line.

Note that this comes with a performance penalty for requests made with ICYConnector - in order for this to work, I am using the pure Python implementation of HttpResponseParser which is going to be slower than the one that aiohttp uses by default and is written in C. This cannot really be done differently without vendoring the whole library as the behavior for parsing status line is deeply hidden in the C code.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文