Fixed AzureLogHandler with multiple processes. #1158
base: master
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
…/github.com/JeremyVriens/opencensus-python into tasks/fix_azure_log_handler_multiprocessing
@JeremyVriens
@@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent):
 class Queue(object):
     def __init__(self, capacity):
         self.EXIT_EVENT = QueueExitEvent('EXIT')
-        self._queue = queue.Queue(maxsize=capacity)
+        self._queue = multiprocessing.Queue(maxsize=capacity)
queue.Queue was created to work in concurrent environments spawned with the threading module, which is how the Azure exporters behave. With that being said, we probably shouldn't replace the default queue, since the original use case is the threaded environment. I suggest adding an option here so the user can configure which type of queue they want, and then creating the appropriate type of queue accordingly.
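The reviewer's suggestion could look something like the sketch below. This is a hypothetical API (the `multiprocess` option name is invented for illustration, not part of the PR): keep `queue.Queue` as the default for the threaded exporters, and only switch to `multiprocessing.Queue` when the caller asks for it.

```python
import multiprocessing
import queue


class Queue(object):
    """Sketch of a configurable queue wrapper (hypothetical option name).

    The default stays queue.Queue, which is cheap and sufficient for the
    threading-based exporters; multiprocess=True opts into the
    pipe-backed multiprocessing.Queue for multi-process setups.
    """

    def __init__(self, capacity, multiprocess=False):
        if multiprocess:
            self._queue = multiprocessing.Queue(maxsize=capacity)
        else:
            self._queue = queue.Queue(maxsize=capacity)

    def put(self, item, block=True, timeout=None):
        self._queue.put(item, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._queue.get(block=block, timeout=timeout)
```

Existing callers would be unaffected, since the flag defaults to the current behavior.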
It does work with threading; however, in an environment where multiple processes are used (see the linked bug reports), the queue is not shared between the processes. A multiprocessing.Queue is therefore required to make this work.
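A minimal sketch (not from the PR) of why the queue type matters: a plain queue.Queue lives in one process's memory, so items a child process enqueues never reach the parent, whereas multiprocessing.Queue moves data across the process boundary through an OS pipe.

```python
import multiprocessing


def worker(q):
    # Runs in a child process; the parent only sees this item because
    # multiprocessing.Queue transfers it through a pipe. With a plain
    # queue.Queue the child would operate on its own copy.
    q.put("hello from child")


def demo():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    item = q.get(timeout=5)
    p.join()
    return item


if __name__ == "__main__":
    print(demo())
```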
 # queue as a LogRecord object is not serializable, while an Envelope
 # object is.
 envelope = self.log_record_to_envelope(record)
 self._queue.put(envelope, block=False)
Why does the object need to be serializable?
In order to share messages/resources between different processes, multiprocessing.Queue pickles the object so that it can be sent through a pipe: https://github.com/python/cpython/blob/3.10/Lib/multiprocessing/queues.py#L244
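The pickling constraint is easy to reproduce outside the library. A LogRecord created with exc_info carries a live traceback object, and traceback objects cannot be pickled, which is the concrete reason the raw record cannot cross the process boundary:

```python
import pickle
import sys


def traceback_is_unpicklable():
    # Capture a traceback object, as LogRecord.exc_info would hold one.
    try:
        raise ValueError("boom")
    except ValueError:
        tb = sys.exc_info()[2]
    # pickle.dumps raises TypeError ("cannot pickle 'traceback' object"),
    # so anything holding a traceback cannot go on a multiprocessing.Queue.
    try:
        pickle.dumps(tb)
        return False
    except TypeError:
        return True


if __name__ == "__main__":
    print(traceback_is_unpicklable())
```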
@@ -75,8 +75,7 @@ def __init__(self, **options):
 def _export(self, batch, event=None):  # pragma: NO COVER
Please add an entry to the CHANGELOG
@@ -75,8 +75,7 @@ def __init__(self, **options):
 def _export(self, batch, event=None):  # pragma: NO COVER
     try:
         if batch:
-            envelopes = [self.log_record_to_envelope(x) for x in batch]
-            envelopes = self.apply_telemetry_processors(envelopes)
+            envelopes = self.apply_telemetry_processors(batch)
This doesn't work because apply_telemetry_processors is expecting the Envelope data type.
It does work; the batch is a list of Envelope objects, since I already converted them before putting them on the queue.
# Convert the raw LogRecord to an envelope before putting it on the
# queue as a LogRecord object is not serializable, while an Envelope
# object is.
envelope = self.log_record_to_envelope(record)
I think there are performance implications of doing this. We don't want to do this mapping every time logger.X() is called.
There is indeed a performance cost here. However, it is not possible to put a raw LogRecord on the multiprocessing.Queue, as the queue pickles whatever it is given.
I did, but I made a commit with the wrong git email config. Will try to fix this ASAP when I get time.
This is a hard issue to tackle (as you can tell from the bug report discussions), and I'm not sure there is a different or better way of sharing the queue between processes.
@JeremyVriens thanks for your POC! I just came across issue #900 a few days ago while trying to pass logs from Celery to Azure Application Insights, and seeing that this was first reported two years ago, I honestly didn't expect this issue to be picked up and "fixed" in the near future. Getting this properly fixed would be really helpful for a lot of people with this kind of setup!
This works great for monkey patching. Any plans to complete this? I don't believe Python 2 should be a concern at this point. @JeremyVriens, it looks like this is largely just pending the CLA check for you. Alternatively, I suppose someone could resubmit if you are no longer watching this?
Hi all, I faced the same issue. Regards,
May I ask for an estimated merge date for this pull request? Thanks!
These changes fix both #900 and #928, but I'm not sure about compatibility with Python 2.
I changed the normal queue.Queue to a multiprocessing.Queue and converted the LogRecord to an Envelope before putting it on the queue, since serializing a LogRecord didn't work when it contained a traceback.
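The approach described above can be sketched as a standalone handler. This is a simplified illustration, not the PR's actual code: `MultiprocessQueueHandler` is a hypothetical name, and the dict returned here is a stand-in for the much richer Envelope that the real `log_record_to_envelope` produces. The key idea is that `emit` converts the record into plain, picklable data before it crosses the process boundary.

```python
import logging
import multiprocessing


def log_record_to_envelope(record):
    # Stand-in for the exporter's conversion: reduce the LogRecord to
    # plain, picklable data. The real Envelope carries far more fields.
    return {
        "name": record.name,
        "level": record.levelname,
        "message": record.getMessage(),
    }


class MultiprocessQueueHandler(logging.Handler):
    """Hypothetical handler sketching the PR's approach: convert the
    LogRecord to a picklable envelope *before* putting it on a
    multiprocessing.Queue, since the queue pickles its items."""

    def __init__(self, q):
        super().__init__()
        self._queue = q

    def emit(self, record):
        envelope = log_record_to_envelope(record)
        self._queue.put(envelope, block=False)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    logger = logging.getLogger("demo")
    logger.addHandler(MultiprocessQueueHandler(q))
    logger.warning("hello %s", "world")
    print(q.get(timeout=5)["message"])  # hello world
```

A consumer in any process can then drain the queue and hand the envelopes to the exporter, which is why `_export` above receives a batch of already-converted Envelope objects.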