Advertisement

DORIS 文件读取(研究中)

阅读量:

前言

此文档旨在记录研究doris 1.1.1版本的后端文件读取流程。目前仅涵盖部分流程步骤。正在逐步完善中。

文章目录

先说说Doris的数据存储结构

  1. tablet 实际上存储于 be 的物理架构中,并且通常包含多个副本以保证数据冗余性。
  2. rowset 实际上是对 tablet 下的数据集合执行的一次变更操作。这种变更包括但不限于数据导入、删除及更新等基本操作。一个 tablet 可能拥有多个 rowset,在 Doris 系统中会根据需求对相邻版本的 rowset 进行 compaction 合并处理;而对于 group 或 unique 数据模型,则会相应地执行聚合操作。
  3. segment 可以视为 rowset 下的一个独立数据段,在文件层次上保存当前 segment 的具体列数据及相关索引信息。
  4. column 实际上构成了 segment 中的数据列式组织结构,在此架构下包含了 data 和 index 类型的内容。
  5. page 实际上将列数据划分为若干独立的分块存储单元,并根据需要区分 data page 和 index page 两种类型。

1. BE端OlapScanNode文件读取逻辑

文件目录 doris/be/src/exec/olap_scan_node.cpp

文件读取过程由OlapScanNode::start_scan_thread()函数启动,该函数接收RuntimeState* state作为运行时状态参数

复制代码
 // Starts the scan for this node: builds key ranges from the scan keys,
 // optionally splits them via get_hints(), groups ranges into OlapScanners
 // (bounded both by range count and by tablet size), and finally launches
 // the transfer thread that pumps row batches to the consumer.
 //
 // @param state runtime state of the executing fragment; handed to every
 //              scanner and to the transfer thread.
 // @return Status::OK() on success; InternalError if a tablet cannot be
 //         resolved; any error propagated from key-range construction or
 //         scanner prepare().
 Status OlapScanNode::start_scan_thread(RuntimeState* state) {
     // Nothing to scan: mark transfer as already finished so the consumer
     // sees EOS immediately.
     if (_scan_ranges.empty()) {
         _transfer_done = true;
         return Status::OK();
     }

     // ranges constructed from scan keys
     std::vector<std::unique_ptr<OlapScanRange>> cond_ranges;
     RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
     // if we can't get ranges from conditions, we give it a total range
     if (cond_ranges.empty()) {
         cond_ranges.emplace_back(new OlapScanRange());
     }

     bool need_split = true;
     // If we have ranges more than 64, there is no need to call
     // ShowHint to split ranges
     if (limit() != -1 || cond_ranges.size() > 64) {
         need_split = false;
     }

     // Budget of 64 scanners total, spread evenly across tablets.
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
     std::unordered_set<std::string> disk_set;
     for (auto& scan_range : _scan_ranges) {
         auto tablet_id = scan_range->tablet_id;
         int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(), nullptr, 10);
         std::string err;
         // Tablet metadata is resolved through the TabletManager (backed by RocksDB).
         TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                 tablet_id, schema_hash, true, &err);
         if (tablet == nullptr) {
             std::stringstream ss;
             ss << "failed to get tablet: " << tablet_id << " with schema hash: " << schema_hash
                << ", reason: " << err;
             LOG(WARNING) << ss.str();
             return Status::InternalError(ss.str());
         }
         std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
         std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
         // Splitting via ShowHint only applies to non-beta rowsets; on failure
         // we silently fall back to the unsplit condition ranges.
         if (need_split && !tablet->all_beta()) {
             auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
                                 _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
                                 &split_ranges, _runtime_profile.get());
             if (st.ok()) {
                 ranges = &split_ranges;
             }
         }
         // In order to avoid the problem of too many scanners caused by small tablets,
         // in addition to scanRange, we also need to consider the size of the tablet when
         // creating the scanner. One scanner is used for every 1Gb, and the final scanner_per_tablet
         // takes the minimum value calculated by scanrange and size.
         int size_based_scanners_per_tablet = 1;
         if (config::doris_scan_range_max_mb > 0) {
             // BUGFIX: the original expression
             //   (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20
             // is wrong twice: '/' binds tighter than '<<', so it computed
             // (footprint / max_mb) << 20 instead of footprint / (max_mb in bytes),
             // and casting the footprint to int BEFORE dividing overflows for
             // tablets larger than 2GB. Parenthesize the shift and cast the result.
             size_based_scanners_per_tablet = std::max(
                     1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20)));
         }
         // Each scanner takes roughly this many consecutive ranges.
         int ranges_per_scanner =
                 std::max(1, (int)ranges->size() /
                                     std::min(scanners_per_tablet, size_based_scanners_per_tablet));
         int num_ranges = ranges->size();
         for (int i = 0; i < num_ranges;) {
             std::vector<OlapScanRange*> scanner_ranges;
             scanner_ranges.push_back((*ranges)[i].get());
             ++i;
             // Keep appending ranges while they share the same end_include flag,
             // up to ranges_per_scanner per scanner.
             for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
                             (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
                  ++j, ++i) {
                 scanner_ranges.push_back((*ranges)[i].get());
             }
             OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation,
                                                    _need_agg_finalize, *scan_range);
             scanner->set_batch_size(_batch_size);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare failed.
             _scanner_pool.add(scanner);
             RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
                                              _bloom_filters_push_down));

             _olap_scanners.push_back(scanner);
             disk_set.insert(scanner->scan_disk());
         }
     }
     COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
     COUNTER_SET(_num_scanners, static_cast<int64_t>(_olap_scanners.size()));

     // init progress
     std::stringstream ss;
     ss << "ScanThread complete (node=" << id() << "):";
     _progress = ProgressUpdater(ss.str(), _olap_scanners.size(), 1);

     // Hand off to the transfer thread; it drives the scanners from here on.
     _transfer_thread = std::make_shared<std::thread>(&OlapScanNode::transfer_thread, this, state);

     return Status::OK();
 }

其中以下部分依次获取tablet元数据信息,并且这些tablet的元数据信息均存放在Rocksdb中

复制代码
  TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(

    
             tablet_id, schema_hash, true, &err);

再然后创建真正的数据scanner

复制代码
 OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation,

    
                                                _need_agg_finalize, *scan_range);

其中OlapScanner的prepare函数创建Rowset reader

复制代码
 Status OlapScanner::prepare(

    
     const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
    
     const std::vector<TCondition>& filters,
    
     const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
    
             bloom_filters)

其中prepare函数中的下面这段代码会真正地创建Rowset reader

复制代码
 OLAPStatus acquire_reader_st =

    
                 _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers, _mem_tracker);

再然后是tablet类中的函数调用

复制代码
 OLAPStatus Tablet::capture_rs_readers(const std::vector<Version>& version_path,

    
                                   std::vector<RowsetReaderSharedPtr>* rs_readers,
    
                                   std::shared_ptr<MemTracker> parent_tracker)

2. in-memory属性在BE端产生的效果

该段落旨在阐述DORIS BE端如何实现对in-memory属性的数据缓存机制。其原理是BE端在读取数据时采用LRU缓存策略。具体而言,在读取操作开始时

doris/be/src/olap/rowset/segment_v2/segment_iterator.cpp

复制代码
 // Fills `block` with `nrows` values for each requested column, writing into
 // each destination column block starting at `row_offset`. Every column
 // iterator must deliver exactly `nrows` rows (checked via DCHECK).
 Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, RowBlockV2* block,
                                       size_t row_offset, size_t nrows) {
     for (const auto col_id : column_ids) {
         auto col_block = block->column_block(col_id);
         ColumnBlockView dest_view(&col_block, row_offset);
         size_t fetched = nrows;
         // Delegates the actual page reads to the per-column iterator.
         RETURN_IF_ERROR(_column_iterators[col_id]->next_batch(&fetched, &dest_view));
         DCHECK_EQ(nrows, fetched);
     }
     return Status::OK();
 }

读取page数据

doris/be/src/olap/rowset/segment_v2/column_reader.cpp

复制代码
 // Reads the single page addressed by `pp` and hands back its decompressed
 // body and parsed footer. This is a thin adapter: it copies the relevant
 // reader/iterator options into a PageReadOptions and delegates the real
 // work (cache lookup, I/O, checksum, decompression) to
 // PageIO::read_and_decompress_page.
 //
 // @param iter_opts  per-iterator options (file block, stats, cache policy).
 // @param pp         offset + size of the page within the file.
 // @param handle     out: owns the page memory (cache handle or heap buffer).
 // @param page_body  out: slice over the page body (footer/checksum stripped).
 // @param footer     out: parsed page footer.
 // @param codec      decompression codec; may be null for uncompressed pages.
 Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
                            PageHandle* handle, Slice* page_body, PageFooterPB* footer,
                            BlockCompressionCodec* codec) {
     iter_opts.sanity_check();
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
     opts.page_pointer = pp;
     opts.codec = codec;
     opts.stats = iter_opts.stats;
     opts.verify_checksum = _opts.verify_checksum;
     opts.use_page_cache = iter_opts.use_page_cache;
     // NOTE(review): kept_in_memory is forwarded to the page cache on insert;
     // presumably set for tablets with the in-memory property so their pages
     // stay resident in the LRU cache — confirm against ColumnReaderOptions.
     opts.kept_in_memory = _opts.kept_in_memory;
     opts.type = iter_opts.type;
     opts.encoding_info = _encoding_info;

     return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
 }

其中决定是否启用in-memory从内存缓存读取的关键逻辑位于下面的函数中:

复制代码
复制代码
 // Reads one page, serving it from the StoragePageCache when possible,
 // otherwise from disk: verify checksum, parse footer, decompress the body
 // if needed, optionally pre-decode, and (if caching is enabled) insert the
 // final uncompressed page into the cache.
 //
 // On-disk page layout (as this function reads it):
 //   [body][footer (footer_size bytes)][footer_size (4B LE)][checksum (4B)]
 //
 // @param opts    read options: page location, codec, cache policy, stats.
 // @param handle  out: owns the returned page memory (cache handle or buffer).
 // @param body    out: slice over the page body only.
 // @param footer  out: parsed PageFooterPB.
 Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                     Slice* body, PageFooterPB* footer) {
     opts.sanity_check();
     opts.stats->total_pages_num++;

     // Cache key is (file path, page offset) — unique per page.
     auto cache = StoragePageCache::instance();
     PageCacheHandle cache_handle;
     StoragePageCache::CacheKey cache_key(opts.rblock->path_desc().filepath, opts.page_pointer.offset);
     if (opts.use_page_cache && cache->is_cache_available(opts.type) && cache->lookup(cache_key, &cache_handle, opts.type)) {
         // we find page in cache, use it
         *handle = PageHandle(std::move(cache_handle));
         opts.stats->cached_pages_num++;
         // parse body and footer
         // Cached pages are stored already decompressed but still carry the
         // footer + footer_size suffix (checksum was stripped before insert).
         Slice page_slice = handle->data();
         uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
         std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
         if (!footer->ParseFromString(footer_buf)) {
             return Status::Corruption("Bad page: invalid footer");
         }
         *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
         return Status::OK();
     }

     // Cache miss: read the raw page from storage.
     // every page contains 4 bytes footer length and 4 bytes checksum
     const uint32_t page_size = opts.page_pointer.size;
     if (page_size < 8) {
         return Status::Corruption(strings::Substitute("Bad page: too small size ($0)", page_size));
     }

     // hold compressed page at first, reset to decompressed page later
     std::unique_ptr<char[]> page(new char[page_size]);
     Slice page_slice(page.get(), page_size);
     {
         SCOPED_RAW_TIMER(&opts.stats->io_ns);
         RETURN_IF_ERROR(opts.rblock->read(opts.page_pointer.offset, page_slice));
         opts.stats->compressed_bytes_read += page_size;
     }

     // Checksum covers everything except the trailing 4-byte checksum itself.
     if (opts.verify_checksum) {
         uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
         uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
         if (expect != actual) {
             return Status::Corruption(strings::Substitute(
                     "Bad page: checksum mismatch (actual=$0 vs expect=$1)", actual, expect));
         }
     }

     // remove checksum suffix
     page_slice.size -= 4;
     // parse and set footer
     uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
     if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
         return Status::Corruption("Bad page: invalid footer");
     }

     // A body smaller/larger than the recorded uncompressed size means the
     // body is compressed and must be expanded.
     uint32_t body_size = page_slice.size - 4 - footer_size;
     if (body_size != footer->uncompressed_size()) { // need decompress body
         if (opts.codec == nullptr) {
             return Status::Corruption("Bad page: page is compressed but codec is NO_COMPRESSION");
         }
         SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
         // New buffer sized for the decompressed body plus the footer and the
         // 4-byte footer_size (so the cached layout matches the parse above).
         std::unique_ptr<char[]> decompressed_page(
                 new char[footer->uncompressed_size() + footer_size + 4]);

         // decompress page body
         Slice compressed_body(page_slice.data, body_size);
         Slice decompressed_body(decompressed_page.get(), footer->uncompressed_size());
         RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
         if (decompressed_body.size != footer->uncompressed_size()) {
             return Status::Corruption(strings::Substitute(
                     "Bad page: record uncompressed size=$0 vs real decompressed size=$1",
                     footer->uncompressed_size(), decompressed_body.size));
         }
         // append footer and footer size
         memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
                footer_size + 4);
         // free memory of compressed page
         page = std::move(decompressed_page);
         page_slice = Slice(page.get(), footer->uncompressed_size() + footer_size + 4);
         opts.stats->uncompressed_bytes_read += page_slice.size;
     } else {
         opts.stats->uncompressed_bytes_read += body_size;
     }

     // Optional per-encoding pre-decode pass (may replace `page`/`page_slice`).
     if (opts.encoding_info) {
         auto* pre_decoder = opts.encoding_info->get_data_page_pre_decoder();
         if (pre_decoder) {
             RETURN_IF_ERROR(pre_decoder->decode(
                     &page, &page_slice,
                     footer->data_page_footer().nullmap_size() + footer_size + 4));
         }
     }

     *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
     if (opts.use_page_cache && cache->is_cache_available(opts.type)) {
         // insert this page into cache and return the cache handle
         // kept_in_memory is passed through to the cache on insert.
         cache->insert(cache_key, page_slice, &cache_handle, opts.type, opts.kept_in_memory);
         *handle = PageHandle(std::move(cache_handle));
     } else {
         *handle = PageHandle(page_slice);
     }
     page.release(); // memory now managed by handle
     return Status::OK();
 }

3 rowset与segment

复制代码
复制代码
 // NOTE(review): excerpt abridged by the article author — the full function
 // also builds segment iterators and returns a status; only the
 // segment-loading step is quoted here.
 OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {

  // load segments
  // Segments are loaded through the SegmentLoader singleton; the boolean
  // argument distinguishes query reads from other reader types.
     RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
         _rowset, &_segment_cache_handle, read_context->reader_type == ReaderType::READER_QUERY));

 }

Betarowset -> create_reader(){

复制代码

}

参考文档:

1

2 https://www.jianshu.com/p/141ad958832d

3 新一代列式存储格式 Parquet(博客文章,原链接已失效)

Doris全面解析

Doris全面解析

Doris全面解析

5 该方法的核心原理_doris-长流影_长流影博客

Doris

Doris

7 Doris Stream Load原理解析 - 墨天轮

全部评论 (0)

还没有任何评论哟~