dnsPython 中的 TXT 记录

发布于 2024-09-11 21:59:14 字数 3811 浏览 9 评论 0原文

我已经实现了一个简单的 DNS 服务器。 它只是用 TXT 记录进行响应。我正在主持 脚本作为 example.com 的 NS 服务器。 NS服务器是xyzk它 当我发出类似以下内容时工作正常:

$ dig demo.example.com @xyzk

; <<>> DiG 9.3.6-P1-RedHat-9.3.6-4.P1.el5_4.1 <<>> demo.example.com @x.y.z.k
;; global options:  printcmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 10028
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 2, ADDITIONAL: 2

;; QUESTION SECTION:
;demo.example.com. IN A

;; ANSWER SECTION:
demo.example.com. 1000 IN TXT "test message"

但当我向其中一个提出完全相同的问题时它不起作用 公共名称服务器(例如 Sun 的 DNS,位于 4.2.2.1):

$ dig demo.example.com @4.2.2.1 (当我发出 $ dig demo.example 时,我得到同样的结果。 com @4.2.2.1 TXT

; <<>> DiG 9.3.6-P1-RedHat-9.3.6-4.P1.el5_4.1 <<>> demo.example.com
;; global options:  printcmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: SERVFAIL, id: 10905
;; flags: qr rd ra; QUERY: 1, ANSWER: 0, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;demo.example.com. IN A

;; Query time: 5837 msec
;; SERVER: 172.16.1.1#53(172.16.1.1)
;; WHEN: Tue Jul 20 04:01:27 2010
;; MSG SIZE  rcvd: 75

你们知道出了什么问题吗?有趣的是如果我改变 响应类型为 CNAME,而不是 TXT, 效果很好。

def dns_respond(resp, q, data_type):
    rrset = dns.rrset.from_text(q.name, 1000,
           dns.rdataclass.IN, dns.rdatatype.TXT, 'test message')
    resp.answer.append(rrset)
    return resp

def dns_ok(resp, q, data = None, msg = ''):
    return dns_respond(resp = resp, q = q, data_type = 'OK')

def dns_error(resp, q):
    return dns_respond(resp = resp, q = q, data_type = 'Error')

def requestHandler(address, message):
    resp = None
    message_id = ord(message[0]) * 256 + ord(message[1])
    logging.debug('msg id = ' + str(message_id))
    if message_id in serving_ids:
        # the request is already taken, drop this message
        logging.debug('I am already serving this request.')
        return
    serving_ids.append(message_id)
    msg = dns.message.from_wire(message)
    op = msg.opcode()
    if op == 0:
        # standard and inverse query
        qs = msg.question
        if len(qs) > 0:
            q = qs[0]
            logging.debug('request is ' + str(q))
            if q.rdtype == dns.rdatatype.A:
                resp = std_qry(msg)
            else:
                # not implemented
                #resp = std_qry(msg)
                resp = make_response(qry=msg, RCODE=4)   
    else:
        # not implemented
        resp = make_response(qry=msg, RCODE=4)   

    if resp:
        s.sendto(resp.to_wire(), address)

def std_qry(msg):
    qs = msg.question
    logging.debug(str(len(qs)) + ' questions.')

    answers = []
    nxdomain = False
    for q in qs:
        resp = make_response(qry=msg)
        logging.critical('Sending...')
        return dns_ok(resp, q)

def make_response(qry=None, id=None, RCODE=0):
    if qry is None and id is None:
        raise Exception, 'bad use of make_response'
    if qry is None:
        resp = dns.message.Message(id)
        # QR = 1
        resp.flags |= dns.flags.QR
        if RCODE != 1:
            raise Exception, 'bad use of make_response'
    else:
        resp = dns.message.make_response(qry)
    #resp.flags |= dns.flags.AA
    resp.flags |= dns.flags.RA
    resp.set_rcode(RCODE)
    return resp
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 53))
logging.debug('binded to UDP port 53.')
serving_ids = []

while True:
    logging.debug('waiting requests.')
    message, address = s.recvfrom(1024)
    logging.debug('serving a request.')
    requestHandler(address, message)

I have implemented a simple DNS server.
It simply responds with a TXT record. I am hosting the
script as the NS server for example.com. The NS server is x.y.z.k. It
works fine when I issue something like:

$ dig demo.example.com @x.y.z.k

; <<>> DiG 9.3.6-P1-RedHat-9.3.6-4.P1.el5_4.1 <<>> demo.example.com @x.y.z.k
;; global options:  printcmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 10028
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 2, ADDITIONAL: 2

;; QUESTION SECTION:
;demo.example.com. IN A

;; ANSWER SECTION:
demo.example.com. 1000 IN TXT "test message"

but it does not work when I ask the very same question from one of the
public name servers (For example Sun's DNS at 4.2.2.1):

$ dig demo.example.com @4.2.2.1 (I get the same thing when I issue $ dig demo.example.com @4.2.2.1 TXT)

; <<>> DiG 9.3.6-P1-RedHat-9.3.6-4.P1.el5_4.1 <<>> demo.example.com
;; global options:  printcmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: SERVFAIL, id: 10905
;; flags: qr rd ra; QUERY: 1, ANSWER: 0, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;demo.example.com. IN A

;; Query time: 5837 msec
;; SERVER: 172.16.1.1#53(172.16.1.1)
;; WHEN: Tue Jul 20 04:01:27 2010
;; MSG SIZE  rcvd: 75

Do you guys have any idea what is wrong? Interestingly if I change
the response type to something like CNAME, instead of TXT,
it works fine.

def dns_respond(resp, q, data_type):
    rrset = dns.rrset.from_text(q.name, 1000,
           dns.rdataclass.IN, dns.rdatatype.TXT, 'test message')
    resp.answer.append(rrset)
    return resp

def dns_ok(resp, q, data = None, msg = ''):
    return dns_respond(resp = resp, q = q, data_type = 'OK')

def dns_error(resp, q):
    return dns_respond(resp = resp, q = q, data_type = 'Error')

def requestHandler(address, message):
    resp = None
    message_id = ord(message[0]) * 256 + ord(message[1])
    logging.debug('msg id = ' + str(message_id))
    if message_id in serving_ids:
        # the request is already taken, drop this message
        logging.debug('I am already serving this request.')
        return
    serving_ids.append(message_id)
    msg = dns.message.from_wire(message)
    op = msg.opcode()
    if op == 0:
        # standard and inverse query
        qs = msg.question
        if len(qs) > 0:
            q = qs[0]
            logging.debug('request is ' + str(q))
            if q.rdtype == dns.rdatatype.A:
                resp = std_qry(msg)
            else:
                # not implemented
                #resp = std_qry(msg)
                resp = make_response(qry=msg, RCODE=4)   
    else:
        # not implemented
        resp = make_response(qry=msg, RCODE=4)   

    if resp:
        s.sendto(resp.to_wire(), address)

def std_qry(msg):
    qs = msg.question
    logging.debug(str(len(qs)) + ' questions.')

    answers = []
    nxdomain = False
    for q in qs:
        resp = make_response(qry=msg)
        logging.critical('Sending...')
        return dns_ok(resp, q)

def make_response(qry=None, id=None, RCODE=0):
    if qry is None and id is None:
        raise Exception, 'bad use of make_response'
    if qry is None:
        resp = dns.message.Message(id)
        # QR = 1
        resp.flags |= dns.flags.QR
        if RCODE != 1:
            raise Exception, 'bad use of make_response'
    else:
        resp = dns.message.make_response(qry)
    #resp.flags |= dns.flags.AA
    resp.flags |= dns.flags.RA
    resp.set_rcode(RCODE)
    return resp
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 53))
logging.debug('binded to UDP port 53.')
serving_ids = []

while True:
    logging.debug('waiting requests.')
    message, address = s.recvfrom(1024)
    logging.debug('serving a request.')
    requestHandler(address, message)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

迷途知返 2024-09-18 21:59:14

我可以看到一些真正的错误会阻止此工作:

  1. 您正在将 TXT 记录返回到 A 记录查询 - 这可能是表演的阻碍。
  2. 您应该返回 AA 标志(权威答案)而不是 RA
  3. 您的标题显示权威和附加部分中有两个答案,但
  4. 没有一旦实际提供服务,就不会从列表中删除提供服务的 ID

此外,还有一些与协议相关的问题需要修复:

  1. 您需要能够返回 NS 记录和 SOA< /code> 询问时记录,否则您的代表团可能无法可靠地工作
  2. 查询匹配应使用整个(源 IP、源端口、queryID)元组,而不仅仅是(queryID)
  3. 如果问题名称位于正确的域中,但不是正确的子域,您应该返回 NXDOMAIN (rcode = 3)
  4. 如果问题名称与正确的子域匹配,但记录类型错误,您应该返回 NOERROR (rcode = 0),而不是NOTIMPL
  5. 如果问题名称根本不是正确域的一部分,您应该返回 REFUSED (rcode = 5)

I can see a few real bugs that'll stop this working:

  1. You're returning a TXT record to an A record query - this is probably the show stopper.
  2. You should be returning the AA flag (authoritative answer) instead of RA
  3. Your headers say there are two answers in the authority and additional sections, but there aren't
  4. You're not removing served IDs from the list once they've actually been served

In addition there are a few protocol-related issues that need fixing:

  1. You need to be able to return NS records and SOA records when asked, or your delegation may not work reliably
  2. Query matching should use the whole (source IP, source port, queryID) tuple into account, not just (queryID)
  3. If the question name is in the right domain, but not the right subdomain you should return NXDOMAIN (rcode = 3)
  4. If the question name matches the right subdomain but the wrong record type you should return NOERROR (rcode = 0) instead of NOTIMPL
  5. If the question name isn't a part of the right domain at all you should return REFUSED (rcode = 5)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文