Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.suspendUntil
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand All @@ -38,6 +38,8 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand All @@ -49,6 +51,7 @@ private val log = LogManager.getLogger(TransportDeleteAlertingCommentAction::cla
class TransportDeleteAlertingCommentAction @Inject constructor(
transportService: TransportService,
val client: Client,
val sdkClient: SdkClient,
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
Expand Down Expand Up @@ -151,11 +154,15 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
.version(true)
.seqNoAndPrimaryTerm(true)
.query(queryBuilder)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
val searchRequest = SearchDataObjectRequest.builder()
.searchSourceBuilder(searchSourceBuilder)
.indices(ALL_COMMENTS_INDEX_PATTERN)
.build()

val searchResponse: SearchResponse = sdkClient.suspendUntil {
searchDataObjectAsync(searchRequest).whenComplete(it)
}

val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
val comments = searchResponse.hits.map { hit ->
val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
Expand All @@ -29,13 +29,17 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.index.IndexNotFoundException
import org.opensearch.remote.metadata.client.GetDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client

class TransportGetWorkflowAction @Inject constructor(
transportService: TransportService,
val client: Client,
val sdkClient: SdkClient,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
Expand All @@ -56,94 +60,97 @@ class TransportGetWorkflowAction @Inject constructor(
override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener<GetWorkflowResponse>) {
val user = readUserFromThreadContext(client)

val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId)
val getRequest = GetDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(getWorkflowRequest.workflowId)
.build()

if (!validateUserBackendRoles(user, actionListener)) {
return
}

client.threadPool().threadContext.stashContext().use {
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
log.error("Workflow with ${getWorkflowRequest.workflowId} not found")
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
sdkClient.getDataObjectAsync(getRequest)
.whenComplete(
SdkClientUtils.wrapGetCompletion(object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
log.error("Workflow with ${getWorkflowRequest.workflowId} not found")
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
)
)
)
)
return
}
return
}

var workflow: Workflow? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version)
if (compositeMonitor is Workflow) {
workflow = compositeMonitor
} else {
log.error("Wrong monitor type returned")
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
var workflow: Workflow? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version)
if (compositeMonitor is Workflow) {
workflow = compositeMonitor
} else {
log.error("Wrong monitor type returned")
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
)
)
)
)
return
}
return
}

// security is enabled and filterby is enabled
if (!checkUserPermissionsWithResource(
user,
workflow?.user,
actionListener,
"workflow",
getWorkflowRequest.workflowId
)
) {
return
// security is enabled and filterby is enabled
if (!checkUserPermissionsWithResource(
user,
workflow?.user,
actionListener,
"workflow",
getWorkflowRequest.workflowId
)
) {
return
}
}
}
}

actionListener.onResponse(
GetWorkflowResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
workflow
actionListener.onResponse(
GetWorkflowResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
workflow
)
)
)
}
}

override fun onFailure(t: Exception) {
log.error("Getting the workflow failed", t)
override fun onFailure(t: Exception) {
log.error("Getting the workflow failed", t)

if (t is IndexNotFoundException) {
actionListener.onFailure(
OpenSearchStatusException(
"Workflow not found",
RestStatus.NOT_FOUND
if (t is IndexNotFoundException || t is OpenSearchException && t.cause is IndexNotFoundException) {
actionListener.onFailure(
OpenSearchStatusException(
"Workflow not found",
RestStatus.NOT_FOUND
)
)
)
} else {
actionListener.onFailure(AlertingException.wrap(t))
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}
}
)
})
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client

class TransportSearchEmailAccountAction @Inject constructor(
transportService: TransportService,
val client: Client,
val sdkClient: SdkClient,
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings
Expand Down Expand Up @@ -54,19 +58,24 @@ class TransportSearchEmailAccountAction @Inject constructor(
return
}

val searchDataObjectRequest = SearchDataObjectRequest.builder()
.indices(*searchRequest.indices())
.searchSourceBuilder(searchRequest.source())
.build()

client.threadPool().threadContext.stashContext().use {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}
sdkClient.searchDataObjectAsync(searchDataObjectRequest)
.whenComplete(
SdkClientUtils.wrapSearchCompletion(object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
})
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client

class TransportSearchEmailGroupAction @Inject constructor(
transportService: TransportService,
val client: Client,
val sdkClient: SdkClient,
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings
Expand Down Expand Up @@ -54,19 +58,22 @@ class TransportSearchEmailGroupAction @Inject constructor(
return
}

val searchDataObjectRequest = SearchDataObjectRequest.builder().indices(*searchRequest.indices())
.searchSourceBuilder(searchRequest.source()).build()

client.threadPool().threadContext.stashContext().use {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}
sdkClient.searchDataObjectAsync(searchDataObjectRequest)
.whenComplete(
SdkClientUtils.wrapSearchCompletion(object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
})
)
}
}
}
Loading
Loading