Skip to content

Commit ec0c17a

Browse files
committed
Adding fetcher for plugin to pipeline mapping.
* Added a metadata fetcher to fetch mapping from CDAP metadata. Sample response from metadata API for a user plugin `Trash` and the pipeline `DataFusionQuickstart` ``` { "sort": "io.cdap.cdap.data2.metadata.dataset.SortInfo@74a24bc1", "offset": 0, "limit": 2147483647, "numCursors": 0, "total": 1, "results": [ { "metadataEntity": { "details": { "namespace": "default", "artifact": "trash-plugin", "version": "1.2.0", "type": "batchsink", "plugin": "Trash" }, "type": "plugin" }, "metadata": { "SYSTEM": { "properties": { "default:DataFusionQuickstart": "1" }, "tags": [] } } } ], "cursors": [], "showHidden": false, "entityScope": [ "USER" ] } ```
1 parent 0d72e99 commit ec0c17a

File tree

4 files changed

+365
-0
lines changed

4 files changed

+365
-0
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.internal.app.upgrade;
18+
19+
import io.cdap.cdap.proto.id.ApplicationId;
20+
import io.cdap.cdap.proto.id.PluginId;
21+
import java.util.Objects;
22+
23+
/**
24+
* Mapping for the latest application version and its plugin.
25+
*/
26+
public class ApplicationPluginMapping {
27+
28+
// Version less application Id.
29+
private final ApplicationId applicationId;
30+
private final PluginId pluginId;
31+
32+
public ApplicationPluginMapping(ApplicationId applicationId, PluginId pluginId) {
33+
this.applicationId = applicationId;
34+
this.pluginId = pluginId;
35+
}
36+
37+
public ApplicationId getApplicationId() {
38+
return applicationId;
39+
}
40+
41+
public PluginId getPluginId() {
42+
return pluginId;
43+
}
44+
45+
@Override
46+
public boolean equals(Object o) {
47+
if (this == o) {
48+
return true;
49+
}
50+
if (!(o instanceof ApplicationPluginMapping)) {
51+
return false;
52+
}
53+
ApplicationPluginMapping that = (ApplicationPluginMapping) o;
54+
return Objects.equals(applicationId, that.applicationId) && Objects.equals(
55+
pluginId, that.pluginId);
56+
}
57+
58+
@Override
59+
public int hashCode() {
60+
return Objects.hash(applicationId, pluginId);
61+
}
62+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.internal.app.upgrade;
18+
19+
import io.cdap.cdap.proto.id.NamespaceId;
20+
import java.util.List;
21+
22+
/**
23+
* Fetcher for application to plugin mapping.
24+
*/
25+
public interface ApplicationPluginMappingFetcher {
26+
27+
/**
28+
* Fetches application to plugin mapping for a namespace.
29+
*
30+
* @param namespace the namespace for which mapping will be fetched.
31+
* @return a list of type {@link ApplicationPluginMapping}. An empty list will be returned if no
32+
* applications are found.
33+
* @throws Exception the exception during fetching of application plugin mapping.
34+
*/
35+
List<ApplicationPluginMapping> fetchApplicationPluginMapping(NamespaceId namespace)
36+
throws Exception;
37+
38+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.internal.app.upgrade;
18+
19+
import io.cdap.cdap.api.metadata.MetadataEntity;
20+
import io.cdap.cdap.api.metadata.MetadataScope;
21+
import io.cdap.cdap.metadata.MetadataAdmin;
22+
import io.cdap.cdap.proto.id.ApplicationId;
23+
import io.cdap.cdap.proto.id.NamespaceId;
24+
import io.cdap.cdap.proto.id.PluginId;
25+
import io.cdap.cdap.spi.metadata.MetadataRecord;
26+
import io.cdap.cdap.spi.metadata.SearchRequest;
27+
import io.cdap.cdap.spi.metadata.SearchResponse;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Map;
31+
import javax.inject.Inject;
32+
33+
/**
34+
* Fetcher for application to plugin mapping from the stored metadata.
35+
*/
36+
public class MetadataApplicationPluginMappingFetcher implements ApplicationPluginMappingFetcher {
37+
38+
private static final String CONNECTOR_PROPERTY = "connector";
39+
private static final String PROPERTY_SEPARATOR = ":";
40+
41+
private final MetadataAdmin metadataAdmin;
42+
43+
@Inject
44+
public MetadataApplicationPluginMappingFetcher(MetadataAdmin metadataAdmin) {
45+
this.metadataAdmin = metadataAdmin;
46+
}
47+
48+
@Override
49+
public List<ApplicationPluginMapping> fetchApplicationPluginMapping(NamespaceId namespaceId)
50+
throws Exception {
51+
String namespace = namespaceId.getNamespace();
52+
List<ApplicationPluginMapping> results = new ArrayList<>();
53+
results.addAll(fromSearchResponse(
54+
metadataAdmin.search(buildSearchRequest(MetadataScope.USER, namespaceId)),
55+
namespace));
56+
results.addAll(
57+
fromSearchResponse(
58+
metadataAdmin.search(buildSearchRequest(MetadataScope.SYSTEM, namespaceId)),
59+
namespace));
60+
return results;
61+
}
62+
63+
private List<ApplicationPluginMapping> fromSearchResponse(SearchResponse response,
64+
String namespace) {
65+
List<ApplicationPluginMapping> results = new ArrayList<>();
66+
for (MetadataRecord result : response.getResults()) {
67+
MetadataEntity pluginEntity = result.getEntity();
68+
if (!MetadataEntity.PLUGIN.equalsIgnoreCase(pluginEntity.getType())) {
69+
continue;
70+
}
71+
PluginId pluginDetail = toPluginId(pluginEntity);
72+
String namespacePrefix = namespace + PROPERTY_SEPARATOR;
73+
// Pipelines are present as part of system scope.
74+
Map<String, String> pluginProperties = result.getMetadata()
75+
.getProperties(MetadataScope.SYSTEM);
76+
for (String propertyKey : pluginProperties.keySet()) {
77+
if (CONNECTOR_PROPERTY.equalsIgnoreCase(propertyKey) || !propertyKey
78+
.startsWith(namespacePrefix)) {
79+
continue;
80+
}
81+
String[] split = propertyKey.split(PROPERTY_SEPARATOR);
82+
results.add(
83+
new ApplicationPluginMapping(new ApplicationId(split[0], split[1]), pluginDetail));
84+
}
85+
}
86+
return results;
87+
}
88+
89+
private PluginId toPluginId(MetadataEntity pluginEntity) {
90+
return new PluginId(pluginEntity.getValue(MetadataEntity.NAMESPACE),
91+
pluginEntity.getValue(MetadataEntity.ARTIFACT),
92+
pluginEntity.getValue(MetadataEntity.VERSION),
93+
pluginEntity.getValue(MetadataEntity.PLUGIN),
94+
pluginEntity.getValue(MetadataEntity.TYPE));
95+
}
96+
97+
private SearchRequest buildSearchRequest(MetadataScope scope, NamespaceId namespaceId) {
98+
SearchRequest.Builder builder = SearchRequest.of("*").
99+
addType(MetadataEntity.PLUGIN);
100+
if (MetadataScope.SYSTEM == scope) {
101+
builder.addNamespace(scope.toString().toLowerCase());
102+
} else {
103+
builder.addNamespace(namespaceId.getNamespace());
104+
}
105+
return builder.build();
106+
}
107+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.internal.app.upgrade;
18+
19+
import static org.mockito.ArgumentMatchers.eq;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.when;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableMap;
26+
import io.cdap.cdap.api.metadata.MetadataEntity;
27+
import io.cdap.cdap.api.metadata.MetadataScope;
28+
import io.cdap.cdap.metadata.MetadataAdmin;
29+
import io.cdap.cdap.proto.id.ApplicationId;
30+
import io.cdap.cdap.proto.id.NamespaceId;
31+
import io.cdap.cdap.proto.id.PluginId;
32+
import io.cdap.cdap.spi.metadata.Metadata;
33+
import io.cdap.cdap.spi.metadata.MetadataRecord;
34+
import io.cdap.cdap.spi.metadata.SearchRequest;
35+
import io.cdap.cdap.spi.metadata.SearchResponse;
36+
import java.util.Arrays;
37+
import java.util.Collection;
38+
import java.util.Collections;
39+
import java.util.List;
40+
import org.junit.Assert;
41+
import org.junit.Before;
42+
import org.junit.Test;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.Parameterized;
45+
import org.junit.runners.Parameterized.Parameters;
46+
import org.mockito.Mock;
47+
import org.mockito.MockitoAnnotations;
48+
49+
/**
50+
* Parameterized tests for {@link MetadataApplicationPluginMappingFetcher} using a parameter class.
51+
*/
52+
@RunWith(Parameterized.class)
53+
public class MetadataApplicationPluginMappingFetcherTest {
54+
55+
// A static inner class to hold the parameters for each test case
56+
static class TestCaseParams {
57+
58+
final String name;
59+
final SearchResponse userResponse;
60+
final SearchResponse systemResponse;
61+
final List<ApplicationPluginMapping> expectedMappings;
62+
63+
TestCaseParams(String name, SearchResponse userResponse, SearchResponse systemResponse,
64+
List<ApplicationPluginMapping> expectedMappings) {
65+
this.name = name;
66+
this.userResponse = userResponse;
67+
this.systemResponse = systemResponse;
68+
this.expectedMappings = expectedMappings;
69+
}
70+
}
71+
72+
@Parameters(name = "{index}: {0}")
73+
public static Collection<Object[]> data() {
74+
SearchResponse emptyResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0,
75+
Integer.MAX_VALUE, 0, Collections.emptyList());
76+
77+
SearchResponse userPluginResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0,
78+
Integer.MAX_VALUE, 0, ImmutableList.of(new MetadataRecord(
79+
MetadataEntity.builder().append("namespace", "default")
80+
.append("artifact", "trash-plugin")
81+
.append("version", "1.2.0")
82+
.append(MetadataEntity.TYPE, "batchsink")
83+
.appendAsType(MetadataEntity.PLUGIN, "trash")
84+
.build(),
85+
new Metadata(
86+
MetadataScope.SYSTEM, ImmutableMap.of("default:pipeline_1", "1")))));
87+
88+
SearchResponse systemPluginResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0,
89+
Integer.MAX_VALUE, 0, ImmutableList.of(new MetadataRecord(
90+
MetadataEntity.builder().append("namespace", "system")
91+
.append("artifact", "google-cloud")
92+
.append("version", "0.24.0")
93+
.append(MetadataEntity.TYPE, "batchsource")
94+
.appendAsType(MetadataEntity.PLUGIN, "GCS")
95+
.build(),
96+
new Metadata(
97+
MetadataScope.SYSTEM, ImmutableMap.of("default:pipeline_1", "1")))));
98+
99+
ApplicationPluginMapping applicationSystemPluginMapping =
100+
new ApplicationPluginMapping(new ApplicationId("default", "pipeline_1"),
101+
new PluginId("system", "google-cloud", "0.24.0", "GCS", "batchsource"));
102+
ApplicationPluginMapping applicationUserPluginMapping =
103+
new ApplicationPluginMapping(new ApplicationId("default", "pipeline_1"),
104+
new PluginId("default", "trash-plugin", "1.2.0", "trash", "batchsink"));
105+
106+
return Arrays.asList(new Object[][]{
107+
{new TestCaseParams("No Plugin Mappings", emptyResponse, emptyResponse,
108+
Collections.emptyList())},
109+
{new TestCaseParams("Only System Plugin Mappings", emptyResponse, systemPluginResponse,
110+
ImmutableList.of(applicationSystemPluginMapping))},
111+
{new TestCaseParams("Only User Plugin Mappings", userPluginResponse, emptyResponse,
112+
ImmutableList.of(applicationUserPluginMapping))},
113+
{new TestCaseParams("Both Plugin Mappings present", userPluginResponse,
114+
systemPluginResponse,
115+
ImmutableList.of(applicationUserPluginMapping, applicationSystemPluginMapping))}
116+
});
117+
}
118+
119+
@Mock
120+
private MetadataAdmin metadataAdmin;
121+
122+
private MetadataApplicationPluginMappingFetcher mappingFetcher;
123+
124+
private final TestCaseParams params;
125+
126+
public MetadataApplicationPluginMappingFetcherTest(TestCaseParams params) {
127+
this.params = params;
128+
}
129+
130+
@Before
131+
public void setUp() {
132+
MockitoAnnotations.initMocks(this);
133+
mappingFetcher = new MetadataApplicationPluginMappingFetcher(metadataAdmin);
134+
}
135+
136+
@Test
137+
public void testFetchApplicationPluginMapping() throws Exception {
138+
// Arrange
139+
SearchRequest userPluginRequest = SearchRequest.of("*").addType("plugin")
140+
.addNamespace(NamespaceId.DEFAULT.getNamespace())
141+
.build();
142+
SearchRequest systemPluginRequest = SearchRequest.of("*").addType("plugin")
143+
.addNamespace(NamespaceId.SYSTEM.getNamespace()).build();
144+
145+
when(metadataAdmin.search(eq(userPluginRequest))).thenReturn(params.userResponse);
146+
when(metadataAdmin.search(eq(systemPluginRequest))).thenReturn(params.systemResponse);
147+
148+
// Act
149+
List<ApplicationPluginMapping> actual = mappingFetcher.fetchApplicationPluginMapping(
150+
NamespaceId.DEFAULT);
151+
152+
// Assert
153+
Assert.assertEquals(String.format("Test case '%s' failed.", params.name),
154+
params.expectedMappings, actual);
155+
verify(metadataAdmin, times(1)).search(userPluginRequest);
156+
verify(metadataAdmin, times(1)).search(systemPluginRequest);
157+
}
158+
}

0 commit comments

Comments
 (0)