Skip to content

Commit 4d3218f

Browse files
authored
Merge branch 'master' into ib/log-db
2 parents 0415296 + 4584624 commit 4d3218f

File tree

15 files changed

+305
-61
lines changed

15 files changed

+305
-61
lines changed

it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ConcordConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
* Licensed under the Apache License, Version 2.0 (the "License");
1010
* you may not use this file except in compliance with the License.
1111
* You may obtain a copy of the License at
12-
*
12+
*
1313
* http://www.apache.org/licenses/LICENSE-2.0
14-
*
14+
*
1515
* Unless required by applicable law or agreed to in writing, software
1616
* distributed under the License is distributed on an "AS IS" BASIS,
1717
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -60,7 +60,7 @@ public static ConcordRule configure() {
6060
.useLocalMavenRepository(true)
6161
.extraConfigurationSupplier(() -> """
6262
concord-agent {
63-
dependencyResolveTimeout = "5 seconds"
63+
dependencyResolveTimeout = "20 seconds"
6464
logMaxDelay = "250 milliseconds"
6565
pollInterval = "250 milliseconds"
6666
prefork {

it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static com.walmartlabs.concord.it.common.ITUtils.randomString;
3636
import static com.walmartlabs.concord.it.runtime.v2.Utils.resourceToString;
3737
import static org.junit.jupiter.api.Assertions.*;
38+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3839

3940
public class ProcessIT extends AbstractTest {
4041

@@ -196,6 +197,25 @@ public void testOutVariables() throws Exception {
196197
assertFalse(data.containsKey("z"));
197198
}
198199

200+
@Test
201+
public void testOutVariablesForFailedProcess() throws Exception {
202+
Payload payload = new Payload()
203+
.archive(resource("outForFailed"))
204+
.out("x", "y.some.boolean", "z");
205+
206+
ConcordProcess proc = concord.processes().start(payload);
207+
expectStatus(proc, ProcessEntry.StatusEnum.FAILED);
208+
209+
// ---
210+
211+
Map<String, Object> data = proc.getOutVariables();
212+
assertNotNull(data);
213+
214+
assertEquals(123, data.get("x"));
215+
assertEquals(true, data.get("y.some.boolean"));
216+
assertFalse(data.containsKey("z"));
217+
}
218+
199219
@Test
200220
public void testThrowWithPayload() throws Exception {
201221
Payload payload = new Payload()
@@ -729,6 +749,23 @@ public void testDryRunModeNotSupportedByScript() throws Exception {
729749
proc.assertLog(".*Error @ line: 6, col: 7. Dry-run mode is not supported for this 'script' step.*");
730750
}
731751

752+
@Test
753+
public void testThrowParallelWithPayload() throws Exception {
754+
Payload payload = new Payload()
755+
.archive(resource("parallelExceptionPayload"));
756+
757+
ConcordProcess proc = concord.processes().start(payload);
758+
expectStatus(proc, ProcessEntry.StatusEnum.FINISHED);
759+
760+
// ---
761+
Map<String, Object> data = proc.getOutVariables();
762+
List<Map<String, Object>> exceptions = (List<Map<String, Object>>) ConfigurationUtils.get(data, "exceptions");
763+
764+
assertNotNull(exceptions);
765+
assertEquals(List.of("BOOM1", "BOOM2"), exceptions.stream().map(e -> e.get("message")).toList());
766+
assertEquals(List.of(Map.of("key", 1), Map.of("key", 2)), exceptions.stream().map(e -> e.get("payload")).toList());
767+
}
768+
732769
private List<ProcessEventEntry> getProcessElementEvents(ConcordProcess proc) throws Exception {
733770
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
734771
return processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
configuration:
2+
runtime: "concord-v2"
3+
4+
flows:
5+
default:
6+
- set:
7+
x: 123
8+
y:
9+
some:
10+
nested: ["data", "in", "arrays"]
11+
boolean: true
12+
number: 234
13+
14+
- throw: BOOM
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
configuration:
2+
runtime: "concord-v2"
3+
out:
4+
- exceptions
5+
6+
flows:
7+
default:
8+
- try:
9+
- task: "throw"
10+
in:
11+
exception: "BOOM${item}"
12+
payload:
13+
key: "${item}"
14+
loop:
15+
items:
16+
- 1
17+
- 2
18+
mode: parallel
19+
parallelism: 2
20+
error:
21+
- set:
22+
exceptions: ${lastError.cause.exceptions.stream().map(e -> e.cause).toList()}

runtime/v2/runner-test/src/test/java/com/walmartlabs/concord/runtime/v2/runner/LogSegmentsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ public void callWithRetryTest() throws Exception {
625625
"(concord.yaml) @ line: 3, col: 7, thread: 0, flow: inner");
626626
assertSegmentStatusError(log, 1);
627627

628-
assertSystemSegment(log, "[WARN ] Last error: com.walmartlabs.concord.runtime.v2.sdk.UserDefinedException: FAIL. Waiting for 1000ms before retry (attempt #0)");
628+
assertSystemSegment(log, "[WARN ] Last error: FAIL. Waiting for 1000ms before retry (attempt #0)");
629629

630630
assertSegmentLog(log, 2, "[INFO ] in inner flow");
631631
assertSegmentStatusOk(log, 2);

runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/OutVariablesProcessor.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
* Licensed under the Apache License, Version 2.0 (the "License");
1010
* you may not use this file except in compliance with the License.
1111
* You may obtain a copy of the License at
12-
*
12+
*
1313
* http://www.apache.org/licenses/LICENSE-2.0
14-
*
14+
*
1515
* Unless required by applicable law or agreed to in writing, software
1616
* distributed under the License is distributed on an "AS IS" BASIS,
1717
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@
2020
* =====
2121
*/
2222

23+
import com.fasterxml.jackson.core.type.TypeReference;
2324
import com.fasterxml.jackson.databind.ObjectMapper;
2425
import com.walmartlabs.concord.runtime.v2.sdk.EvalContext;
2526
import com.walmartlabs.concord.runtime.v2.sdk.EvalContextFactory;
@@ -43,6 +44,9 @@
4344
*/
4445
public class OutVariablesProcessor implements ExecutionListener {
4546

47+
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {
48+
};
49+
4650
private final ObjectMapper objectMapper;
4751
private final PersistenceService persistenceService;
4852
private final List<String> outVariables;
@@ -84,7 +88,13 @@ public void afterProcessEnds(Runtime runtime, State state, Frame lastFrame) {
8488
return;
8589
}
8690

91+
Map<String, Object> currentOut = persistenceService.loadPersistedFile(Constants.Files.OUT_VALUES_FILE_NAME,
92+
in -> objectMapper.readValue(in, MAP_TYPE));
93+
94+
Map<String, Object> result = new HashMap<>(currentOut != null ? currentOut : Collections.emptyMap());
95+
result.putAll(outValues);
96+
8797
persistenceService.persistFile(Constants.Files.OUT_VALUES_FILE_NAME,
88-
out -> objectMapper.writeValue(out, outValues));
98+
out -> objectMapper.writeValue(out, result));
8999
}
90100
}

runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/Runner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
import com.walmartlabs.concord.runtime.common.injector.InstanceId;
2727
import com.walmartlabs.concord.runtime.v2.model.ProcessDefinition;
2828
import com.walmartlabs.concord.runtime.v2.runner.compiler.CompilerUtils;
29-
import com.walmartlabs.concord.runtime.v2.runner.vm.SaveLastErrorCommand;
30-
import com.walmartlabs.concord.runtime.v2.runner.vm.UpdateLocalsCommand;
31-
import com.walmartlabs.concord.runtime.v2.runner.vm.VMUtils;
29+
import com.walmartlabs.concord.runtime.v2.runner.vm.*;
3230
import com.walmartlabs.concord.runtime.v2.sdk.Compiler;
3331
import com.walmartlabs.concord.runtime.v2.sdk.ProcessConfiguration;
3432
import com.walmartlabs.concord.svm.*;
@@ -77,7 +75,7 @@ public ProcessSnapshot start(ProcessConfiguration processConfiguration, ProcessD
7775
// install the exception handler into the root frame
7876
// takes care of all unhandled errors bubbling up
7977
VMUtils.assertNearestRoot(state, state.getRootThreadId())
80-
.setExceptionHandler(new SaveLastErrorCommand());
78+
.setExceptionHandler(new BlockCommand(new SaveOutVariablesOnErrorCommand(), new SaveLastErrorCommand()));
8179

8280
VM vm = createVM(processDefinition);
8381
// update the global variables using the input map by running a special command

runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/remote/TaskCallEventRecordingListener.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.walmartlabs.concord.runtime.v2.model.Location;
2929
import com.walmartlabs.concord.runtime.v2.model.Step;
3030
import com.walmartlabs.concord.runtime.v2.runner.EventReportingService;
31+
import com.walmartlabs.concord.runtime.v2.runner.SensitiveDataHolder;
3132
import com.walmartlabs.concord.runtime.v2.runner.tasks.TaskCallEvent;
3233
import com.walmartlabs.concord.runtime.v2.runner.tasks.TaskCallListener;
3334
import com.walmartlabs.concord.runtime.v2.sdk.*;
@@ -64,7 +65,9 @@ public void onEvent(TaskCallEvent event) {
6465

6566
List<Object> inVars = event.input();
6667
if (inVars != null && eventConfiguration.recordTaskInVars()) {
67-
Map<String, Object> vars = maskVars(convertInput(hideSensitiveData(inVars, event.inputAnnotations())), eventConfiguration.inVarsBlacklist());
68+
Map<String, Object> input = convertInput(processSensitiveDataAnnotations(inVars, event.inputAnnotations()));
69+
input = processSensitiveData(input);
70+
Map<String, Object> vars = maskVars(input, eventConfiguration.inVarsBlacklist());
6871
if (eventConfiguration.truncateInVars()) {
6972
vars = ObjectTruncater.truncateMap(vars, eventConfiguration.truncateMaxStringLength(), eventConfiguration.truncateMaxArrayLength(), eventConfiguration.truncateMaxDepth());
7073
}
@@ -75,7 +78,9 @@ public void onEvent(TaskCallEvent event) {
7578

7679
Object outVars = event.result();
7780
if (outVars != null && eventConfiguration.recordTaskOutVars()) {
78-
Map<String, Object> vars = maskVars(asMapOrNull(outVars), eventConfiguration.outVarsBlacklist());
81+
Map<String, Object> output = asMapOrNull(outVars);
82+
output = processSensitiveData(output);
83+
Map<String, Object> vars = maskVars(output, eventConfiguration.outVarsBlacklist());
7984
if (eventConfiguration.truncateOutVars()) {
8085
vars = ObjectTruncater.truncateMap(vars, eventConfiguration.truncateMaxStringLength(), eventConfiguration.truncateMaxArrayLength(), eventConfiguration.truncateMaxDepth());
8186
}
@@ -86,7 +91,9 @@ public void onEvent(TaskCallEvent event) {
8691

8792
Object metaVars = event.meta();
8893
if (metaVars != null && eventConfiguration.recordTaskMeta()) {
89-
Map<String, Object> meta = maskVars(asMapOrNull(metaVars), eventConfiguration.metaBlacklist());
94+
Map<String, Object> rawMeta = asMapOrNull(metaVars);
95+
Map<String, Object> meta = processSensitiveData(rawMeta);
96+
meta = maskVars(meta, eventConfiguration.metaBlacklist());
9097
if (eventConfiguration.truncateMeta()) {
9198
meta = ObjectTruncater.truncateMap(meta, eventConfiguration.truncateMaxStringLength(), eventConfiguration.truncateMaxArrayLength(), eventConfiguration.truncateMaxDepth());
9299
}
@@ -170,6 +177,34 @@ static Map<String, Object> maskVars(Map<String, Object> vars, Collection<String>
170177
return result;
171178
}
172179

180+
@SuppressWarnings({"unchecked", "rawtypes"})
181+
static <T> T processSensitiveData(T v) {
182+
Set<String> sensitiveStrings = SensitiveDataHolder.getInstance().get();
183+
if (sensitiveStrings.isEmpty()) {
184+
return v;
185+
}
186+
187+
if (v instanceof String s) {
188+
for (String sensitiveString : sensitiveStrings) {
189+
s = s.replace(sensitiveString, MASK);
190+
}
191+
return (T) s;
192+
} else if (v instanceof List<?> l) {
193+
List<Object> result = new ArrayList<>(l.size());
194+
for (Object vv : l) {
195+
vv = processSensitiveData(vv);
196+
result.add(vv);
197+
}
198+
return (T) result;
199+
} else if (v instanceof Map m) {
200+
Map<String, Object> result = new HashMap<>(m);
201+
result.replaceAll((k, vv) -> processSensitiveData(vv));
202+
return (T) result;
203+
}
204+
205+
return v;
206+
}
207+
173208
@SuppressWarnings("unchecked")
174209
private static Map<String, Object> ensureModifiable(Map<String, Object> m, int depth, String[] path) {
175210
if (depth == 0) {
@@ -217,7 +252,7 @@ private static Map<String, Object> convertInput(List<Object> input) {
217252
return result;
218253
}
219254

220-
private static List<Object> hideSensitiveData(List<Object> input, List<List<Annotation>> annotations) {
255+
private static List<Object> processSensitiveDataAnnotations(List<Object> input, List<List<Annotation>> annotations) {
221256
if (annotations.isEmpty()) {
222257
return input;
223258
}

runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoggedException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ public String toString() {
4040
public String getMessage() {
4141
return getCause().getMessage();
4242
}
43+
44+
@Override
45+
public StackTraceElement[] getStackTrace() {
46+
return new StackTraceElement[0];
47+
}
4348
}

0 commit comments

Comments
 (0)