豆豆友情提示:这是一个非官方 GitHub 代理镜像,主要用于网络测试或访问加速。请勿在此进行登录、注册或处理任何敏感信息。进行这些操作请务必访问官方网站 github.com。 Raw 内容也通过此代理提供。
Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
res: "dict[str, Any]" = {
"trace_id": item.trace_id,
"span_id": item.span_id,
"name": item._name,
"name": item._name if item._name is not None else "<unknown>",
Comment thread
sentrivana marked this conversation as resolved.
Outdated
"status": item._status,
"is_segment": item._is_segment(),
"start_timestamp": item._start_timestamp.timestamp(),
Expand Down
217 changes: 151 additions & 66 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
)
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource
from sentry_sdk.tracing_utils import Baggage
from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
reraise,
)
Expand Down Expand Up @@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":


def _update_celery_task_headers(
original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool
original_headers: "dict[str, Any]",
span: "Optional[Union[StreamedSpan, Span]]",
monitor_beat_tasks: bool,
) -> "dict[str, Any]":
"""
Updates the headers of the Celery task with the tracing information
Expand Down Expand Up @@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F":
def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(CeleryIntegration)
if integration is None:
return f(*args, **kwargs)

Expand All @@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
else:
task_name = "<unknown Celery task>"

span_streaming = has_span_streaming_enabled(client.options)

task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"

span_mgr: "Union[Span, NoOpMgr]" = (
sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
)
if not task_started_from_beat
else NoOpMgr()
)
span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()
if span_streaming:
if not task_started_from_beat and sentry_sdk.get_current_span() is not None:
span_mgr = sentry_sdk.traces.start_span(
name=task_name,
attributes={
"sentry.op": OP.QUEUE_SUBMIT_CELERY,
"sentry.origin": CeleryIntegration.origin,
},
)

else:
if not task_started_from_beat:
span_mgr = sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
)

with span_mgr as span:
kwargs["headers"] = _update_celery_task_headers(
Expand All @@ -303,50 +317,74 @@ def _wrap_tracer(task: "Any", f: "F") -> "F":
# Also because in Celery 3, signal dispatch returns early if one handler
# crashes.
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

with isolation_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))

transaction = None
custom_sampling_context = {
"celery_job": {
"task": task.name,
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
"args": list(args[1]),
"kwargs": args[2],
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

span: "Union[Span, StreamedSpan]"
span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()

# Celery task objects are not a thing to be trusted. Even
# something such as attribute access can fail.
with capture_internal_exceptions():
headers = args[3].get("headers") or {}
transaction = continue_trace(
headers,
op=OP.QUEUE_TASK_CELERY,
name="unknown celery task",
source=TransactionSource.TASK,
origin=CeleryIntegration.origin,
)
transaction.name = task.name
transaction.set_status(SPANSTATUS.OK)
if span_streaming:
sentry_sdk.traces.continue_trace(headers)
scope.set_custom_sampling_context(custom_sampling_context)
span = sentry_sdk.traces.start_span(
name=task.name,
parent_span=None, # make this a segment
attributes={
"sentry.origin": CeleryIntegration.origin,
"sentry.span.source": TransactionSource.TASK.value,
"sentry.op": OP.QUEUE_TASK_CELERY,
},
)

if transaction is None:
return f(*args, **kwargs)
span_ctx = span
Comment thread
cursor[bot] marked this conversation as resolved.

else:
span = continue_trace(
headers,
op=OP.QUEUE_TASK_CELERY,
name=task.name,
source=TransactionSource.TASK,
origin=CeleryIntegration.origin,
)
span.set_status(SPANSTATUS.OK)

span_ctx = sentry_sdk.start_transaction(
span,
custom_sampling_context=custom_sampling_context,
)

with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={
"celery_job": {
"task": task.name,
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
"args": list(args[1]),
"kwargs": args[2],
}
},
):
with span_ctx:
return f(*args, **kwargs)

return _inner # type: ignore


def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
def _set_messaging_destination_name(
task: "Any", span: "Union[StreamedSpan, Span]"
) -> None:
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
Expand All @@ -355,26 +393,47 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
if isinstance(span, StreamedSpan):
span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
else:
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)


def _wrap_task_call(task: "Any", f: "F") -> "F":
# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.

# functools.wraps is important here because celery-once looks at this
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
# https://github.com/getsentry/sentry-python/issues/421
@ensure_integration_enabled(CeleryIntegration, f)
@wraps(f)
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

try:
with sentry_sdk.start_span(
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
) as span:
span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name=task.name,
attributes={
"sentry.op": OP.QUEUE_PROCESS,
"sentry.origin": CeleryIntegration.origin,
},
)
else:
span = sentry_sdk.start_span(
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
)

with span:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

_set_messaging_destination_name(task, span)

latency = None
Expand All @@ -389,19 +448,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":

if latency is not None:
latency *= 1000 # milliseconds
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)

with capture_internal_exceptions():
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)

with capture_internal_exceptions():
span.set_data(
set_on_span(
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
)

with capture_internal_exceptions():
with task.app.connection() as conn:
span.set_data(
set_on_span(
SPANDATA.MESSAGING_SYSTEM,
conn.transport.driver_type,
)
Expand Down Expand Up @@ -476,8 +535,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any":
def _patch_producer_publish() -> None:
original_publish = Producer.publish

@ensure_integration_enabled(CeleryIntegration, original_publish)
def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return original_publish(self, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

kwargs_headers = kwargs.get("headers", {})
if not isinstance(kwargs_headers, Mapping):
# Ensure kwargs_headers is a Mapping, so we can safely call get().
Expand All @@ -487,31 +551,52 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
# method will still work.
kwargs_headers = {}

task_name = kwargs_headers.get("task")
task_name = kwargs_headers.get("task") or "<unknown Celery task>"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for naming the span and spans v2 must always have a name, name=None is not supported.

task_id = kwargs_headers.get("id")
retries = kwargs_headers.get("retries")

routing_key = kwargs.get("routing_key")
exchange = kwargs.get("exchange")

with sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
name=task_name,
origin=CeleryIntegration.origin,
) as span:
span: "Union[StreamedSpan, Span, None]" = None
if span_streaming:
if sentry_sdk.get_current_span() is not None:
span = sentry_sdk.traces.start_span(
name=task_name,
attributes={
"sentry.op": OP.QUEUE_PUBLISH,
"sentry.origin": CeleryIntegration.origin,
},
)
else:
span = sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
name=task_name,
origin=CeleryIntegration.origin,
)

if span is None:
return original_publish(self, *args, **kwargs)

with span:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

if task_id is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id)

if exchange == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning messages are
# routed to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)

if retries is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)

with capture_internal_exceptions():
span.set_data(
set_on_span(
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
)

Expand Down
13 changes: 13 additions & 0 deletions sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,19 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None:
if transaction.source:
self._transaction_info["source"] = transaction.source

# Also set _transaction and _transaction_info in streaming mode as this
# is used for populating events and linking them to segments
if (
isinstance(span, StreamedSpan)
and not isinstance(span, NoOpStreamedSpan)
and span._is_segment()
):
self._transaction = span.name
if span._attributes.get("sentry.span.source"):
self._transaction_info["source"] = str(
span._attributes["sentry.span.source"]
Comment on lines +905 to +906
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're wondering what the str() is doing here, the answer is mypy 😎 😭

)

@property
def profile(self) -> "Optional[Profile]":
return self._profile
Expand Down
4 changes: 3 additions & 1 deletion sentry_sdk/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def __init__(
):
self._name: str = name
self._active: bool = active
self._attributes: "Attributes" = {}
self._attributes: "Attributes" = {
"sentry.origin": "manual",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an oversight in the core logic -- we were never setting sentry.origin for custom instrumentation

}

if attributes:
for attribute, value in attributes.items():
Expand Down
Loading
Loading