-
Notifications
You must be signed in to change notification settings - Fork 184
W-11939773-scatterGatherEdits-duke #2388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
dukesphere
wants to merge
17
commits into
v4.4
Choose a base branch
from
W-11939773-scatterGatherEdits-duke
base: v4.4
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
65e0b49
W-11939773-scatterGatherEdits-duke
dukesphere eac0773
Update modules/ROOT/pages/scatter-gather-concept.adoc
dukesphere f1db137
Update modules/ROOT/pages/scatter-gather-concept.adoc
dukesphere 756632b
Update modules/ROOT/pages/scatter-gather-concept.adoc
dukesphere 7fe5e53
W-11939773-scatterGatherEdits-duke
dukesphere 06d3c28
Update modules/ROOT/pages/scatter-gather-concept.adoc
dukesphere 381fc2b
W-11939773-scatterGatherEdits-duke
dukesphere 88ed6f6
W-11939773-scatterGatherEdits-duke
dukesphere 4b930ca
Update modules/ROOT/pages/scatter-gather-concept.adoc
dukesphere 2b5490f
W-11939773-scatterGatherEdits-duke
dukesphere 5a1771c
W-11939773-scatterGatherEdits-duke fix merge conflict
dukesphere 3ca33f3
W-11939773-scatterGatherEdits-duke add ex
dukesphere 33eb801
W-11939773-scatterGatherEdits-duke ex
dukesphere 6067884
W-11939773-scatterGatherEdits-duke ex
dukesphere db14177
W-11939773-scatterGatherEdits-duke ex
dukesphere fd17db9
W-11939773-scatterGatherEdits-duke ex
dukesphere 5595d6a
W-11939773-scatterGatherEdits-duke log error
dukesphere File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,11 +4,9 @@ include::_attributes.adoc[] | |
| endif::[] | ||
| :page-aliases: scatter-gather-xml-reference.adoc | ||
|
|
||
| The Scatter-Gather component is a routing event processor that processes a Mule event through different parallel processing routes that contain different event processors. Each route receives a reference to the Mule event and executes a sequence of one or more event processors. Each of these routes uses a separate thread to execute the event processors, and the resulting Mule event can be either the same Mule event without modifications or a new Mule event with its own payload, attributes, and variables. The Scatter-Gather component then combines the Mule events returned by each processing route into a new Mule event that is passed to the next event processor only after every route completes successfully. | ||
| The Scatter-Gather component (`<scatter-gather />`) is a routing event processor that processes a Mule event through different parallel processing routes that contain different event processors, which are connector operations or other Mule components. Each route receives a reference to the Mule event and executes a sequence of one or more event processors. Each of these routes uses a separate thread to execute the event processors on the input, and the resulting Mule event can be either the same Mule event without modifications or a new Mule event with its own payload, attributes, and variables. The Scatter-Gather component then combines the Mule events returned by each processing route into a new Mule event object that passes to the next event processor only after every route completes successfully. | ||
|
|
||
| The Scatter-Gather component executes each route in parallel, not sequentially. Parallel execution of routes can greatly increase the efficiency of your Mule application and may provide more information than sequential processing. | ||
|
|
||
| The Scatter-Gather component works with repeatable streams. It does not process nonrepeatable streams, which can be read only once before they are lost. By default, all streams are repeatable in Mule unless a component's streaming strategy is configured to be nonrepeatable. | ||
| The Scatter-Gather component executes each route in parallel, not sequentially. Parallel execution of routes can greatly increase the efficiency of your Mule application and can provide more information than sequential processing. | ||
|
|
||
| The following diagram details the behavior of the Scatter-Gather component: | ||
|
|
||
|
|
@@ -19,28 +17,118 @@ image::mruntime-scatter-gather.png[Diagram of Scatter-Gather Component] | |
| . Each of the processing routes starts executing in parallel. After all processors inside a route finish processing, the route returns a Mule event, which can be either the same Mule event without modifications or a new Mule event created by the processors in the route as a result of the modifications applied. | ||
| . After all processing routes have finished execution, the Scatter-Gather component creates a new Mule event that combines all resulting Mule events from each route, and then passes the new Mule event to the next component in the flow. | ||
|
|
||
| WARNING: Configure your Scatter-Gather component to have at least two routes; otherwise, your Mule application throws an exception and does not start. | ||
| WARNING: Configure your Scatter-Gather component to have at least two routes. Otherwise, your Mule application throws an exception and does not start. | ||
|
|
||
| To understand how scatter-gather processing of a payload occurs, assume that the component receives the message payload `"hello scatter gather"` from Set Payload (`<set-payload />`): | ||
|
|
||
| [source, xml] | ||
| ---- | ||
| <flow name="w-scatter-gather-exampleFlow" > | ||
| <!-- Scheduler --> | ||
| <scheduler doc:name="Scheduler" > | ||
| <scheduling-strategy > | ||
| <fixed-frequency frequency="75" timeUnit="SECONDS"/> | ||
| </scheduling-strategy> | ||
| </scheduler> | ||
| <!-- Set Payload -->> | ||
| <set-payload value="hello scatter gather" doc:name="Set Payload" /> | ||
| <!-- Scatter-Gather --> | ||
| <scatter-gather doc:name="Scatter-Gather" > | ||
| <!-- first route --> | ||
| <route > | ||
| <set-payload value='#[payload ++ " first route"]' doc:name="Set Payload" /> | ||
| </route> | ||
| <!-- second route --> | ||
| <route > | ||
| <set-payload value='#[payload ++ " second route"]' doc:name="Set Payload" /> | ||
| </route> | ||
| </scatter-gather> | ||
| <!-- Logger --> | ||
| <logger level="INFO" doc:name="Logger1" | ||
| message="#[%dw 2.0 output application/json --- payload]"/> | ||
| <!-- Logger --> | ||
| <logger level="INFO" doc:name="Logger2" | ||
| message='#[%dw 2.0 output application/json --- valuesOf(payload mapObject { ($$) : $}).payload]'/> | ||
| </flow> | ||
| ---- | ||
|
|
||
| * The first `<route />` within Scatter-Gather adds the string `" first route"` to the input `payload`. | ||
| * The second `<route />` within Scatter-Gather adds the string `" second route"` to the same input `payload`. | ||
|
|
||
| The first Logger after the component prints the output `payload` of Scatter-Gather, which is an object containing key-value pairs for each route. The key provides the index of the route (`0` or `1`), and the value provides a set of key-value pairs for the payload, attributes, and other properties of the event, _excluding_ Mule variables: | ||
|
|
||
| [source, JSON] | ||
| ---- | ||
| INFO ...LoggerMessageProcessor: | ||
| { | ||
| "0": { | ||
| "inboundAttachmentNames": [ | ||
|
|
||
| ], | ||
| "exceptionPayload": null, | ||
| "inboundPropertyNames": [ | ||
|
|
||
| ], | ||
| "outboundAttachmentNames": [ | ||
|
|
||
| ], | ||
| "payload": "hello scatter gather first route", | ||
| "outboundPropertyNames": [ | ||
|
|
||
| ], | ||
| "attributes": null | ||
| }, | ||
| "1": { | ||
| "inboundAttachmentNames": [ | ||
|
|
||
| ], | ||
| "exceptionPayload": null, | ||
| "inboundPropertyNames": [ | ||
|
|
||
| ], | ||
| "outboundAttachmentNames": [ | ||
|
|
||
| ], | ||
| "payload": "hello scatter gather second route", | ||
| "outboundPropertyNames": [ | ||
|
|
||
| ], | ||
| "attributes": null | ||
| } | ||
| } | ||
| ---- | ||
|
|
||
| To access and manipulate such a payload, you can use a DataWeave function such as xref:dataweave::dw-core-functions-mapobject.adoc[mapObject]. The second Logger in the example uses `valuesOf(payload mapObject { ($$) : $}).payload` to print the payloads of each key in the output object: | ||
|
|
||
| [source, JSON] | ||
| ---- | ||
| INFO ...LoggerMessageProcessor: | ||
| [ | ||
| "hello scatter gather first route", | ||
| "hello scatter gather second route" | ||
| ] | ||
| ---- | ||
|
|
||
| == Variable Propagation | ||
|
|
||
| Every route starts with the same initial variable values. Modifications to a variable within a specific route do not affect other routes. So, if a variable is added or modified in one route, then, after aggregation, the value is defined by that route. If a variable is added or modified by more than one route, the value is added to a list of all the values defined for that variable within all the routes, for example: | ||
| Every route starts with the same initial variable values. Modifications to a variable within a specific route do not affect other routes. So, if a variable is added or modified in one route, then, after aggregation, the value is defined by that route. If a variable is added or modified by more than one route, the value is added to an array of all the values defined for that variable within all the routes, for example: | ||
|
|
||
| [source,xml,linenums] | ||
| ---- | ||
| <set-variable variableName="var1" value="var1"/> | ||
| <set-variable variableName="var2" value="var2"/> | ||
| <scatter-gather doc:name="Scatter-Gather" doc:id="abc665e0-6119-4ecb-9f8b-52dbcbb1d488" > | ||
| <route > | ||
| <set-variable variableName="var2" value="newValue"/> | ||
| <route > | ||
| <set-variable variableName="var2" value="newValue"/> | ||
| <set-variable variableName="var3" value="appleVal"/> | ||
| </route> | ||
| <route > | ||
| <set-variable variableName="var3" value="bananaVal"/> | ||
| </route> | ||
| <route > | ||
| <set-variable variableName="var3" value="otherVal"/> | ||
| </route> | ||
| <route > | ||
| <set-variable variableName="var3" value="bananaVal"/> | ||
| </route> | ||
| <route > | ||
| <set-variable variableName="var3" value="otherVal"/> | ||
| <set-variable variableName="var4" value="val4"/> | ||
| </route> | ||
| </route> | ||
| </scatter-gather> | ||
| ---- | ||
|
|
||
|
|
@@ -50,17 +138,19 @@ After aggregation, the variables are: | |
|
|
||
| == Error Handling Inside Scatter-Gather Routes | ||
|
|
||
| You can use a Try scope in each route of a Scatter-Gather component to handle any errors that might be generated by a route’s event processors. After every route executes, if any has failed with an error, then the Scatter-Gather component throws an error of type `MULE:COMPOSITE_ROUTING`, and event processing does not proceed past the Scatter-Gather component in the flow. Instead, the flow branches to your error-handling event processors. | ||
| You can use a Try scope in each route of a Scatter-Gather component to handle any errors generated by a route’s event processors. After every route executes, if any failed with an error, the Scatter-Gather component throws an error of type `MULE:COMPOSITE_ROUTING`, and event processing does not proceed past the Scatter-Gather component in the flow. Instead, the flow branches to your error-handling event processors. | ||
|
|
||
| Because the `MULE:COMPOSITE_ROUTING` error object gathers not only errors from routes that failed but also Mule events from successfully completed routes, your application can use the error-handling event processors to process Mule events from the routes that completed | ||
| Because the `MULE:COMPOSITE_ROUTING` error object gathers not only errors from routes that failed but also Mule events from successfully completed routes, your application can use the error-handling event processors to process Mule events from the routes that completed. | ||
|
|
||
|
|
||
| To illustrate how this works, consider the following two cases: | ||
|
|
||
| * The routes in a Scatter-Gather component each contain a Try scope. + | ||
| One of the routes generates an error that is successfully handled by that route’s Try scope through an `on-error-continue` error handler, so the route is completed successfully. The Scatter-Gather component consolidates the Mule events from all routes into a new Mule event and passes the consolidated event to the next event processor. | ||
| * The routes in a Scatter-Gather component each contain a Try scope. | ||
| + | ||
| One of the routes generates an error that is successfully handled by that route’s Try scope through an `on-error-continue` error handler, so the route completes successfully. The Scatter-Gather component consolidates the Mule events from all routes into a new Mule event and passes the consolidated event to the next event processor. | ||
|
|
||
| * One of the routes in a Scatter-Gather component does not contain a Try scope or contains a Try scope with an error handler that cannot handle the error type, or the error handler is an `on-error-propagate` type. + | ||
| * One of the routes in a Scatter-Gather component does not contain a Try scope or contains a Try scope with an error handler that cannot handle the error type, or the error handler is an `on-error-propagate` type: | ||
| + | ||
| An error occurs in this route, causing the route to fail, which in turn causes the Scatter-Gather component to throw a `MULE:COMPOSITE_ROUTING` error. The flow branches to your error-handling event processors, which are able to process the Mule events from the completed routes. | ||
|
|
||
| Example of handling these errors: | ||
|
|
@@ -69,74 +159,88 @@ Example of handling these errors: | |
| <flow name="errorHandler"> | ||
| <scatter-gather> | ||
| <route> | ||
| <raise-error type="APP:MYERROR"/> | ||
| <raise-error type="APP:MYERROR" description="My Error Example"/> | ||
| </route> | ||
| <route> | ||
| <set-payload value="apple"/> | ||
| </route> | ||
| </scatter-gather> | ||
| <error-handler> | ||
| <on-error-continue type="MULE:COMPOSITE_ROUTING"> | ||
| <!-- This will have the error thrown by the first route --> | ||
| <on-error-continue type="MULE:COMPOSITE_ROUTING"> | ||
| <!-- Prints the error thrown by the first route --> | ||
| <logger level="WARN" message="#[error.errorMessage.payload.failures['0']]"/> | ||
| <!-- This will be a null value --> | ||
| <!-- Prints a null value --> | ||
| <logger level="WARN" message="#[error.errorMessage.payload.failures['1']]"/> | ||
|
|
||
| <!-- This will be a null value --> | ||
| <!-- Prints a null value --> | ||
| <logger level="WARN" message="#[error.errorMessage.payload.results['0']]"/> | ||
| <!-- This will have the result of the second (correctly executed) route --> | ||
| <!-- Prints the result of the second (correctly executed) route --> | ||
| <logger level="WARN" message="#[error.errorMessage.payload.results['1']]"/> | ||
|
|
||
| <!-- Prints the error type --> | ||
| <logger level="WARN" message="#[error.errorType.identifier]"/> | ||
| </on-error-continue> | ||
| </error-handler> | ||
| </flow> | ||
| ---- | ||
|
|
||
| == Handle Timeout Errors in a Scatter-Gather | ||
| The Logger output looks like this (edited for readability): | ||
|
|
||
| If you configure a timeout for a Scatter-Gather component and a route does not complete processing before the timeout expires, the route throws a `MULE:TIMEOUT` error. This error is then handled the same way as any other error generated from a route: after each route completes (either by processing success or by throwing a `MULE:TIMEOUT` error), the successful results and errors are collected together in the Scatter-Gather component `MULE:COMPOSITE_ROUTING` error, which is then processed in your configured error handler. | ||
| [source, logs] | ||
| ---- | ||
| WARN ...LoggerMessageProcessor: | ||
| org.mule.runtime.core.internal.message.ErrorBuilder$DeserializableErrorImplementation | ||
| { | ||
| description=My Error | ||
| detailedDescription=My Error | ||
| errorType=APP:MYERROR | ||
| cause=org.mule.runtime.api.exception.DefaultMuleException | ||
| errorMessage=- | ||
| suppressedErrors=[] | ||
| childErrors=[] | ||
| } | ||
| WARN ...LoggerMessageProcessor: null | ||
| WARN ...LoggerMessageProcessor: null | ||
| WARN ...LoggerMessageProcessor: | ||
| org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation | ||
| { | ||
| payload=apple | ||
| mediaType=*/* | ||
| attributes=<not set> | ||
| attributesMediaType=*/* | ||
| } | ||
| WARN ...LoggerMessageProcessor: COMPOSITE_ROUTING | ||
| ---- | ||
|
|
||
| == Example Project | ||
| In Anypoint Studio, you can download and open the example project _Scatter-Gather Flow Control_ from Anypoint Exchange to learn more about how to use the Scatter-Gather component. This example shows the usage of the scatter-gather control flow to aggregate data in parallel and return the result in JSON. | ||
| == Handle Timeout Errors in a Scatter-Gather | ||
|
|
||
| The example uses prepared data as input for two resources that should be aggregated. The data represents information about two contacts and has the following structure: | ||
| If you configure a timeout for a Scatter-Gather component and a route does not complete processing before the timeout expires, the route throws a `MULE:TIMEOUT` error. This error is then handled the same way as any other error generated from a route: after each route completes (either by processing successfully or by throwing a `MULE:TIMEOUT` error), the successful results and errors are collected into the Scatter-Gather component `MULE:COMPOSITE_ROUTING` error, which is then processed in your configured error handler. | ||
|
|
||
| |=== | ||
| |Resource|`firstname`|`surname`|`phone`|`email` | ||
| |contacts-1.csv | ||
| |John | ||
| |Doe | ||
| |096548763 | ||
| |john.doe@texasComp.com | ||
|
|
||
| |contacts-2.csv | ||
| |Jane | ||
| |Doe | ||
| |091558780 | ||
| |jane.doe@texasComp.com | ||
| |=== | ||
| == Repeatable Stream Input | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @IvanAndresFritzler This new section re repeatable streams replaces previous content from the introduction. |
||
| DataWeave is used to aggregate the data. The information about the contacts is aggregated to a JSON structure that represents data from both resources. | ||
| Repeatable streams are a Mule feature that enables a component to re-read a stream. Non-repeatable streams can be read only once before they are lost. By default, Mule components return a repeatable stream unless their streaming strategy is configured to be non-repeatable. | ||
|
|
||
| To download and open this example project while you are in Anypoint Studio, click the Exchange icon in the upper-left corner. Then, in the window that opens, log into Anypoint Exchange and search on the name of the project. | ||
| The Scatter-Gather component can process only repeatable streams and _cannot_ process a non-repeatable stream. If Scatter-Gather receives a non-repeatable stream as input from an upstream component that is configured with such a strategy, Scatter-Gather will not be able to process it. | ||
|
|
||
| == Scatter-Gather XML Reference | ||
|
|
||
| [%header%autowidth.spread,cols="a,a"] | ||
| |=== | ||
| |Element |Description | ||
| | `scatter-gather` |Sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message. | ||
| | `scatter-gather` |Sends a request message to multiple targets concurrently. This scope collects the responses from all routes and aggregates them into a single message. | ||
| 2+| *Attributes* | ||
| |`timeout` |Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or lower than 0 means no timeout. | ||
| |`timeout` |Sets the timeout for responses from sent messages, in milliseconds. A value of 0 or less than 0 means no timeout. | ||
|
|
||
| | `maxConcurrency` |Determines the maximum number of concurrent routes to process. | ||
|
|
||
| | `maxConcurrency` |Determines the maximum amount of concurrent routes to process. + | ||
| By default all routes run in parallel. | ||
| * By default all routes run in parallel. | ||
|
|
||
| By setting this value to 1, scatter-gather processes the routes sequentially. | ||
| * By setting this value to 1, `scatter-gather` processes the routes sequentially. | ||
|
|
||
| | `target` | The name of the target variable. | ||
| | `target` | The name of the xref:target-variables.adoc[target variable]. | ||
|
|
||
| | `targetValue` | Value of the data to store in the target variable. + | ||
| If not set, the default value is `#[payload]`. + | ||
| | `targetValue` | Value of the data to store in the target variable. | ||
| If not set, the default value is `#[payload]`. | ||
| This field accepts any value that a variable accepts: | ||
|
|
||
| * Any supported data type. | ||
|
|
@@ -158,7 +262,8 @@ This field accepts any value that a variable accepts: | |
|
|
||
| == See Also | ||
|
|
||
| * xref:about-components.adoc[Core Components] | ||
| * xref:about-components.adoc[] | ||
| * xref:transaction-management.adoc#tx_scopes_routers[How Transactions Affect Scopes and Routers] | ||
| * xref:error-handling.adoc[Error Handling] | ||
| * xref:try-scope-concept.adoc[Try Scope] | ||
| * xref:error-handling.adoc[] | ||
| * xref:try-scope-concept.adoc[] | ||
| * xref:about-mule-event.adoc[] | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IvanAndresFritzler will check this
The issue is actually the empty On Error Propagate data that returns after calling the Raise Error component in the component.