How do I stop, kill, quit, or close a PycURL request on a stream? (Example using the Twitter stream)

Posted on 2024-11-08 03:29:30

I'm currently cURLing the Twitter API stream (http://stream.twitter.com/1/statuses/sample.json), so I am constantly receiving data. I want to stop cURLing the stream once I have retrieved X objects from it (in the example I use 10 as an arbitrary number).

You can see how I have attempted to close the connection in the code below. The code after curling.perform() never executes, because the stream is continuous and perform() never returns. So I tried to close the stream from inside body_callback, but because perform() is still running at that point, I cannot invoke close().

Any help would be appreciated.

Code:

# Imports
import pycurl # Used for doing cURL request
import base64 # Used to encode username and API Key
import json # Used to break down the json objects

# Settings to access stream and API
userName = 'twitter_username' # My username
password = 'twitter_password' # My API Key
apiURL = 'http://stream.twitter.com/1/statuses/sample.json' # the twitter api
tweets = [] # An array of Tweets

# Methods to do with the tweets array
def how_many_tweets():
    print 'Collected: ',len(tweets)
    return len(tweets)

class Tweet:
    def __init__(self):
        self.raw = ''
        self.id = ''
        self.content = ''

    def decode_json(self):
        return True

    def set_id(self):
        return True

    def set_content(self):
        return True

    def set_raw(self, data):
        self.raw = data

# Class to print out the stream as it comes from the API
class Stream:
    def __init__(self):
        self.tweetBeingRead =''

    def body_callback(self, buf):
        # This gets whole Tweets, and adds them to an array called tweets
        if(buf.startswith('{"in_reply_to_status_id_str"')): # This is the start of a tweet
            # Added Tweet to Global Array Tweets
            print 'Added:' # Printing output to console
            print self.tweetBeingRead # Printing output to console
            theTweetBeingProcessed = Tweet() # Create a new Tweet Object
            theTweetBeingProcessed.set_raw(self.tweetBeingRead) # Set its raw value to tweetBeingRead
            tweets.append(theTweetBeingProcessed) # Add it to the global array of tweets
            # Start processing a new tweet
            self.tweetBeingRead = buf # Start a new tweet from scratch
        else:
            self.tweetBeingRead = self.tweetBeingRead+buf
        if(how_many_tweets()>10):
            try:
                curling.close() # This is where the problem lies. I want to close the stream
            except Exception as CurlError:
                print ' Tried closing stream: ',CurlError

# Used to initiate the cURLing of the Data Sift streams
datastream = Stream()
curling = pycurl.Curl()
curling.setopt(curling.URL, apiURL)
curling.setopt(curling.HTTPHEADER, ['Authorization: Basic '+base64.b64encode(userName+":"+password)]) # HTTP Basic auth needs the "Basic" scheme prefix
curling.setopt(curling.WRITEFUNCTION, datastream.body_callback)
curling.perform() # This is where the cURLing starts
print 'I cant reach here.'
curling.close() # This never gets called. :(

Comments (1)

冰葑 2024-11-15 03:29:30

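You can abort the transfer from inside the write callback by returning a number that differs from the number of bytes passed in to it. (Returning None is treated the same as returning the number of bytes passed in, which is why a callback with no return statement keeps the transfer going.)

When you abort this way, libcurl considers the whole transfer finished and your perform() call hands control back to your code, so the lines after it can run.

Because the transfer was aborted, it is reported as an error: in PycURL, perform() raises pycurl.error (libcurl's write error, code 23), which you need to catch before calling close().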
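
A minimal sketch of that approach follows, written for Python 3 and not taken from the original question. The names LimitedCollector and fetch_limited are hypothetical, and the sketch assumes the stream sends one record per line, so check that against the stream you are reading. The callback returns -1, which can never equal the buffer length, once enough records have arrived.

```python
class LimitedCollector:
    """Collects newline-delimited records and signals an abort at `limit`."""

    def __init__(self, limit):
        self.limit = limit
        self.records = []
        self._pending = b""  # bytes of a record that has not ended yet

    def write(self, buf):
        # Chunks do not line up with records, so buffer and split on newlines.
        self._pending += buf
        *lines, self._pending = self._pending.split(b"\n")
        for line in lines:
            line = line.strip()
            if line and len(self.records) < self.limit:
                self.records.append(line)
        if len(self.records) >= self.limit:
            # Any value other than len(buf) makes libcurl abort the transfer.
            return -1
        # None tells PycURL the whole buffer was consumed; keep going.
        return None


def fetch_limited(url, limit):
    # Imported here so the collector above can be exercised without PycURL.
    import pycurl

    collector = LimitedCollector(limit)
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.WRITEFUNCTION, collector.write)
    try:
        curl.perform()
    except pycurl.error as exc:
        # The deliberate abort shows up as a write error; re-raise anything else.
        if exc.args[0] != pycurl.E_WRITE_ERROR:
            raise
    finally:
        curl.close()  # safe here because perform() is no longer running
    return collector.records
```

Calling fetch_limited(apiURL, 10) would then return once ten records have been collected. Authentication is left out of the sketch and would need to be set on the handle as in the question.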
