Python GPS模块:读取最新的GPS数据

发布于 2024-11-09 23:54:39 字数 789 浏览 0 评论 0原文

我一直在尝试使用 python 2.6 中的标准 GPS (gps.py) 模块。它应该充当客户端并从 Ubuntu 中运行的 gpsd 读取 GPS 数据。

根据 GPSD 网页上有关客户端设计的文档 (GPSD Client Howto),我应该能够使用以下代码(对示例稍加修改)来获取最新的 GPS 读数(经纬度是我主要感兴趣的)

from gps import *
session = gps() # assuming gpsd running with default options on port 2947
session.stream(WATCH_ENABLE|WATCH_NEWSTYLE)
report = session.next()
print report

如果我重复使用 next() ,它会从队列底部为我提供缓冲值(从什么时候开始会话已开始),而不是最新的 Gps 读数。有没有办法使用这个库获取更新的值?在某种程度上,寻求最新值的流?

有没有人有使用这个库来轮询 GPS 并获取我正在寻找的值的代码示例?

这是我想要做的:

  1. 启动会话
  2. 等待用户在我的代码中调用 gps_poll() 方法
  3. 在此方法中读取最新的 TPV(时间位置速度)报告并返回 lat long
  4. Go返回等待用户调用 gps_poll()

I have been trying to work with the standard GPS (gps.py) module in python 2.6. This is supposed to act as a client and read GPS Data from gpsd running in Ubuntu.

According to the documentation from GPSD webpage on client design (GPSD Client Howto), I should be able to use the following code (slightly modified from the example) for getting latest GPS Readings (lat long is what I am mainly interested in)

from gps import *
session = gps() # assuming gpsd running with default options on port 2947
session.stream(WATCH_ENABLE|WATCH_NEWSTYLE)
report = session.next()
print report

If I repeatedly use the next() it gives me buffered values from the bottom of the queue (from when the session was started), and not the LATEST Gps reading. Is there a way to get more recent values using this library? In a Way, seek the Stream to the latest values?

Has anyone got a code example using this library to poll the gps and get the value i am looking for ?

Here is what I am trying to do:

  1. start the session
  2. Wait for user to call the gps_poll() method in my code
  3. Inside this method read the latest TPV (Time Position Velocity) report and return lat long
  4. Go back to waiting for user to call gps_poll()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

就此别过 2024-11-16 23:54:39

您需要做的是定期轮询“session.next()” - 这里的问题是您正在处理串行接口 - 您按照收到的顺序获得结果。由您来维护具有最新检索值的“current_value”。

如果您不轮询会话对象,最终您的 UART FIFO 将被填满,并且无论如何您都不会获得任何新值。

考虑为此使用线程,不要等待用户调用 gps_poll(),您应该进行轮询,当用户想要新值时,他们使用返回 current_value 的“get_current_value()”。

在我的脑海中,它可能是这样简单的事情:

import threading
import time
from gps import *

class GpsPoller(threading.Thread):

   def __init__(self):
       threading.Thread.__init__(self)
       self.session = gps(mode=WATCH_ENABLE)
       self.current_value = None

   def get_current_value(self):
       return self.current_value

   def run(self):
       try:
            while True:
                self.current_value = self.session.next()
                time.sleep(0.2) # tune this, you might not get values that quickly
       except StopIteration:
            pass

if __name__ == '__main__':

   gpsp = GpsPoller()
   gpsp.start()
   # gpsp now polls every .2 seconds for new data, storing it in self.current_value
   while 1:
       # In the main thread, every 5 seconds print the current value
       time.sleep(5)
       print gpsp.get_current_value() 

What you need to do is regularly poll 'session.next()' - the issue here is that you're dealing with a serial interface - you get results in the order they were received. Its up to you to maintain a 'current_value' that has the latest retrieved value.

If you don't poll the session object, eventually your UART FIFO will fill up and you won't get any new values anyway.

Consider using a thread for this, don't wait for the user to call gps_poll(), you should be polling and when the user wants a new value they use 'get_current_value()' which returns current_value.

Off the top of my head it could be something as simple as this:

import threading
import time
from gps import *

class GpsPoller(threading.Thread):

   def __init__(self):
       threading.Thread.__init__(self)
       self.session = gps(mode=WATCH_ENABLE)
       self.current_value = None

   def get_current_value(self):
       return self.current_value

   def run(self):
       try:
            while True:
                self.current_value = self.session.next()
                time.sleep(0.2) # tune this, you might not get values that quickly
       except StopIteration:
            pass

if __name__ == '__main__':

   gpsp = GpsPoller()
   gpsp.start()
   # gpsp now polls every .2 seconds for new data, storing it in self.current_value
   while 1:
       # In the main thread, every 5 seconds print the current value
       time.sleep(5)
       print gpsp.get_current_value() 
苏别ゝ 2024-11-16 23:54:39

对于使用现代版本 GPSD 并且仅在特定时间需要数据而不是流式传输的任何人来说,上述答案非常低效且过于复杂。

大多数 GPS 每秒至少发送一次位置信息。大概是因为许多基于 GPS 的应用程序需要实时更新,我见过的绝大多数 GPSD 客户端示例都使用上述方法从 GPSD 观看流并接收实时更新(或多或少与 GPS 发送更新的频率相同) 。

但是,如果(如OP的情况)您不需要流信息,而只需要在请求时(即通过用户交互或其他一些事件)最后报告的位置,那么还有更多高效且简单的方法:让gpsd缓存最新的位置信息,需要的时候再查询。

gpsd JSON 协议 有一个 ?POLL; 请求,该请求返回 gpsd 看到的最新 GPS 信息。您可以在 gpsd 开始时发送 ?WATCH={"enable":true} 消息,而不必遍历积压的 gps 消息并不断读取新消息以避免缓冲区已满。会话,然后在需要时使用?POLL;查询最新的位置信息。响应是一个 JSON 对象,其中包含 gpsd 从 GPS 看到的最新信息。

如果您使用的是 Python3,我发现最简单的方法是使用 gpsd-py3 pypi 上有可用的包。连接到 gpsd、获取最新位置信息并打印当前位置:

import gpsd
gpsd.connect()
packet = gpsd.get_current()
print(packet.position())

只要需要新位置信息,您就可以重复 gpsd.get_current() 调用,gpsd 包将在幕后执行?POLL; 调用 gpsd 并返回表示响应的对象。

使用内置的 GPS 模块来完成此操作并不是非常简单,但是还有许多其他可用的 Python 客户端,并且使用任何可以执行套接字通信的东西也相当简单,包括这个使用 telnet 的示例:

$ telnet localhost 2947
Trying ::1...
Connected to localhost.
Escape character is '^]'.
{"class":"VERSION","release":"3.16","rev":"3.16","proto_major":3,"proto_minor":11}
?WATCH={"enable":true}
{"class":"DEVICES","devices":[{"class":"DEVICE","path":"/dev/pts/10","driver":"SiRF","activated":"2018-03-02T21:14:52.687Z","flags":1,"native":1,"bps":4800,"parity":"N","stopbits":1,"cycle":1.00}]}
{"class":"WATCH","enable":true,"json":false,"nmea":false,"raw":0,"scaled":false,"timing":false,"split24":false,"pps":false}
?POLL;
{"class":"POLL","time":"2018-03-02T21:14:54.873Z","active":1,"tpv":[{"class":"TPV","device":"/dev/pts/10","mode":3,"time":"2005-06-09T14:34:53.280Z","ept":0.005,"lat":46.498332203,"lon":7.567403907,"alt":1343.165,"epx":24.829,"epy":25.326,"epv":78.615,"track":10.3788,"speed":0.091,"climb":-0.085,"eps":50.65,"epc":157.23}],"gst":[{"class":"GST","device":"/dev/pts/10","time":"1970-01-01T00:00:00.000Z","rms":0.000,"major":0.000,"minor":0.000,"orient":0.000,"lat":0.000,"lon":0.000,"alt":0.000}],"sky":[{"class":"SKY","device":"/dev/pts/10","time":"2005-06-09T14:34:53.280Z","xdop":1.66,"ydop":1.69,"vdop":3.42,"tdop":3.05,"hdop":2.40,"gdop":5.15,"pdop":4.16,"satellites":[{"PRN":23,"el":6,"az":84,"ss":0,"used":false},{"PRN":28,"el":7,"az":160,"ss":0,"used":false},{"PRN":8,"el":66,"az":189,"ss":45,"used":true},{"PRN":29,"el":13,"az":273,"ss":0,"used":false},{"PRN":10,"el":51,"az":304,"ss":29,"used":true},{"PRN":4,"el":15,"az":199,"ss":36,"used":true},{"PRN":2,"el":34,"az":241,"ss":41,"used":true},{"PRN":27,"el":71,"az":76,"ss":42,"used":true}]}]}
?POLL;
{"class":"POLL","time":"2018-03-02T21:14:58.856Z","active":1,"tpv":[{"class":"TPV","device":"/dev/pts/10","mode":3,"time":"2005-06-09T14:34:53.280Z","ept":0.005,"lat":46.498332203,"lon":7.567403907,"alt":1343.165,"epx":24.829,"epy":25.326,"epv":78.615,"track":10.3788,"speed":0.091,"climb":-0.085,"eps":50.65,"epc":157.23}],"gst":[{"class":"GST","device":"/dev/pts/10","time":"1970-01-01T00:00:00.000Z","rms":0.000,"major":0.000,"minor":0.000,"orient":0.000,"lat":0.000,"lon":0.000,"alt":0.000}],"sky":[{"class":"SKY","device":"/dev/pts/10","time":"2005-06-09T14:34:53.280Z","xdop":1.66,"ydop":1.69,"vdop":3.42,"tdop":3.05,"hdop":2.40,"gdop":5.15,"pdop":4.16,"satellites":[{"PRN":23,"el":6,"az":84,"ss":0,"used":false},{"PRN":28,"el":7,"az":160,"ss":0,"used":false},{"PRN":8,"el":66,"az":189,"ss":45,"used":true},{"PRN":29,"el":13,"az":273,"ss":0,"used":false},{"PRN":10,"el":51,"az":304,"ss":29,"used":true},{"PRN":4,"el":15,"az":199,"ss":36,"used":true},{"PRN":2,"el":34,"az":241,"ss":41,"used":true},{"PRN":27,"el":71,"az":76,"ss":42,"used":true}]}]}

The above answers are very inefficient and overly complex for anyone using modern versions of gpsd and needing data at only specific times, instead of streaming.

Most GPSes send their position information at least once per second. Presumably since many GPS-based applications desire real-time updates, the vast majority of gpsd client examples I've seen use the above method of watching a stream from gpsd and receiving realtime updates (more or less as often as the gps sends them).

However, if (as in the OP's case) you don't need streaming information but just need the last-reported position whenever it's requested (i.e. via user interaction or some other event), there's a much more efficient and simpler method: let gpsd cache the latest position information, and query it when needed.

The gpsd JSON protocol has a ?POLL; request, which returns the most recent GPS information that gpsd has seen. Instead of having to iterate over the backlog of gps messages, and continually read new messages to avoid full buffers, you can send a ?WATCH={"enable":true} message at the start of the gpsd session, and then query the latest position information whenever you need it with ?POLL;. The response is a single JSON object containing the most recent information that gpsd has seen from the GPS.

If you're using Python3, the easiest way I've found is to use the gpsd-py3 package available on pypi. To connect to gpsd, get the latest position information, and print the current position:

import gpsd
gpsd.connect()
packet = gpsd.get_current()
print(packet.position())

You can repeat the gpsd.get_current() call whenever you want new position information, and behind the scenes the gpsd package will execute the ?POLL; call to gpsd and return an object representing the response.

Doing this with the built-in gps module isn't terribly straightforward, but there are a number of other Python clients available, and it's also rather trivial to do with anything that can perform socket communication, including this example using telnet:

$ telnet localhost 2947
Trying ::1...
Connected to localhost.
Escape character is '^]'.
{"class":"VERSION","release":"3.16","rev":"3.16","proto_major":3,"proto_minor":11}
?WATCH={"enable":true}
{"class":"DEVICES","devices":[{"class":"DEVICE","path":"/dev/pts/10","driver":"SiRF","activated":"2018-03-02T21:14:52.687Z","flags":1,"native":1,"bps":4800,"parity":"N","stopbits":1,"cycle":1.00}]}
{"class":"WATCH","enable":true,"json":false,"nmea":false,"raw":0,"scaled":false,"timing":false,"split24":false,"pps":false}
?POLL;
{"class":"POLL","time":"2018-03-02T21:14:54.873Z","active":1,"tpv":[{"class":"TPV","device":"/dev/pts/10","mode":3,"time":"2005-06-09T14:34:53.280Z","ept":0.005,"lat":46.498332203,"lon":7.567403907,"alt":1343.165,"epx":24.829,"epy":25.326,"epv":78.615,"track":10.3788,"speed":0.091,"climb":-0.085,"eps":50.65,"epc":157.23}],"gst":[{"class":"GST","device":"/dev/pts/10","time":"1970-01-01T00:00:00.000Z","rms":0.000,"major":0.000,"minor":0.000,"orient":0.000,"lat":0.000,"lon":0.000,"alt":0.000}],"sky":[{"class":"SKY","device":"/dev/pts/10","time":"2005-06-09T14:34:53.280Z","xdop":1.66,"ydop":1.69,"vdop":3.42,"tdop":3.05,"hdop":2.40,"gdop":5.15,"pdop":4.16,"satellites":[{"PRN":23,"el":6,"az":84,"ss":0,"used":false},{"PRN":28,"el":7,"az":160,"ss":0,"used":false},{"PRN":8,"el":66,"az":189,"ss":45,"used":true},{"PRN":29,"el":13,"az":273,"ss":0,"used":false},{"PRN":10,"el":51,"az":304,"ss":29,"used":true},{"PRN":4,"el":15,"az":199,"ss":36,"used":true},{"PRN":2,"el":34,"az":241,"ss":41,"used":true},{"PRN":27,"el":71,"az":76,"ss":42,"used":true}]}]}
?POLL;
{"class":"POLL","time":"2018-03-02T21:14:58.856Z","active":1,"tpv":[{"class":"TPV","device":"/dev/pts/10","mode":3,"time":"2005-06-09T14:34:53.280Z","ept":0.005,"lat":46.498332203,"lon":7.567403907,"alt":1343.165,"epx":24.829,"epy":25.326,"epv":78.615,"track":10.3788,"speed":0.091,"climb":-0.085,"eps":50.65,"epc":157.23}],"gst":[{"class":"GST","device":"/dev/pts/10","time":"1970-01-01T00:00:00.000Z","rms":0.000,"major":0.000,"minor":0.000,"orient":0.000,"lat":0.000,"lon":0.000,"alt":0.000}],"sky":[{"class":"SKY","device":"/dev/pts/10","time":"2005-06-09T14:34:53.280Z","xdop":1.66,"ydop":1.69,"vdop":3.42,"tdop":3.05,"hdop":2.40,"gdop":5.15,"pdop":4.16,"satellites":[{"PRN":23,"el":6,"az":84,"ss":0,"used":false},{"PRN":28,"el":7,"az":160,"ss":0,"used":false},{"PRN":8,"el":66,"az":189,"ss":45,"used":true},{"PRN":29,"el":13,"az":273,"ss":0,"used":false},{"PRN":10,"el":51,"az":304,"ss":29,"used":true},{"PRN":4,"el":15,"az":199,"ss":36,"used":true},{"PRN":2,"el":34,"az":241,"ss":41,"used":true},{"PRN":27,"el":71,"az":76,"ss":42,"used":true}]}]}
少女情怀诗 2024-11-16 23:54:39

添加我的两分钱。

不管出于什么原因,我的树莓派将继续执行一个线程,我必须硬重置树莓派。

所以我结合了 sysnthesizerpatel 和我在 Dan Mandel 的博客上找到的答案 这里

我的 gps_poller 类如下所示:

import os 
from gps import *
from time import *
import time 
import threading 

class GpsPoller(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.session = gps(mode=WATCH_ENABLE)
        self.current_value = None 
        self.running = True 

    def get_current_value(self):
        return self.current_value

    def run(self):
        try:
            while self.running:
                self.current_value = self.session.next() 
        except StopIteration:
            pass

使用的代码如下所示:

from gps_poll import *

if __name__ == '__main__':
    gpsp = GpsPoller()
    try: 
        gpsp.start() 
        while True:
            os.system('clear')
            report = gpsp.get_current_value()
            # print report 
            try: 
                if report.keys()[0] == 'epx':
                    print report['lat']
                    print report['lon']           
                time.sleep(.5)
            except(AttributeError, KeyError):
                pass 
            time.sleep(0.5)

    except(KeyboardInterrupt, SystemExit):
        print "\nKilling Thread.."
        gpsp.running = False 
        gpsp.join()

    print "Done.\nExiting." 

您还可以在此处找到代码: 此处这里

Adding my two cents.

For whatever reason my raspberry pi would continue to execute a thread and I'd have to hard reset the pi.

So I've combined sysnthesizerpatel and an answer I found on Dan Mandel's blog here.

My gps_poller class looks like this:

import os 
from gps import *
from time import *
import time 
import threading 

class GpsPoller(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.session = gps(mode=WATCH_ENABLE)
        self.current_value = None 
        self.running = True 

    def get_current_value(self):
        return self.current_value

    def run(self):
        try:
            while self.running:
                self.current_value = self.session.next() 
        except StopIteration:
            pass

And the code in use looks like this:

from gps_poll import *

if __name__ == '__main__':
    gpsp = GpsPoller()
    try: 
        gpsp.start() 
        while True:
            os.system('clear')
            report = gpsp.get_current_value()
            # print report 
            try: 
                if report.keys()[0] == 'epx':
                    print report['lat']
                    print report['lon']           
                time.sleep(.5)
            except(AttributeError, KeyError):
                pass 
            time.sleep(0.5)

    except(KeyboardInterrupt, SystemExit):
        print "\nKilling Thread.."
        gpsp.running = False 
        gpsp.join()

    print "Done.\nExiting." 

You can also find the code here: Here and Here

很酷不放纵 2024-11-16 23:54:39

我知道它是一个旧线程,但为了每个人都能理解,您也可以使用 pyembedded python 库来实现此目的。

pip install pyembedded

from pyembedded.gps_module.gps import GPS
import time    

gps = GPS(port='COM3', baud_rate=9600)

while True:
    print(gps.get_lat_long())
    time.sleep(1)

https://pypi.org/project/pyembedded/

I know its an old thread but just for everyone understanding, you can also use pyembedded python library for this.

pip install pyembedded

from pyembedded.gps_module.gps import GPS
import time    

gps = GPS(port='COM3', baud_rate=9600)

while True:
    print(gps.get_lat_long())
    time.sleep(1)

https://pypi.org/project/pyembedded/

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文