Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3 Downstream Span Pointers #587

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/trace/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Context } from "aws-lambda";

import { patchHttp, unpatchHttp } from "./patch-http";

import { extractTriggerTags, extractHTTPStatusCodeTag } from "./trigger";
import { extractTriggerTags, extractHTTPStatusCodeTag, parseEventSource } from "./trigger";
import { ColdStartTracerConfig, ColdStartTracer } from "./cold-start-tracer";
import { logDebug, tagObject } from "../utils";
import { didFunctionColdStart, isProactiveInitialization } from "../utils/cold-start";
Expand All @@ -17,6 +17,7 @@ import { TraceContext, TraceContextService, TraceSource } from "./trace-context-
import { StepFunctionContext, StepFunctionContextService } from "./step-function-service";
import { XrayService } from "./xray-service";
import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http";
import { getSpanPointerAttributes } from "../utils/span-pointers";
export type TraceExtractor = (event: any, context: Context) => Promise<TraceContext> | TraceContext;

export interface TraceConfig {
Expand Down Expand Up @@ -80,6 +81,7 @@ export class TraceListener {
private wrappedCurrentSpan?: SpanWrapper;
private triggerTags?: { [key: string]: string };
private lambdaSpanParentContext?: SpanContext;
private spanPointerAttributesList: object[] = [];

public get currentTraceHeaders() {
return this.contextService.currentTraceHeaders;
Expand Down Expand Up @@ -131,8 +133,14 @@ export class TraceListener {

this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext;
this.context = context;
this.triggerTags = extractTriggerTags(event, context);
const eventSource = parseEventSource(event);
this.triggerTags = extractTriggerTags(event, context, eventSource);
this.stepFunctionContext = StepFunctionContextService.instance().context;

const result = getSpanPointerAttributes(eventSource, event);
if (result) {
this.spanPointerAttributesList.push(...result);
}
}

/**
Expand Down Expand Up @@ -195,6 +203,12 @@ export class TraceListener {
}
}
}

if (this.wrappedCurrentSpan) {
for (const attributes of this.spanPointerAttributesList) {
this.wrappedCurrentSpan.span.addSpanPointer(attributes);
}
}
return false;
}

Expand Down
9 changes: 6 additions & 3 deletions src/trace/trigger.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,17 @@ describe("parseEventSource", () => {
it("extracts all trigger tags", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
expect(triggerTags).toEqual(event.result);
}
});

it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for buffered functions", () => {
for (const event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = false;
for (const response of bufferedResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand All @@ -198,7 +200,8 @@ describe("parseEventSource", () => {
it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for streaming functions", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = true;
for (const response of streamingResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand Down
3 changes: 1 addition & 2 deletions src/trace/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ function extractHTTPTags(event: APIGatewayEvent | APIGatewayProxyEventV2 | ALBEv
/**
* extractTriggerTags parses the trigger event object for tags to be added to the span metadata
*/
export function extractTriggerTags(event: any, context: Context) {
export function extractTriggerTags(event: any, context: Context, eventSource: eventTypes | undefined) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Unexpected any. Specify a different type. (...read more)

Do not use the any type, as it is too broad and can lead to unexpected behavior.

View in Datadog  Leave us feedback  Documentation

let triggerTags: { [key: string]: string } = {};
const eventSource = parseEventSource(event);
if (eventSource) {
triggerTags["function_trigger.event_source"] = eventSource;

Expand Down
151 changes: 151 additions & 0 deletions src/utils/span-pointers.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { getSpanPointerAttributes } from "./span-pointers";
import { eventTypes } from "../trace/trigger";
import { SPAN_LINK_KIND, S3_PTR_KIND, SPAN_POINTER_DIRECTION } from "dd-trace/packages/dd-trace/src/span_pointers";
import * as spanPointers from "dd-trace/packages/dd-trace/src/span_pointers";

// Mock the external dependencies
jest.mock("./log", () => ({
logDebug: jest.fn(),
}));

describe("span-pointers utils", () => {
const mockS3PointerHash = "mock-hash-123";

beforeEach(() => {
// Mock the generateS3PointerHash function
jest.spyOn(spanPointers, "generateS3PointerHash").mockReturnValue(mockS3PointerHash);
});

afterEach(() => {
jest.clearAllMocks();
});

describe("getSpanPointerAttributes", () => {
it("returns undefined when eventSource is undefined", () => {
const result = getSpanPointerAttributes(undefined, {});
expect(result).toBeUndefined();
});

it("returns undefined for unsupported event types", () => {
const result = getSpanPointerAttributes("unsupported" as eventTypes, {});
expect(result).toBeUndefined();
});

describe("S3 event processing", () => {
it("processes single S3 record correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "test-bucket" },
object: {
key: "test-key",
eTag: "test-etag",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
expect(spanPointers.generateS3PointerHash).toHaveBeenCalledWith("test-bucket", "test-key", "test-etag");
});

it("processes multiple S3 records correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "bucket1" },
object: {
key: "key1",
eTag: "etag1",
},
},
},
{
s3: {
bucket: { name: "bucket2" },
object: {
key: "key2",
eTag: "etag2",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});

it("handles empty Records array", () => {
const event = { Records: [] };
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("handles missing Records property", () => {
const event = {};
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("skips invalid records but processes valid ones", () => {
const event = {
Records: [
{
// Invalid record missing s3 property
},
{
s3: {
bucket: { name: "valid-bucket" },
object: {
key: "valid-key",
eTag: "valid-etag",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});
});
});
});
74 changes: 74 additions & 0 deletions src/utils/span-pointers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { eventTypes } from "../trace/trigger";
import { logDebug } from "./log";
import {
SPAN_LINK_KIND,
S3_PTR_KIND,
SPAN_POINTER_DIRECTION,
generateS3PointerHash,
} from "dd-trace/packages/dd-trace/src/span_pointers";

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (16.14)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (18.12)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (20.9)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

interface SpanPointerAttributes {
"ptr.kind": string;
"ptr.dir": string;
"ptr.hash": string;
"link.kind": string;
}

/**
* Computes span pointer attributes
*
* @param {eventTypes} eventSource - The type of event being processed (e.g., S3, DynamoDB).
* @param {any} event - The event object containing source-specific data.
* @returns {SpanPointerAttributes[] | undefined} An array of span pointer attribute objects, or undefined if none could be computed.
*/
export function getSpanPointerAttributes(
eventSource: eventTypes | undefined,
event: any,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Unexpected any. Specify a different type. (...read more)

Do not use the any type, as it is too broad and can lead to unexpected behavior.

View in Datadog  Leave us feedback  Documentation

): SpanPointerAttributes[] | undefined {
if (!eventSource) {
return;
}

switch (eventSource) {
case eventTypes.s3:
return processS3Event(event);
default:
logDebug(`Event type ${eventSource} not supported by span pointers.`);
return;
}
}

function processS3Event(event: any): SpanPointerAttributes[] {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Unexpected any. Specify a different type. (...read more)

Do not use the any type, as it is too broad and can lead to unexpected behavior.

View in Datadog  Leave us feedback  Documentation

const records = event.Records || [];
const spanPointerAttributesList = [];
const linkKind = SPAN_LINK_KIND;

for (const record of records) {
const eventName = record.eventName;
if (!["ObjectCreated:Put", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"].includes(eventName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why restrict us to only these particular object created types instead of ObjectCreated:*?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only support creating span pointers for these three operations on the upstream case. I guess it doesn't hurt to create span pointers downstream in other cases; we could change the check to

if (!eventName.startsWith("ObjectCreated"))

continue;
}
// Values are stored in the same place, regardless of AWS SDK v2/v3 or the event type.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
const s3Event = record?.s3;
const bucketName = s3Event?.bucket?.name;
const objectKey = s3Event?.object?.key;
const eTag = s3Event?.object?.eTag;

if (!bucketName || !objectKey || !eTag) {
logDebug("Unable to calculate span pointer hash because of missing parameters.");
continue;
}

const pointerHash = generateS3PointerHash(bucketName, objectKey, eTag);
const spanPointerAttributes = {
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": pointerHash,
"link.kind": linkKind,
nhulston marked this conversation as resolved.
Show resolved Hide resolved
};
spanPointerAttributesList.push(spanPointerAttributes);
}

return spanPointerAttributesList;
}
Loading