refactor(amber): centralize uncaught-exception console reporting#5783
Conversation
Move the operator-facing ERROR console-message construction for an uncaught exception out of DataProcessor._report_exception into ConsoleMessageManager.report_exception(worker_id, exc_info), and have DataProcessor delegate to it. Behavior-preserving: the same ERROR ConsoleMessage (title = exception's final line, message = full traceback, source = module:func:line of the raising frame) is queued on the same buffer. Centralizing it lets other uncaught-exception paths report identically. Adds a unit test pinning the ERROR message construction; the existing data_processor test (console messages after a UDF raises) still passes, confirming behavior is preserved.
There was a problem hiding this comment.
Pull request overview
This PR refactors Amber’s Python worker runtime to centralize construction of operator-facing console ERROR messages for uncaught UDF exceptions in ConsoleMessageManager, with DataProcessor delegating to that API. This keeps exception console reporting consistent across runtime paths and pins the behavior with a focused unit test.
Changes:
- Add
ConsoleMessageManager.report_exception(worker_id, exc_info)to build/queue an ERRORConsoleMessagefromexc_info. - Remove
DataProcessor’s inline exception-to-console formatting and delegate toConsoleMessageManager. - Add a new unit test to validate the emitted ERROR message fields (type, title, traceback body, and
module:func:linesource).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
amber/src/main/python/core/architecture/managers/console_message_manager.py |
Introduces report_exception and moves exception-to-console-message construction into the manager. |
amber/src/main/python/core/runnables/data_processor.py |
Removes _report_exception and delegates uncaught-exception console reporting to ConsoleMessageManager. |
amber/src/test/python/core/architecture/managers/test_console_message_manager.py |
Adds a unit test that pins the ERROR console message created by report_exception. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5783 +/- ##
============================================
- Coverage 53.97% 53.97% -0.01%
+ Complexity 2787 2782 -5
============================================
Files 1100 1101 +1
Lines 42593 42591 -2
Branches 4584 4583 -1
============================================
- Hits 22989 22987 -2
- Misses 18267 18268 +1
+ Partials 1337 1336 -1
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 399 | 0.244 | 24,196/32,879/32,879 us | 🔴 +12.5% / 🟢 -6.0% |
| 🔴 | bs=100 sw=10 sl=64 | 926 | 0.565 | 106,005/148,419/148,419 us | 🔴 +22.5% / 🔴 +6.2% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,095 | 0.668 | 916,141/956,530/956,530 us | ⚪ within ±5% / 🟢 -6.5% |
Baseline details
Latest main 2e0dd7c from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 399 tuples/sec | 423 tuples/sec | 410.82 tuples/sec | -5.7% | -2.9% |
| bs=10 sw=10 sl=64 | MB/s | 0.244 MB/s | 0.258 MB/s | 0.251 MB/s | -5.4% | -2.7% |
| bs=10 sw=10 sl=64 | p50 | 24,196 us | 21,507 us | 23,785 us | +12.5% | +1.7% |
| bs=10 sw=10 sl=64 | p95 | 32,879 us | 35,689 us | 34,980 us | -7.9% | -6.0% |
| bs=10 sw=10 sl=64 | p99 | 32,879 us | 35,689 us | 34,980 us | -7.9% | -6.0% |
| bs=100 sw=10 sl=64 | throughput | 926 tuples/sec | 980 tuples/sec | 891.94 tuples/sec | -5.5% | +3.8% |
| bs=100 sw=10 sl=64 | MB/s | 0.565 MB/s | 0.598 MB/s | 0.544 MB/s | -5.5% | +3.8% |
| bs=100 sw=10 sl=64 | p50 | 106,005 us | 101,651 us | 112,277 us | +4.3% | -5.6% |
| bs=100 sw=10 sl=64 | p95 | 148,419 us | 121,154 us | 139,802 us | +22.5% | +6.2% |
| bs=100 sw=10 sl=64 | p99 | 148,419 us | 121,154 us | 139,802 us | +22.5% | +6.2% |
| bs=1000 sw=10 sl=64 | throughput | 1,095 tuples/sec | 1,110 tuples/sec | 1,041 tuples/sec | -1.4% | +5.2% |
| bs=1000 sw=10 sl=64 | MB/s | 0.668 MB/s | 0.677 MB/s | 0.635 MB/s | -1.3% | +5.1% |
| bs=1000 sw=10 sl=64 | p50 | 916,141 us | 904,823 us | 972,714 us | +1.3% | -5.8% |
| bs=1000 sw=10 sl=64 | p95 | 956,530 us | 949,962 us | 1,023,057 us | +0.7% | -6.5% |
| bs=1000 sw=10 sl=64 | p99 | 956,530 us | 949,962 us | 1,023,057 us | +0.7% | -6.5% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,501.17,200,128000,399,0.244,24196.14,32879.10,32879.10
1,100,10,64,20,2159.35,2000,1280000,926,0.565,106004.84,148418.71,148418.71
2,1000,10,64,20,18260.94,20000,12800000,1095,0.668,916141.21,956530.29,956530.29Address Copilot review: the 4th tuple element from traceback.extract_tb (the source line text) is unused; bind it to _ to avoid an unused-local (F841).
…e#5783) Keep loop-feb's console_message_manager.py identical to apache#5783 so it drops cleanly from loop-feb's diff when that PR merges.
…nager Per review (Yicong): ConsoleMessageManager should only manage messages (its interfaces are all at the `msg: ConsoleMessage` level), not construct them. Move the ERROR-message construction out of ConsoleMessageManager.report_exception into a new factory core.util.console_message.error_message.create_error_console_message that returns a ConsoleMessage. DataProcessor builds via the factory and queues it with the manager's existing put_message, so the manager stays purely about message management. ConsoleMessageManager reverts to message-management-only.
Automated Reviewer SuggestionsBased on the
|
…le_message util apache#5783 moved error-message construction out of ConsoleMessageManager into core.util.console_message.error_message.create_error_console_message (per Yicong's review). Sync loop-feb's copies so they drop cleanly when apache#5783 merges: revert console_message_manager.py to message-management-only, add the util + its test, and route both callers (DataProcessor and the loop-condition path in main_loop.py) through create_error_console_message + put_message.
Per review (Yicong): cover more failure paths - - source points to the deepest (raising) frame, not the catch frame - chained `raise ... from ...` reports the active error with the full chain - an exception with no message uses the bare class name as the title - a missing traceback (exception never raised) is still reported The last case needs a small guard so the error reporter never itself throws on a degenerate exc_info; it now falls back to an empty source.
…che#5783) ### What changes were proposed in this PR? `DataProcessor` built the operator-facing ERROR console message for an uncaught UDF exception inline (`_report_exception`). This moves that construction into a small factory — `core.util.console_message.error_message.create_error_console_message(worker_id, exc_info) -> ConsoleMessage`. `DataProcessor` builds the message via the factory and queues it through the existing `ConsoleMessageManager.put_message`. Per review, `ConsoleMessageManager` stays purely about **message management** (its interfaces are all at the `msg: ConsoleMessage` level); message *construction* lives in the util. Behavior-preserving: the same ERROR `ConsoleMessage` is produced — | field | value | |---|---| | `msg_type` | `ConsoleMessageType.ERROR` | | `title` | the exception's final line (e.g. `ValueError: ...`) | | `message` | the full formatted traceback | | `source` | `module:func:line` of the raising frame | Centralizing the factory lets other uncaught-exception paths report identically (the loop operators' main-loop condition evaluation reuses it in a follow-up). ### Any related issues, documentation, discussions? Split out of apache#5700 (loop operators) to keep that PR focused; the refactor is independent and behavior-preserving on `main`. ### How was this PR tested? - New `test_error_message.py::test_builds_error_console_message_from_exc_info` pins the factory output (worker id, ERROR type, title, traceback body, `module:func:line` source) — written test-first. - The existing `test_data_processor.py` (asserts console messages after a UDF raises) still passes unchanged, confirming the delegation preserves behavior. - `cd amber && pytest -m "not integration"` on the affected files: 12 passed; `black --check` clean. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
What changes were proposed in this PR?
DataProcessorbuilt the operator-facing ERROR console message for an uncaught UDF exception inline (_report_exception). This moves that construction into a small factory —core.util.console_message.error_message.create_error_console_message(worker_id, exc_info) -> ConsoleMessage.DataProcessorbuilds the message via the factory and queues it through the existingConsoleMessageManager.put_message.Per review,
ConsoleMessageManagerstays purely about message management (its interfaces are all at themsg: ConsoleMessagelevel); message construction lives in the util.Behavior-preserving: the same ERROR
ConsoleMessageis produced —msg_typeConsoleMessageType.ERRORtitleValueError: ...)messagesourcemodule:func:lineof the raising frameCentralizing the factory lets other uncaught-exception paths report identically (the loop operators' main-loop condition evaluation reuses it in a follow-up).
Any related issues, documentation, discussions?
Split out of #5700 (loop operators) to keep that PR focused; the refactor is independent and behavior-preserving on
main.How was this PR tested?
test_error_message.py::test_builds_error_console_message_from_exc_infopins the factory output (worker id, ERROR type, title, traceback body,module:func:linesource) — written test-first.test_data_processor.py(asserts console messages after a UDF raises) still passes unchanged, confirming the delegation preserves behavior.cd amber && pytest -m "not integration"on the affected files: 12 passed;black --checkclean.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.