Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit warmup memory usage #5524

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
705 changes: 473 additions & 232 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,

// TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation`
// TODO set serde default
pub warmup_memory_budget: ByteSize,
// TODO set serde default
pub warmup_single_split_initial_allocation: ByteSize,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
Expand Down Expand Up @@ -263,7 +269,7 @@ impl StorageTimeoutPolicy {

impl Default for SearcherConfig {
fn default() -> Self {
Self {
SearcherConfig {
fast_field_cache_capacity: ByteSize::gb(1),
split_footer_cache_capacity: ByteSize::mb(500),
partial_request_cache_capacity: ByteSize::mb(64),
Expand All @@ -274,6 +280,10 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
// TODO change this to the method used for serde default.
warmup_memory_budget: ByteSize::gb(1),
// TODO change this to the method used for serde default.
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
}
}
Expand Down
22 changes: 14 additions & 8 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tantivy::{Directory, HasLen};
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
}

impl CachingDirectory {
Expand All @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warning: The resulting CachingDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, byte_range_cache)
}

/// Creates a new CachingDirectory that wraps `underlying` and uses the supplied `cache`.
///
/// Warning: this constructor stores `cache` as-is and adds no capacity limit of its own.
/// NOTE(review): whether items are ever removed from the cache presumably depends on how
/// the given `ByteRangeCache` was built (e.g. `with_infinite_capacity`) — confirm.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand All @@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory {

struct CachingFileHandle {
path: PathBuf,
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
underlying_filehandle: Arc<dyn FileHandle>,
}

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,14 @@ message LeafSearchRequest {
repeated string index_uris = 9;
}

// Resource usage counters reported alongside a leaf search response
// (carried in the `resource_stats` field of `LeafSearchResponse`).
// NOTE(review): the per-field descriptions below are inferred from the field
// names only; confirm against the code that populates them.
message ResourceStats {
// Presumably the number of bytes held in the short-lived cache.
uint64 short_lived_cache_num_bytes = 1;
// Presumably the number of documents in the searched split(s).
uint64 split_num_docs = 2;
// Presumably the time spent in warmup, in microseconds.
uint64 warmup_microsecs = 3;
// Presumably the time spent waiting for the CPU thread pool, in microseconds.
uint64 cpu_thread_pool_wait_microsecs = 4;
// Presumably the CPU time spent on the search itself, in microseconds.
uint64 cpu_microsecs = 5;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
Expand Down Expand Up @@ -479,6 +487,8 @@ message LeafSearchResponse {

// postcard serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 6;

ResourceStats resource_stats = 8;
}

message SnippetRequest {
Expand Down
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
use crate::retry::search::LeafSearchRetryPolicy;
use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};

/// Maximum number of put requests emitted to perform a replicated given PUT KV.
const MAX_PUT_KV_ATTEMPTS: usize = 6;
Expand Down Expand Up @@ -317,6 +317,13 @@ fn merge_original_with_retry_leaf_search_response(
(Some(left), None) => Some(left),
(None, None) => None,
};
let mut stats = [
original_response.resource_stats.as_ref(),
retry_response.resource_stats.as_ref(),
]
.into_iter()
.flat_map(|el_opt| el_opt);
let resource_stats = merge_resource_stats_it(&mut stats);
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: original_response.num_hits + retry_response.num_hits,
Expand All @@ -326,6 +333,7 @@ fn merge_original_with_retry_leaf_search_response(
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
resource_stats,
})
}

Expand Down
23 changes: 20 additions & 3 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use itertools::Itertools;
use quickwit_common::binary_heap::{SortKeyMapper, TopK};
use quickwit_doc_mapper::WarmupInfo;
use quickwit_proto::search::{
LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
SplitSearchError,
LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder,
SortValue, SplitSearchError,
};
use quickwit_proto::types::SplitId;
use serde::Deserialize;
Expand All @@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro

use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector};
use crate::GlobalDocAddress;
use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress};

#[derive(Clone, Debug)]
pub(crate) enum SortByComponent {
Expand Down Expand Up @@ -586,13 +586,15 @@ impl SegmentCollector for QuickwitSegmentCollector {
}
None => None,
};

Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: self.num_hits,
partial_hits,
failed_splits: Vec::new(),
num_attempted_splits: 1,
num_successful_splits: 1,
resource_stats: None,
})
}
}
Expand Down Expand Up @@ -918,6 +920,11 @@ fn merge_leaf_responses(
return Ok(leaf_responses.pop().unwrap());
}

let mut resource_stats_it = leaf_responses
.iter()
.flat_map(|leaf_response| leaf_response.resource_stats.as_ref());
let merged_resource_stats = merge_resource_stats_it(&mut resource_stats_it);

let merged_intermediate_aggregation_result: Option<Vec<u8>> =
merge_intermediate_aggregation_result(
aggregations_opt,
Expand Down Expand Up @@ -959,6 +966,7 @@ fn merge_leaf_responses(
failed_splits,
num_attempted_splits,
num_successful_splits,
resource_stats: merged_resource_stats,
})
}

Expand Down Expand Up @@ -1182,6 +1190,7 @@ pub(crate) struct IncrementalCollector {
num_attempted_splits: u64,
num_successful_splits: u64,
start_offset: usize,
resource_stats: ResourceStats,
}

impl IncrementalCollector {
Expand All @@ -1202,6 +1211,7 @@ impl IncrementalCollector {
failed_splits: Vec::new(),
num_attempted_splits: 0,
num_successful_splits: 0,
resource_stats: ResourceStats::default(),
}
}

Expand All @@ -1214,8 +1224,13 @@ impl IncrementalCollector {
num_attempted_splits,
intermediate_aggregation_result,
num_successful_splits,
resource_stats,
} = leaf_response;

if let Some(leaf_resource_stats) = &resource_stats {
merge_resource_stats(leaf_resource_stats, &mut self.resource_stats);
}

self.num_hits += num_hits;
self.top_k_hits.add_entries(partial_hits.into_iter());
self.failed_splits.extend(failed_splits);
Expand Down Expand Up @@ -1265,6 +1280,7 @@ impl IncrementalCollector {
num_attempted_splits: self.num_attempted_splits,
num_successful_splits: self.num_successful_splits,
intermediate_aggregation_result,
resource_stats: Some(self.resource_stats),
})
}
}
Expand Down Expand Up @@ -1771,6 +1787,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
}],
);

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
Loading
Loading