Skip to content

Commit

Permalink
Lambda search endpoints using a warp adapter (#4805)
Browse files Browse the repository at this point in the history
* Lambda search endpoints using a warp adapter

* Fix request id span

* Make logging 30% more compact

* Minor improvements

* Propagate poll_ready to warp svc

* Upgrade lambda_runtime
  • Loading branch information
rdettai authored Apr 3, 2024
1 parent d097326 commit 24dc8d5
Show file tree
Hide file tree
Showing 19 changed files with 613 additions and 322 deletions.
12 changes: 5 additions & 7 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ bench-index:
done

bench-search-term:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -121,7 +121,7 @@ bench-search-term:
done

bench-search-histogram:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -133,15 +133,13 @@ bench-search-histogram:
bench-search:
for run in {1..30}
do
export QW_LAMBDA_DISABLE_SEARCH_CACHE=true
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
done

test-mock-data-endpoints:
python -c 'from cdk import cli; cli.test_mock_data_endpoints()'
107 changes: 87 additions & 20 deletions distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dataclasses import dataclass
from functools import cache
from io import BytesIO
from urllib.parse import urlparse

import boto3
import botocore.config
Expand All @@ -29,6 +30,8 @@
retries={"max_attempts": 0}, read_timeout=60 * 15
)
session = boto3.Session(region_name=region)
mock_sales_index_id = "mock-sales"
hdfs_logs_index_id = "hdfs-logs"


@cache
Expand All @@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
outputs = stacks[0]["Outputs"]
for output in outputs:
if output["ExportName"] == export_name:
if "ExportName" in output and output["ExportName"] == export_name:
return output["OutputValue"]
else:
print(f"Export name {export_name} not found in stack {stack_name}")
exit(1)


def _decompress_if_gzip(payload: bytes, headers: dict) -> str:
if headers.get("content-encoding", "") == "gzip":
return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode()
else:
return payload.decode()


@dataclass
class LambdaResult:
function_error: str
log_tail: str
payload: str
raw_size_bytes: int
status_code: int

@staticmethod
def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
Expand All @@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(payload),
status_code=0,
)

@staticmethod
def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult":
gw_str = lambda_resp["Payload"].read().decode()
gw_obj = json.loads(gw_str)
payload = gw_obj["body"]
if gw_obj["isBase64Encoded"]:
if "body" in gw_obj:
payload = gw_obj["body"]
status_code = gw_obj["statusCode"]
else:
payload = gw_str
status_code = -1
if gw_obj.get("isBase64Encoded", False):
dec_payload = base64.b64decode(payload)
if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip":
payload = (
gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload))
.read()
.decode()
)
else:
payload = dec_payload.decode()
payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {}))
return LambdaResult(
function_error=lambda_resp.get("FunctionError", ""),
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(gw_str),
status_code=status_code,
)

def extract_report(self) -> str:
Expand All @@ -108,12 +119,13 @@ def _format_lambda_output(
if lambda_result.function_error != "":
print("\n## FUNCTION ERROR:")
print(lambda_result.function_error)
print("\n## LOG TAIL:")
print(lambda_result.log_tail)
print("\n## RAW RESPONSE SIZE (BYTES):")
ratio = lambda_result.raw_size_bytes / len(lambda_result.payload)
print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)")
print("\n## RESPONSE:")
if len(lambda_result.payload) == 0:
ratio = "empty payload"
else:
ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload"
print(f"{lambda_result.raw_size_bytes} ({ratio})")
print(f"\n## RESPONSE [{lambda_result.status_code}]:")
payload_size = len(lambda_result.payload)
print(lambda_result.payload[:max_resp_size])
if payload_size > max_resp_size:
Expand Down Expand Up @@ -184,6 +196,7 @@ def invoke_hdfs_indexer() -> LambdaResult:

def _invoke_searcher(
stack_name: str,
index_id: str,
function_export_name: str,
payload: str,
download_logs: bool,
Expand All @@ -198,9 +211,14 @@ def _invoke_searcher(
LogType="Tail",
Payload=json.dumps(
{
"headers": {"Content-Type": "application/json"},
"resource": f"/api/v1/{index_id}/search",
"path": f"/api/v1/{index_id}/search",
"httpMethod": "POST",
"headers": {
"Content-Type": "application/json",
},
"requestContext": {
"http": {"method": "POST"},
"httpMethod": "POST",
},
"body": payload,
"isBase64Encoded": False,
Expand All @@ -218,6 +236,7 @@ def _invoke_searcher(
def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult:
return _invoke_searcher(
app.HDFS_STACK_NAME,
hdfs_logs_index_id,
hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
payload,
download_logs,
Expand Down Expand Up @@ -249,7 +268,6 @@ def get_logs(
last_event_id = event["eventId"]
yield event["message"]
if event["message"].startswith("REPORT"):
print(event["message"])
lower_time_bound = int(event["timestamp"])
last_event_id = "REPORT"
break
Expand Down Expand Up @@ -277,13 +295,15 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
int(invoke_start * 1000),
):
f.write(log)
print(f"Logs written to lambda.{request_id}.log")
except Exception as e:
print(f"Failed to download logs: {e}")


def invoke_mock_data_searcher():
_invoke_searcher(
app.MOCK_DATA_STACK_NAME,
mock_sales_index_id,
mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
"""{"query": "id:1", "sort_by": "ts", "max_hits": 10}""",
True,
Expand Down Expand Up @@ -321,7 +341,9 @@ def print_mock_data_metastore():
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
s3 = session.client("s3")
response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
response = s3.get_object(
Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json"
)
print(response["Body"].read().decode())


Expand Down Expand Up @@ -387,3 +409,48 @@ def benchmark_hdfs_search(payload: str):
with open(f"lambda-bench.log", "a+") as f:
f.write(json.dumps(bench_result))
f.write("\n")


def test_mock_data_endpoints():
    """Smoke-test the mock data stack's search endpoints through API Gateway.

    Resolves the gateway URL from the CloudFormation stack outputs, then
    issues a series of Quickwit and Elasticsearch-compatible search requests,
    printing each status code and a truncated response body. Exits with
    status 1 on the first response whose status differs from the expected
    one. Requires the SEARCHER_API_KEY environment variable to be set.
    """
    apigw_url = _get_cloudformation_output_value(
        app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME
    )
    api_key = os.getenv("SEARCHER_API_KEY")
    if api_key is None:
        # Fail fast with an actionable message instead of letting
        # http.client raise on a None header value deep inside req().
        print("The SEARCHER_API_KEY environment variable must be set")
        exit(1)

    def req(method, path, body=None, expected_status=200):
        # One connection per request: API Gateway requests are infrequent
        # here and this keeps each call independent.
        conn = http.client.HTTPSConnection(urlparse(apigw_url).netloc)
        try:
            conn.request(
                method,
                path,
                body,
                headers={"x-api-key": api_key},
            )
            response = conn.getresponse()
            print(f"{method} {path}")
            headers = {k: v for (k, v) in response.getheaders()}
            resp_body = _decompress_if_gzip(response.read(), headers)
        finally:
            conn.close()
        if response.status != expected_status:
            print(f"[{response.status}] => {resp_body}")
            exit(1)
        else:
            # Re-dump the parsed JSON so the preview is single-line/compact.
            print(f"[{response.status}] => {json.dumps(json.loads(resp_body))[0:100]}")

    req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal")
    req(
        "POST",
        f"/api/v1/{mock_sales_index_id}/search",
        '{"query":"quantity:>5", "max_hits": 10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal")
    req(
        "POST",
        f"/api/v1/_elastic/{mock_sales_index_id}/_search",
        '{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity")
    # expected errors
    req(
        "GET",
        f"/api/v1/_elastic/{mock_sales_index_id}/_search?query=animal",
        expected_status=400,
    )
    req("GET", f"/api/v1/_elastic/_search?q=animal", expected_status=501)
15 changes: 10 additions & 5 deletions distribution/lambda/cdk/stacks/examples/mock_data_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name"
API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url"


class Source(Construct):
Expand Down Expand Up @@ -98,11 +99,12 @@ def __init__(
searcher_integration = aws_apigateway.LambdaIntegration(
qw_svc.searcher.lambda_function
)
search_resource = (
api.root.add_resource("v1").add_resource(index_id).add_resource("search")
)
search_resource = api.root.add_resource("v1").add_resource("{proxy+}")
search_resource.add_method("POST", searcher_integration, api_key_required=True)
api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api)
search_resource.add_method("GET", searcher_integration, api_key_required=True)
# Change the deployment id (api-deployment-x) each time the API changes,
# otherwise changes are not deployed.
api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api)
api_stage = aws_apigateway.Stage(
self, "api", deployment=api_deployment, stage_name="api"
)
Expand All @@ -122,7 +124,10 @@ def __init__(
api.deployment_stage = api_stage

aws_cdk.CfnOutput(
self, "search-api-url", value=api.url.rstrip("/") + search_resource.path
self,
"search-api-url",
value=api.url.rstrip("/") + search_resource.path,
export_name=API_GATEWAY_EXPORT_NAME,
)


Expand Down
2 changes: 1 addition & 1 deletion distribution/lambda/resources/hdfs-logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Index config file for hdfs-logs dataset.
#

version: 0.6
version: 0.7

index_id: hdfs-logs

Expand Down
2 changes: 1 addition & 1 deletion distribution/lambda/resources/mock-sales.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Index config file for mock-sales data generator.
#

version: 0.6
version: 0.7

index_id: mock-sales

Expand Down
Loading

0 comments on commit 24dc8d5

Please sign in to comment.