diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile
index 3ccd3633ea9..5b121197dd9 100644
--- a/distribution/lambda/Makefile
+++ b/distribution/lambda/Makefile
@@ -111,7 +111,7 @@ bench-index:
done
bench-search-term:
- export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
+ export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
@@ -121,7 +121,7 @@ bench-search-term:
done
bench-search-histogram:
- export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
+ export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
@@ -133,15 +133,13 @@ bench-search-histogram:
bench-search:
for run in {1..30}
do
- export QW_LAMBDA_DISABLE_SEARCH_CACHE=true
- $(MAKE) bench-search-term
- $(MAKE) bench-search-histogram
- export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
- export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
done
+
+test-mock-data-endpoints:
+ python -c 'from cdk import cli; cli.test_mock_data_endpoints()'
diff --git a/distribution/lambda/cdk/cli.py b/distribution/lambda/cdk/cli.py
index 0cb9d522770..ab9460c91ba 100644
--- a/distribution/lambda/cdk/cli.py
+++ b/distribution/lambda/cdk/cli.py
@@ -14,6 +14,7 @@
from dataclasses import dataclass
from functools import cache
from io import BytesIO
+from urllib.parse import urlparse
import boto3
import botocore.config
@@ -29,6 +30,8 @@
retries={"max_attempts": 0}, read_timeout=60 * 15
)
session = boto3.Session(region_name=region)
+mock_sales_index_id = "mock-sales"
+hdfs_logs_index_id = "hdfs-logs"
@cache
@@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
outputs = stacks[0]["Outputs"]
for output in outputs:
- if output["ExportName"] == export_name:
+ if "ExportName" in output and output["ExportName"] == export_name:
return output["OutputValue"]
else:
print(f"Export name {export_name} not found in stack {stack_name}")
exit(1)
+def _decompress_if_gzip(payload: bytes, headers: dict) -> str:
+ if headers.get("content-encoding", "") == "gzip":
+ return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode()
+ else:
+ return payload.decode()
+
+
@dataclass
class LambdaResult:
function_error: str
log_tail: str
payload: str
raw_size_bytes: int
+ status_code: int
@staticmethod
def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
@@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(payload),
+ status_code=0,
)
@staticmethod
def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult":
gw_str = lambda_resp["Payload"].read().decode()
gw_obj = json.loads(gw_str)
- payload = gw_obj["body"]
- if gw_obj["isBase64Encoded"]:
+ if "body" in gw_obj:
+ payload = gw_obj["body"]
+ status_code = gw_obj["statusCode"]
+ else:
+ payload = gw_str
+ status_code = -1
+ if gw_obj.get("isBase64Encoded", False):
dec_payload = base64.b64decode(payload)
- if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip":
- payload = (
- gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload))
- .read()
- .decode()
- )
- else:
- payload = dec_payload.decode()
+ payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {}))
return LambdaResult(
function_error=lambda_resp.get("FunctionError", ""),
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(gw_str),
+ status_code=status_code,
)
def extract_report(self) -> str:
@@ -108,12 +119,13 @@ def _format_lambda_output(
if lambda_result.function_error != "":
print("\n## FUNCTION ERROR:")
print(lambda_result.function_error)
- print("\n## LOG TAIL:")
- print(lambda_result.log_tail)
print("\n## RAW RESPONSE SIZE (BYTES):")
- ratio = lambda_result.raw_size_bytes / len(lambda_result.payload)
- print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)")
- print("\n## RESPONSE:")
+ if len(lambda_result.payload) == 0:
+ ratio = "empty payload"
+ else:
+ ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload"
+ print(f"{lambda_result.raw_size_bytes} ({ratio})")
+ print(f"\n## RESPONSE [{lambda_result.status_code}]:")
payload_size = len(lambda_result.payload)
print(lambda_result.payload[:max_resp_size])
if payload_size > max_resp_size:
@@ -184,6 +196,7 @@ def invoke_hdfs_indexer() -> LambdaResult:
def _invoke_searcher(
stack_name: str,
+ index_id: str,
function_export_name: str,
payload: str,
download_logs: bool,
@@ -198,9 +211,14 @@ def _invoke_searcher(
LogType="Tail",
Payload=json.dumps(
{
- "headers": {"Content-Type": "application/json"},
+ "resource": f"/api/v1/{index_id}/search",
+ "path": f"/api/v1/{index_id}/search",
+ "httpMethod": "POST",
+ "headers": {
+ "Content-Type": "application/json",
+ },
"requestContext": {
- "http": {"method": "POST"},
+ "httpMethod": "POST",
},
"body": payload,
"isBase64Encoded": False,
@@ -218,6 +236,7 @@ def _invoke_searcher(
def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult:
return _invoke_searcher(
app.HDFS_STACK_NAME,
+ hdfs_logs_index_id,
hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
payload,
download_logs,
@@ -249,7 +268,6 @@ def get_logs(
last_event_id = event["eventId"]
yield event["message"]
if event["message"].startswith("REPORT"):
- print(event["message"])
lower_time_bound = int(event["timestamp"])
last_event_id = "REPORT"
break
@@ -277,6 +295,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
int(invoke_start * 1000),
):
f.write(log)
+ print(f"Logs written to lambda.{request_id}.log")
except Exception as e:
print(f"Failed to download logs: {e}")
@@ -284,6 +303,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
def invoke_mock_data_searcher():
_invoke_searcher(
app.MOCK_DATA_STACK_NAME,
+ mock_sales_index_id,
mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
"""{"query": "id:1", "sort_by": "ts", "max_hits": 10}""",
True,
@@ -321,7 +341,9 @@ def print_mock_data_metastore():
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
s3 = session.client("s3")
- response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
+ response = s3.get_object(
+ Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json"
+ )
print(response["Body"].read().decode())
@@ -387,3 +409,48 @@ def benchmark_hdfs_search(payload: str):
with open(f"lambda-bench.log", "a+") as f:
f.write(json.dumps(bench_result))
f.write("\n")
+
+
+def test_mock_data_endpoints():
+ apigw_url = _get_cloudformation_output_value(
+ app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME
+ )
+
+ def req(method, path, body=None, expected_status=200):
+ conn = http.client.HTTPSConnection(urlparse(apigw_url).netloc)
+ conn.request(
+ method,
+ path,
+ body,
+ headers={"x-api-key": os.getenv("SEARCHER_API_KEY")},
+ )
+ response = conn.getresponse()
+ print(f"{method} {path}")
+ headers = {k: v for (k, v) in response.getheaders()}
+ body = _decompress_if_gzip(response.read(), headers)
+ if response.status != expected_status:
+ print(f"[{response.status}] => {body}")
+ exit(1)
+ else:
+ print(f"[{response.status}] => {json.dumps(json.loads(body))[0:100]}")
+
+ req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal")
+ req(
+ "POST",
+ f"/api/v1/{mock_sales_index_id}/search",
+ '{"query":"quantity:>5", "max_hits": 10}',
+ )
+ req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal")
+ req(
+ "POST",
+ f"/api/v1/_elastic/{mock_sales_index_id}/_search",
+ '{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}',
+ )
+ req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity")
+ # expected errors
+ req(
+ "GET",
+ f"/api/v1/_elastic/{mock_sales_index_id}/_search?query=animal",
+ expected_status=400,
+ )
+ req("GET", f"/api/v1/_elastic/_search?q=animal", expected_status=501)
diff --git a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
index 4822e69723a..2ddfc9350cd 100644
--- a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
+++ b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
@@ -17,6 +17,7 @@
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name"
+API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url"
class Source(Construct):
@@ -98,11 +99,12 @@ def __init__(
searcher_integration = aws_apigateway.LambdaIntegration(
qw_svc.searcher.lambda_function
)
- search_resource = (
- api.root.add_resource("v1").add_resource(index_id).add_resource("search")
- )
+ search_resource = api.root.add_resource("v1").add_resource("{proxy+}")
search_resource.add_method("POST", searcher_integration, api_key_required=True)
- api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api)
+ search_resource.add_method("GET", searcher_integration, api_key_required=True)
+ # Change the deployment id (api-deployment-x) each time the API changes,
+ # otherwise changes are not deployed.
+ api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api)
api_stage = aws_apigateway.Stage(
self, "api", deployment=api_deployment, stage_name="api"
)
@@ -122,7 +124,10 @@ def __init__(
api.deployment_stage = api_stage
aws_cdk.CfnOutput(
- self, "search-api-url", value=api.url.rstrip("/") + search_resource.path
+ self,
+ "search-api-url",
+ value=api.url.rstrip("/") + search_resource.path,
+ export_name=API_GATEWAY_EXPORT_NAME,
)
diff --git a/distribution/lambda/resources/hdfs-logs.yaml b/distribution/lambda/resources/hdfs-logs.yaml
index ceccba394b5..3538359a0f5 100644
--- a/distribution/lambda/resources/hdfs-logs.yaml
+++ b/distribution/lambda/resources/hdfs-logs.yaml
@@ -2,7 +2,7 @@
# Index config file for hdfs-logs dataset.
#
-version: 0.6
+version: 0.7
index_id: hdfs-logs
diff --git a/distribution/lambda/resources/mock-sales.yaml b/distribution/lambda/resources/mock-sales.yaml
index 46d7405c055..7831039745c 100644
--- a/distribution/lambda/resources/mock-sales.yaml
+++ b/distribution/lambda/resources/mock-sales.yaml
@@ -2,7 +2,7 @@
# Index config file for mock-sales data generator.
#
-version: 0.6
+version: 0.7
index_id: mock-sales
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 6d63cdac3ab..5458641e695 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -258,6 +258,19 @@ dependencies = [
"futures-core",
]
+[[package]]
+name = "async-compression"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
+dependencies = [
+ "flate2",
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "async-compression"
version = "0.4.6"
@@ -735,6 +748,22 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "aws_lambda_events"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03611508dd1e514e311caec235b581c99a4cb66fa1771bd502819eed69894f12"
+dependencies = [
+ "base64 0.21.7",
+ "bytes",
+ "http 0.2.12",
+ "http-body 0.4.6",
+ "http-serde 1.1.3",
+ "query_map",
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "aws_lambda_events"
version = "0.15.0"
@@ -909,6 +938,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
+[[package]]
+name = "base64"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
+
[[package]]
name = "base64"
version = "0.21.7"
@@ -3318,24 +3353,21 @@ dependencies = [
[[package]]
name = "lambda_http"
-version = "0.10.0"
+version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef8fafd7a4ce0bc6093cf1bed3dcdfc1239c27df1e79e3f2154f4d3299d4f60e"
+checksum = "2505c4a24f5a8d8ac66a87691215ec1f79736c5bc6e62bb921788dca9753f650"
dependencies = [
- "aws_lambda_events",
+ "aws_lambda_events 0.12.1",
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures",
- "futures-util",
- "http 1.1.0",
- "http-body 1.0.0",
- "http-body-util",
- "hyper 1.2.0",
- "lambda_runtime",
+ "http 0.2.12",
+ "http-body 0.4.6",
+ "hyper 0.14.28",
+ "lambda_runtime 0.8.3",
"mime",
"percent-encoding",
- "pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
@@ -3345,9 +3377,33 @@ dependencies = [
[[package]]
name = "lambda_runtime"
-version = "0.10.0"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "deca8f65d7ce9a8bfddebb49d7d91b22e788a59ca0c5190f26794ab80ed7a702"
+dependencies = [
+ "async-stream",
+ "base64 0.20.0",
+ "bytes",
+ "futures",
+ "http 0.2.12",
+ "http-body 0.4.6",
+ "http-serde 1.1.3",
+ "hyper 0.14.28",
+ "lambda_runtime_api_client 0.8.0",
+ "serde",
+ "serde_json",
+ "serde_path_to_error",
+ "tokio",
+ "tokio-stream",
+ "tower",
+ "tracing",
+]
+
+[[package]]
+name = "lambda_runtime"
+version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc2904c10fbeaf07aa317fc96a0e28e89c80ed12f7949ed06afd7869b21fef32"
+checksum = "276c835f2217fac810a97f2ed8eabfe9be71afe4f3ffd8671b05cb528e95ff8a"
dependencies = [
"async-stream",
"base64 0.21.7",
@@ -3359,21 +3415,35 @@ dependencies = [
"http-serde 2.0.0",
"hyper 1.2.0",
"hyper-util",
- "lambda_runtime_api_client",
+ "lambda_runtime_api_client 0.11.0",
+ "pin-project",
"serde",
"serde_json",
"serde_path_to_error",
"tokio",
"tokio-stream",
"tower",
+ "tower-layer",
"tracing",
]
[[package]]
name = "lambda_runtime_api_client"
-version = "0.10.0"
+version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1364cd67281721d2a9a4444ba555cf4d74a195e647061fa4ccac46e6f5c3b0ae"
+checksum = "690c5ae01f3acac8c9c3348b556fc443054e9b7f1deaf53e9ebab716282bf0ed"
+dependencies = [
+ "http 0.2.12",
+ "hyper 0.14.28",
+ "tokio",
+ "tower-service",
+]
+
+[[package]]
+name = "lambda_runtime_api_client"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "722b02764422524d3f49a934b570f7c567f811eda1f9c4bdebebcfae1bad4f23"
dependencies = [
"bytes",
"futures-channel",
@@ -5704,7 +5774,7 @@ version = "0.8.0"
dependencies = [
"anyhow",
"arc-swap",
- "async-compression",
+ "async-compression 0.4.6",
"async-trait",
"aws-sdk-kinesis",
"bytes",
@@ -5887,12 +5957,14 @@ name = "quickwit-lambda"
version = "0.8.0"
dependencies = [
"anyhow",
- "aws_lambda_events",
+ "aws_lambda_events 0.15.0",
"chitchat",
"chrono",
"flate2",
+ "http 0.2.12",
"lambda_http",
- "lambda_runtime",
+ "lambda_runtime 0.11.1",
+ "mime_guess",
"once_cell",
"opentelemetry",
"opentelemetry-otlp",
@@ -5912,6 +5984,7 @@ dependencies = [
"quickwit-storage",
"quickwit-telemetry",
"rand 0.8.5",
+ "reqwest",
"serde",
"serde_json",
"time",
@@ -5919,6 +5992,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
+ "warp",
]
[[package]]
@@ -8360,7 +8434,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
dependencies = [
- "async-compression",
+ "async-compression 0.4.6",
"bitflags 2.5.0",
"bytes",
"futures-core",
@@ -8928,6 +9002,7 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169"
dependencies = [
+ "async-compression 0.3.15",
"bytes",
"futures-channel",
"futures-util",
diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml
index 68ad0de2683..abb15cbcca0 100644
--- a/quickwit/quickwit-lambda/Cargo.toml
+++ b/quickwit/quickwit-lambda/Cargo.toml
@@ -24,8 +24,10 @@ aws_lambda_events = "0.15.0"
chitchat = { workspace = true }
chrono = { workspace = true }
flate2 = { workspace = true }
-lambda_http = "0.10.0"
-lambda_runtime = "0.10.0"
+http = { workspace = true }
+lambda_http = "0.8.0"
+lambda_runtime = "0.11.1"
+mime_guess = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true, features = [
@@ -34,6 +36,7 @@ opentelemetry-otlp = { workspace = true, features = [
"http-proto",
] }
rand = { workspace = true }
+reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
time = { workspace = true }
@@ -41,6 +44,8 @@ tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }
+warp = { workspace = true , features = ["compression-gzip"] }
+
quickwit-actors = { workspace = true }
quickwit-cli = { workspace = true }
diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs
index 564ea4e6653..af06c9e0698 100644
--- a/quickwit/quickwit-lambda/src/bin/searcher.rs
+++ b/quickwit/quickwit-lambda/src/bin/searcher.rs
@@ -17,13 +17,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use lambda_http::{run, service_fn};
use quickwit_lambda::logger;
-use quickwit_lambda::searcher::handler;
+use quickwit_lambda::searcher::{setup_searcher_api, warp_lambda};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logger::setup_lambda_tracer(tracing::Level::INFO)?;
- let func = service_fn(handler);
- run(func).await.map_err(|e| anyhow::anyhow!(e))
+ let routes = setup_searcher_api().await?;
+ let warp_service = warp::service(routes);
+ warp_lambda::run(warp_service)
+ .await
+ .map_err(|e| anyhow::anyhow!(e))
}
diff --git a/quickwit/quickwit-lambda/src/environment.rs b/quickwit/quickwit-lambda/src/environment.rs
index 57f339d351d..ea916eacde4 100644
--- a/quickwit/quickwit-lambda/src/environment.rs
+++ b/quickwit/quickwit-lambda/src/environment.rs
@@ -24,10 +24,11 @@ use once_cell::sync::Lazy;
pub static INDEX_ID: Lazy =
Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set"));
-/// Configure the fmt tracing subscriber to log span boundaries. This is very verbose and is
-/// only used to generate advanced KPIs from Lambda runs (e.g for blogpost benchmarks)
-pub static LOG_SPAN_BOUNDARIES: Lazy =
- Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true"));
+/// Configure the fmt tracing subscriber to log as JSON and include span
+/// boundaries. This is very verbose and is only used to generate advanced KPIs
+/// from Lambda runs (e.g. for blog post benchmarks)
+pub static ENABLE_VERBOSE_JSON_LOGS: Lazy =
+ Lazy::new(|| var("QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS").is_ok_and(|v| v.as_str() == "true"));
pub static OPENTELEMETRY_URL: Lazy