Skip to content

Commit

Permalink
Adds a concept of request_id for logging/correlation purpose.
Browse files Browse the repository at this point in the history
We also measure the amount of memory taken by a split search, and log
this.
  • Loading branch information
fulmicoton committed Oct 25, 2024
1 parent 02a5b6a commit 1dd5544
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 20 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
}

/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: Arc<ByteRangeCache>) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
postcard = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
Expand All @@ -33,6 +34,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ttl_cache = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
20 changes: 14 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::search::{
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<Arc<ByteRangeCache>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -375,6 +375,7 @@ async fn leaf_search_single_split(
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
) -> crate::Result<LeafSearchResponse> {

rewrite_request(
&mut search_request,
&split,
Expand All @@ -399,12 +400,15 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
));
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -425,6 +429,10 @@ async fn leaf_search_single_split(
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;

let trace_id = crate::get_trace_id();
info!(trace_id=%trace_id, split_id=%split.split_id.as_str(), num_docs=split.num_docs, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");

let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
Expand Down
11 changes: 11 additions & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod tests;

pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use opentelemetry::trace::TraceId;
use quickwit_common::thread_pool::ThreadPool;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
Expand Down Expand Up @@ -339,3 +340,13 @@ pub fn searcher_pool_for_test(
}),
)
}

pub(crate) fn get_trace_id() -> TraceId {
use opentelemetry::trace::TraceContextExt as _;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
tracing::Span::current()
.context()
.span()
.span_context()
.trace_id()
}
13 changes: 11 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use quickwit_proto::search::{
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::schema::{Field, FieldType};
use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split(
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
None,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,16 @@ async fn refine_and_list_matches(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
#[instrument(skip_all, fields(request_id))]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);
let trace_id = crate::get_trace_id();

info!(trace_id=%trace_id, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
Expand Down Expand Up @@ -1169,9 +1171,14 @@ pub async fn root_search(
)
.await;

let elapsed = start_instant.elapsed();

info!(trace_id=%trace_id, num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}

let label_values = if search_response_result.is_ok() {
["success"]
} else {
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
SplitIdAndFooterOffsets,
};
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
use tantivy::fastfield::Column;
use tantivy::query::Query;
Expand Down Expand Up @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split(
&split,
);

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);

let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ impl ByteRangeCache {
}
}

/// Overall amount of bytes stored in the cache.
pub fn get_num_bytes(&self) -> u64 {
self.inner.lock().unwrap().num_bytes
}

/// If available, returns the cached view of the slice.
pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
self.inner.lock().unwrap().get_slice(path, byte_range)
Expand Down

0 comments on commit 1dd5544

Please sign in to comment.