Advertisement

使用C++编写一个DHT爬虫,实现从DHT网络爬取BT种子_c++实现dht

阅读量:

分布式哈希表(DHT)是一种基于节点架构的数据存储方案,在不依赖中心服务器的情况下实现数据分层存储和高效检索。系统中每个客户端会负责管理本地的一小部分网络路由,并存储一小份数据集合,在此过程中完成整个系统中的数据寻址与存储功能。其中BitTorrent客户端会动态加入一个DHT节点,并通过该节点与其他DHT节点通信以获取peers的信息。
简单来说,
peers是运行BitTorrent协议并监听TCP端口的客户端或服务器,

nodes是运行DHT协议并监听UDP端口的客户端或服务器。

2.3、Kademlia算法

Kademlia是一种基于DHT的技术实现方案。在Kademlia网络中,默认的距离计算采用异或运算(XOR),结果为无符号整数类型。其中,默认值定义为distance(A, B) = |A xor B|(即|A ^ B|),其中数值较小则表示两个节点之间的距离较近。(注:此处"两个"可替换为"两者"以增强表述))
其中,默认值定义为distance(A, B) = |A xor B|(即|A ^ B|),其中数值较小则表示两个节点之间的距离较近。(注:此处"两个"可替换为"两者"以增强表述)

2.4、KRPC协议

KRPC是一种用于节点间通信的协议,在其架构中采用了B编码实现了简单的RPC通信模式。
该协议通过UDP报文进行数据传输,并设计了一个消息队列机制来管理请求与响应的关系。
值得注意的是该协议不具备重传机制。

2.5、MagNet协议

Magnet Link协议是一种基于哈希算法的技术,在其体系结构中包含一个长度为20字节的哈希值。在P2P网络环境中,
参与者利用磁力链接机制获取资源的种子文件,
并依据这些种子文件进行资源下载。

3、BT下载的一些概念梳理

3.1、BT软件下载原理

该软件采用DHT协议作为基础,在分布式哈希图中实现数据分层传播机制。当系统探测到目标节点后,会立即启动BitTorrent协议进行数据分块下载,并基于获取到的种子信息完成剩余文件的获取。

3.2、迅雷获取种子的速度为什么那么快

理论上讲,在使用BT软件时需要先在DHT网络中搜索种子这一过程所需的时间是固定的。因此实现所有资源快速响应几乎是不可能的。然而迅雷能够如此快速的原因主要原因在于其维护了一个自己的种子库,并非仅仅依赖于其他用户的共享资源。

3.3、资源时效性问题

当DHT网络上所有参与该资源分享的用户(peer)均退出后...

3.4、好用的BT软件

基于BT技术的核心原理是将文件分解为多个块并通过网络分发这些块,并结合高效的多线程传输机制实现数据同步与负载均衡。因此,在一般情况下下载速度差异可能并不显著(除非某些特定类型的网络环境或服务器缓存机制影响)。广为人知的一些BT客户端包括迅雷、“uTorrent”(即Qobos)、qBittorrent、“BitTorrent Star”、“BitComet”以及Aria2等工具。如果需要更多详细信息或下载这些客户端软件,请访问百度或其他相关资源获取。

3.5、有没有已经编写好的DHT爬虫

于是乎就想看看有没有人已经用比较简单的方式实现了DHT爬虫

4、使用C++编写DHT爬虫

4.1、实现原理

以DHT节点的身份融入DHT网络并收集相关信息。通过爬取get_PEER与announcement_PEER这两种请求的数据来获取相关资源。当服务器收到这些特定类型的请求时,请立即启动相应的处理流程。采用BitTorrent协议从请求源处下载对应的种子数据,并注意该过程的成功概率较低,请确保系统具备足够的资源支持以完成任务。在实际操作过程中发现存在诸多挑战,请及时参考众多开源项目的实践案例进行学习与优化。大多数情况下都是通过访问特定的三个关键节点来完成新节点的接入。

域名 端口
router.utorrent.com 6881
router.bittorrent.com 6881
dht.transmissionbt.com 6881

4.2、实现DHT协议

4.2.1、创建UDP服务

搭建一个基于UDP协议的服务框架,并将其配置服务器运行在指定IP地址和6881端口上;其中DHT为默认配置的通信端口,请注意该服务支持用户根据需求进行端口的自定义设置;理论上支持所有合法的网络通信端口号

4.2.2、加入DHT网络

利用发送请求到这些节点上以加入DHT网络,从而以便获得这些节点的信息

复制代码
    void DhtSearch::ping\_root()
    {
    std::vector<std::pair<const char\*, const char\*>> ip_addr = 
    {
        {"router.utorrent.com",    "6881"},
        {"router.bittorrent.com",  "6881"},
        {"dht.transmissionbt.com", "6881"}
    };
    
    for (auto addr : ip_addr)
    {
        struct addrinfo hints, \*info;
        memset(&hints, 0, sizeof(hints));
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_family = AF_UNSPEC;
    
        int error = getaddrinfo(addr.first, addr.second, &hints, &info);
        if (error)
        {
            log_error << "getaddrinfo fail, error=" << error << ", errstr=" << gai\_strerror(error);
        }
        else
        {
            struct addrinfo\* p = info;
            while (p)
            {
                if (p->ai_family == AF_INET)
                {
                    send\_ping((struct sockaddr_in\*)p->ai_addr, "");
                    log_debug << addr.first << ":" << addr.second << " is AF\_INET";
                }
                else
                {
                    log_debug << addr.first << ":" << addr.second << " is no support the family(" << p->ai_family << ")";
                }
    
                p = p->ai_next;
            }
            freeaddrinfo(info);
        }
    }
    }
4.2.3、报文解析

当其他节点发送报文时(接收到),系统会立即对接收到的报文进行解析(处理)。在DHT网络中各节点之间的通信遵循B编码规范(格式),因此建议参考文章《[B编码与BT种子文件分析,以及模仿json-cpp写一个B编码解析器》》以深入理解相关机制。随后,请查看以下代码段以获取详细的解析逻辑:

复制代码
    // private
    int DhtSearch::parse(const char\* buf, int len, std::string& tid, std::string& id,
                   std::string& info_hash, unsigned short& port, std::string& nodes)
    {
    #define XX(str) \
     log\_error << str; \
     return -1
    
    int ret;
    BEncode::Value root;
    size_t start = 0;
    if (BEncode::decode(buf, start, len, &root) || root.getType() != BEncode::Value::BCODE_DICTIONARY)
    {
        XX("bencode message is invalid");
    }
    
    // tid(始终在顶层)
    {
        auto value = root.find("t");
        if (value != root.end())
        {
            if (value->getType() != BEncode::Value::BCODE_STRING)
            {
                XX("\"t\" value is must be string");
            }
            tid = value->asString();
        }
    }
    
    // y(始终在顶层)
    auto type_y = root.find("y");
    if (type_y != root.end() && type_y->getType() == BEncode::Value::BCODE_STRING)
    {
        std::string value = type_y->asString();
        if (value == "r")
            ret = REPLY;
        else if (value == "e")
        {
            XX("remote reply ERROR value");
        }
        else if (value == "q")
        {
            auto type_q = root.find("q");
            if (type_q != root.end() && type_q->getType() == BEncode::Value::BCODE_STRING)
            {
                std::string v = type_q->asString();
                if (v == "ping")
                    ret = PING;
                else if (v == "find\_node")
                    ret = FIND_NODE;
                else if (v == "get\_peers")
                    ret = GET_PEERS;
                else if (v == "announce\_peer")
                    ret = ANNOUNCE_PEER;
                else if (v == "vote" || v == "sample\_infohashes")
                    return -1;
                else
                {
                    XX("\"q\" value(" + v + ") is invaild");
                }
            }
            else
            {
                XX("not found \"q\" value");
            }
        }
        else
        {
            XX("\"y\" value(" + value + ") is invaild");
        }
    }
    else
    {
        XX("not found \"y\" value");
    }
    
    BEncode::Value::iterator body_value;
    if (ret == REPLY)
    {
        body_value = root.find("r");
        if (body_value == root.end() || body_value->getType() != BEncode::Value::BCODE_DICTIONARY)
        {
            XX("not found \"r\" value");
        }
    }
    else
    {
        body_value = root.find("a");
        if (body_value == root.end() || body_value->getType() != BEncode::Value::BCODE_DICTIONARY)
        {
            XX("not found \"a\" value");
        }
    }
    
    // id
    {
        auto value = body_value->find("id");
        if (value != body_value->end())
        {
            if (value->getType() != BEncode::Value::BCODE_STRING)
            {
                XX("\"id\" value is must be string");
            }
            id = value->asString();
            if (id.size() != 20)
                id.clear();
        }
        else
            id.clear();
    }
    
    // info\_hash
    {
        auto value = body_value->find("info\_hash");
        if (value != body_value->end())
        {
            if (value->getType() != BEncode::Value::BCODE_STRING)
            {
                XX("\"info\_hash\" value is must be string");
            }
            info_hash = value->asString();
            if (info_hash.size() != 20)
                info_hash.clear();
        }
        else
            info_hash.clear();
    }
    
    // port
    {
        auto value = body_value->find("port");
        if (value != body_value->end())
        {
            if (value->getType() != BEncode::Value::BCODE_INTEGER)
            {
                XX("\"port\" value is must be int");
            }
            port = (unsigned short)(value->asInt());
        }
        else
            port = 0;
    }
    
    // nodes
    {
        auto value = body_value->find("nodes");
        if (value != body_value->end())
        {
            if (value->getType() != BEncode::Value::BCODE_STRING)
            {
                XX("\"nodes\" value is must be string");
            }
            nodes = value->asString();
        }
        else
            nodes.clear();
    }
    return ret;
    
    #undef XX
    }
4.2.4、对不同类型报文进行处理、回复

当解析完成时,在报文有效的情况下执行后续操作。因为我们只需获取他人的种子信息而不主动发起查询请求,并不需要全面实现DHT协议即可完成任务——即无需缓存其他节点的数据信息。对于他人的请求中有效的部分予以接收;而对于无效的请求则返回一些虚假的信息给响应方。通过这种欺骗性手段或数据窃取的方式可以使构建爬虫程序所需的复杂度大幅降低,请务必看完这篇文章《[DHT协议介绍]》以便更好地理解后续内容

请求类型 回复方法
PING 直接按标准格式回复PONG就行
FIND_NODE 由于我们并没有缓存其他节点信息,来我们这里查找节点是不可能做到的,所以返回一个空的节点列表给它
GET_PEERS 这个对于我们是有用的,我们要通过GET_PEERS请求的发起者来下载种子文件,但是由于我们既没有缓存节点,也没有缓存peer,所以回复它一个空列表
ANNOUNCE_PEER 和GET_PEERS处理方式一样
REPLY 由于我们始终没有在主动查询任何资源,所以基本不太可能受到回复,收到的话检测报文中有没有nodes,有的话把里面的节点拿出来ping一遍,加入到更多的网络之中
4.2.5、隐藏自己,防止被其他节点拉进黑名单

在整个过程中涉及欺骗其他节点的行为相当普遍, 因此在每次向他人发送错误信息时建议修改自己的node ID, 以避免让其他节点将其加入黑名单

4.2.6、获取info_hash和peer

依据GET\_PEERSANNOUNCE_PEER消息中提取的信息以及客户端的地址信息即可启动BitTorrent协议以获取种子数据。(此时将客户端视为网络中的另一个参与者可能会导致较高的失败几率因为在这种情况下客户端实际上可能仅处于资源搜索阶段而并非拥有完整的资源进行下载)

4.3、实现BitTorrent协议

为了实现BitTorrent协议的运行, 必须先认真阅读并完整阅读下面两篇官方文档: http://www.bittorrent.org/beps/bep_0009.htmlhttp://www.bittorrent.org/beps/bep_0010.html. 其中关于文件共享机制的介绍极为简洁, 建议全面阅读.

4.3.1、HandShake(握手)

bep_0010协议文档中可以看出,在握手报文中所使用的消息格式由以下几个部分构成:首先是以19的ASCII码开头;接着是BitTorrent protocol这一特定协议标识;随后跟着的是\x00\x00\x00\x00\x00\x10\x00\x4C八个字节的数据;之后是经过十六进制解码得到的信息哈希值;最后是一个二十字节长的节点标识符(nodeid)。这里需要注意的是,在BitTorrent protocol协议中除了用于发送握手消息外的所有其他类型的消息前四个字节均表示该消息的实际长度(不包含长度字段)。当接收端收到一条握手消息后会返回至少68个字符的回复信息(之所以说是至少是因为后续扩展握手部分会进行详细说明),而判断对方是否已接受本方发送的握手信息可以通过检查回复信息第25位和第27位的状态来确定(这一判断依据的具体原因并未深入探究,在经过测试验证后发现该方法确实有效)

复制代码
    // 握手
    std::string handshake_message;
    handshake_message.resize(28);
    handshake_message[0] = 19;
    memcpy(&handshake_message[1], "BitTorrent protocol", 19);
    char ext[8];
    memset(ext, 0x00, sizeof(ext));
    ext[5] = 0x10;
    ext[7] = 0x04;
    memcpy(&handshake_message[20], ext, 8);
    handshake_message += m_info_hash + m_node_id;
    m_sock->send(&handshake_message[0], handshake_message.size());
    int len = m_sock->recv(buf, BUF_LEN);
    if (len < 68)
    {
        log_debug << COMMON_PART << "(handshake) message size=" << len
            << " is too short(must be >= 68)";
        delete buf;
        return false;
    }
    std::string handshake\_reply(buf, 68);
    std::string ext_message;
    if (len > 68)
        ext_message = std::string(buf + 68, len - 68);
    if (handshake_reply.substr(0, 20) != handshake_message.substr(0, 20))
    {
        log_debug << COMMON_PART << "(handshake) protocol fail, message:"
            << std::endl << dump(handshake_reply);
        delete buf;
        return false;
    }
    if ((int)handshake_reply[25] & 0x10 == 0)
    {
        log_debug << COMMON_PART << "(handshake) peer does not support extension protocol, message:"
            << std::endl << dump(handshake_reply);
        delete buf;
        return false;
    }
    if ((int)handshake_reply[27] & 0x04 == 0)
    {
        log_debug << COMMON_PART << "(handshake) peer does not support fast protocol, message:"
            << std::endl << dump(handshake_reply);
        delete buf;
        return false;
    }

下面是请求报文示例

在这里插入图片描述

以下是响应报文的一个示例,请各位自行计算一下,在从第四行第7个字节(即具体位置)开始一直到末尾的部分的长度确实达到了68以上。

在这里插入图片描述
4.3.2、Extend HandShake(扩展握手)

bep_0010中可以看到,握手之后就要进行扩展握手了,而扩展握手是至关重要的,报文消息格式为:消息长度 + MSG_ID的ASCII + EXTEND_ID的ASCII + B编码的字典{‘m’:{‘ut_metadata’:1}}
其中MSG_ID为20,由于是扩展握手,EXTEND_ID是0,完成之后,peer的响应报文里面会包含了两个我们下一步用得到的键值:ut_metadata、和metadata_size,这两个非常重要,拿到之后要找个变量存起来

注意事项:原本协议规定握手和扩展握手应分两步完成,在实际测试中发现部分node会在首次握手时发送全部数据包,并附带了扩展阶段应有的响应信息;但由于这一过程可能导致部分数据未完整传输,在编写代码初期并不了解这一细节会导致多次失败;后通过捕获网络流量才意识到这一问题;因此在最终实现时必须采取如下措施:即在首次握手后判断接收到的数据包大小;当接收到的数据包大小超过68个字节时将多余内容暂存下来随后启动扩展握手流程;完成扩展阶段后需将临时存储的内容与已接收的手势信息合并即可获得完整的扩展阶段数据。

代码实现如下:

复制代码
    // 扩展握手
    std::string ext_handshake_message;
    ext_handshake_message.append(1, 20);
    ext_handshake_message.append(1, 0);
    ext_handshake_message += "d1:md11:ut\_metadatai2ee1:v" + std::to\_string(m_v.size()) + ":" + m_v + "e";
    std::string ext_handshake_message_size_str;
    ext_handshake_message_size_str.resize(4);
    uint32\_t ext_handshake_message_size = ext_handshake_message.size();
    ext_handshake_message_size = littleByteSwap(ext_handshake_message_size);
    memcpy(&ext_handshake_message_size_str[0], &ext_handshake_message_size, 4);
    ext_handshake_message = ext_handshake_message_size_str + ext_handshake_message;
    m_sock->send(&ext_handshake_message[0], ext_handshake_message.size());
    len = 0;
    while (1)
    {
        int cur_len = m_sock->recv(buf + len, BUF_LEN - len);
        if (cur_len <= 0)
            break;
        len += cur_len;
        if (len >= BUF_LEN)
            break;
    }
    std::string ext_reply;
    if (len > 0)
        ext_reply = ext_message + std::string(buf, len);
    else if (!ext_message.empty())
        ext_reply = ext_message;
    else
    {
        log_debug << COMMON_PART << "(ext handshake) fail";
        delete buf;
        return false;
    }
    // 摘取数据
    // ut\_metadata
    size_t pos = ext_reply.find("ut\_metadata");
    if (pos == std::string::npos)
    {
        log_debug << COMMON_PART << "(ext handshake) parse ut\_metadata fail, message:"
            << std::endl << dump(ext_reply);
        delete buf;
        return false;
    }
    pos += 12;
    size_t pos_e = ext_reply.find("e", pos);
    if (pos_e == std::string::npos)
    {
        log_debug << COMMON_PART << "(ext handshake) parse ut\_metadata fail, message:"
            << std::endl << dump(ext_reply);
        delete buf;
        return false;
    }
    std::string ut_metadata_str = ext_reply.substr(pos, pos_e - pos);
    uint32\_t ut_metadata = atoi(ut_metadata_str.c\_str());
    
    // metadata\_size
    pos = ext_reply.find("metadata\_size");
    if (pos == std::string::npos)
    {
        log_debug << COMMON_PART << "(ext handshake) parse metadata\_size fail, message:"
            << std::endl << dump(ext_reply);
        delete buf;
        return false;
    }
    pos += 14;
    pos_e = ext_reply.find("e", pos);
    if (pos_e == std::string::npos)
    {
        log_debug << COMMON_PART << "(ext handshake) parse metadata\_size fail, message:"
            << std::endl << dump(ext_reply);
        delete buf;
        return false;
    }
    std::string metadata_size_str = ext_reply.substr(pos, pos_e - pos);
    int64\_t metadata_size = atoll(metadata_size_str.c\_str());

下面是请求报文示例

在这里插入图片描述

下面是正常响应报文示例

在这里插入图片描述

下述需要对响应报文进行拼接(与握手后续内容相连接),可以看出这些报文不以字母d开头(因为按照B编码规则设计的报文应当使用字母d作为标识符来表示整体作为一个对象)。

在这里插入图片描述
4.3.3、获取metadata

在握手完成并接收到了ut_metadata及metadata_size后就可以开始下载了。我们需要这两个参数的原因在于请求格式是由四个部分组成:消息长度、MSG_ID以ASCII编码表示、ut_metadata同样以ASCII形式存在以及使用B编码定义的一个字典{‘msg_type’:0,‘piece’:piece}。这里MSG_ID已知是20,在协议规定下 ut_metadata 必须设置为2(否则peers不会返回响应)。而 piece 值则表示分片标记,在 protocol 规定下每个 piece 块的数据量是16KB等于...

复制代码
    std::string data;
    int piece = 0;
    while (metadata_size > 0)
    {
        std::string get_metadata_message;
        get_metadata_message.append(1, 20);
        get_metadata_message.append(1, 2);
        get_metadata_message += "d8:msg\_typei0e5:piecei" + std::to\_string(piece) + "ee";
        std::string get_metadata_message_size_str;
        get_metadata_message_size_str.resize(4);
        uint32\_t get_metadata_message_size = get_metadata_message.size();
        get_metadata_message_size = littleByteSwap(get_metadata_message_size);
        memcpy(&get_metadata_message_size_str[0], &get_metadata_message_size, 4);
        get_metadata_message = get_metadata_message_size_str + get_metadata_message;
        m_sock->send(&get_metadata_message[0], get_metadata_message.size());
        len = 0;
        while (1)
        {
            int cur_len = m_sock->recv(buf + len, BUF_LEN - len);
            if (cur_len <= 0)
                break;
            len += cur_len;
            if (len >= BUF_LEN)
                break;
        }

全部评论 (0)

还没有任何评论哟~