2
2
3
3
import com .azure .cosmos .CosmosAsyncContainer ;
4
4
import com .azure .cosmos .CosmosAsyncDatabase ;
5
- import com .azure .cosmos .CosmosException ;
5
+ import com .azure .cosmos .implementation . NotFoundException ;
6
6
import com .azure .cosmos .models .*;
7
7
import com .fasterxml .jackson .databind .JsonNode ;
8
8
import com .fasterxml .jackson .databind .ObjectMapper ;
9
9
import com .genexus .db .Cursor ;
10
10
import com .genexus .db .ServiceCursorBase ;
11
11
import com .genexus .db .driver .GXConnection ;
12
+ import com .genexus .db .service .Query ;
12
13
import com .genexus .db .service .QueryType ;
13
14
import com .genexus .db .service .ServicePreparedStatement ;
14
15
import com .genexus .db .service .VarValue ;
27
28
import java .util .*;
28
29
import java .util .concurrent .CountDownLatch ;
29
30
30
- import java .util .concurrent .atomic .AtomicInteger ;
31
+ import java .util .concurrent .atomic .AtomicReference ;
31
32
import java .util .regex .Matcher ;
32
33
import java .util .regex .Pattern ;
33
34
import java .util .stream .Collectors ;
@@ -69,20 +70,19 @@ private int _executeQuery(CosmosDBResultSet resultSet) throws Exception {
69
70
{
70
71
case QUERY :
71
72
{
72
- if (querybyPK (query ) && keyCondition != null )
73
+ if (isQueryByPK (query ) && keyCondition != null )
73
74
{
74
- if (keyCondition .isEmpty () || !keyCondition .containsKey ("id" ) || !keyCondition .containsKey (query .getPartitionKey ())) {
75
- AtomicInteger statusCode = executeReadByPK (keyCondition .get ("id" ).toString (),keyCondition .get (query .getPartitionKey ()));
76
- if (statusCode != null && statusCode .get () == 404 )
75
+ if (keyCondition .containsKey ("id" ) && keyCondition .containsKey (query .getPartitionKey ())) {
76
+ int [] statusCode = new int [1 ];
77
+ resultSet .iterator = executeReadByPK (keyCondition .get ("id" ).toString (),keyCondition .get (query .getPartitionKey ()), statusCode );
78
+ if (statusCode != null && statusCode [0 ] == 404 )
77
79
return Cursor .EOF ;
80
+ return 0 ;
78
81
}
79
82
}
80
- else
81
- {
82
- String sqlQuery = CosmosDBHelper .createCosmosQuery (query , cursor , parms );
83
- resultSet .iterator = queryWithPaging (sqlQuery , new CosmosQueryRequestOptions ());
84
- return 0 ;
85
- }
83
+ String sqlQuery = CosmosDBHelper .createCosmosQuery (query , cursor , parms );
84
+ resultSet .iterator = queryWithPaging (sqlQuery , new CosmosQueryRequestOptions ());
85
+ return 0 ;
86
86
}
87
87
case INS :
88
88
{
@@ -140,39 +140,58 @@ private void getContainer(String containerName) throws SQLException {
140
140
container = getDatabase ().getContainer (containerName );
141
141
}
142
142
143
- private boolean querybyPK (CosmosDBQuery query ){
144
- if (query .filters .length > 0 )
143
+ private boolean isQueryByPK (CosmosDBQuery query ){
144
+ //Find out if the query is by PK and by equality
145
+ if (query .filters .length == 1 )
145
146
{
146
- String equalFilterPattern = "\\ ((.*) = :(.*)\\ ) and \\ ((.*) = :(.*)\\ )" ;
147
- //ToDo
147
+ String equalFilterPattern = "^\\ (\\ (id = :([a-zA-Z0-9]+):\\ ) and \\ (([a-zA-Z0-9]+) = :([a-zA-Z0-9]+):\\ )\\ )" ;
148
+ Matcher matcher = Pattern .compile (equalFilterPattern ).matcher (query .filters [0 ]);
149
+ if (matcher .matches ()) {
150
+ String attItem = matcher .group (2 );
151
+ String pkParmValue ;
152
+ if (attItem .equals (query .getPartitionKey ()))
153
+ {
154
+ pkParmValue = matcher .group (3 );
155
+ getVarValuesFromQuery (pkParmValue ,attItem ,query );
156
+
157
+ String idParmValue = matcher .group (1 );
158
+ getVarValuesFromQuery (idParmValue ,"id" ,query );
159
+ return true ;
160
+ }
161
+ }
148
162
}
149
163
return false ;
150
164
}
151
-
152
- private AtomicInteger executeReadByPK (String idValue , Object partitionKey ) throws Exception {
153
- // Read document by ID
154
- AtomicInteger statusCode = null ;
165
+ private Iterator <HashMap <String , Object >> executeReadByPK (String idValue , Object partitionKey , int [] statusCode ) throws Exception {
166
+ // Read document by ID
167
+ Iterator <HashMap <String , Object >> iterator = null ;
155
168
if (container != null ) {
156
- CountDownLatch latch = new CountDownLatch (1 );
169
+ AtomicReference <JsonNode > itemRef = new AtomicReference <>();
170
+ List <HashMap <String , Object >> hashMapList = new ArrayList <>();
157
171
158
- Mono <CosmosItemResponse <JsonNode >> itemResponseMono = container .readItem (idValue , toPartitionKey (partitionKey ), JsonNode .class );
172
+ try {
173
+ container .readItem (idValue , toPartitionKey (partitionKey ), new CosmosItemRequestOptions (), JsonNode .class )
174
+ .map (CosmosItemResponse ::getItem )
175
+ .doOnNext (item -> {
176
+ itemRef .set (item );
177
+ })
178
+ .block (); // Wait for the read to complete
159
179
160
- itemResponseMono .doOnSuccess ((response ) -> {
161
- latch .countDown (); // signal completion
162
- })
163
- .doOnError (Exception .class , exception -> {
164
- latch .countDown (); // signal completion
165
- logger .error (String .format ("Fail: %1" ,exception .getMessage ()));
166
- if (exception instanceof CosmosException && ((CosmosException ) exception ).getStatusCode () == 404 )
167
- statusCode .set (404 );
168
- }).subscribe ();
169
-
170
- latch .await (); // wait for completion
171
- }
172
- else {
173
- throw new Exception ("CosmosDB Insert By PK Execution failed. Container not found." );
180
+ JsonNode item = itemRef .get ();
181
+ HashMap <String , Object > pageResult = CosmosDBHelper .jsonNodeToHashMap (item );
182
+ hashMapList .add (pageResult );
183
+ if (!hashMapList .isEmpty ())
184
+ return iterator = hashMapList .iterator ();
185
+ else
186
+ return null ;
187
+ }
188
+ catch (NotFoundException ex ) {
189
+ statusCode [0 ] = 404 ;
190
+ }
174
191
}
175
- return statusCode ;
192
+ else
193
+ throw new Exception ("CosmosDB Read By PK Execution failed. Container not found." );
194
+ return null ;
176
195
}
177
196
private Iterator <HashMap <String , Object >> queryWithPaging (String sqlQuery , CosmosQueryRequestOptions options ) throws Exception {
178
197
@@ -216,7 +235,6 @@ private int[] deleteDocument(String idValue, Object partitionKey) throws Excepti
216
235
logger .debug (String .format ("Deleted document- id: %1 partitionkey: %2" ,idValue ,partitionKey .toString ()));
217
236
logger .debug (String .format ("Status Code: %1" ,itemResponse .getStatusCode ()));
218
237
statusCode [0 ] = itemResponse .getStatusCode ();
219
- //latch.countDown();
220
238
})
221
239
.doOnError (error -> {
222
240
//logger.error(String.format("Fail: %1",error.getMessage()));
@@ -240,7 +258,6 @@ private int[] createDocument(JSONObject jsonObject, Object partitionKey) throws
240
258
if (container != null ) {
241
259
ObjectMapper mapper = new ObjectMapper ();
242
260
String jsonStr = jsonObject .toString ();
243
- //Parse string to extract nulls
244
261
jsonStr = jsonStr .replaceAll ("\\ \" (null)\\ \" " , "$1" );
245
262
246
263
JsonNode jsonNode = mapper .readTree (jsonStr );
@@ -252,7 +269,6 @@ private int[] createDocument(JSONObject jsonObject, Object partitionKey) throws
252
269
logger .debug (String .format ("Inserted document: %1" ,response .getItem ().toString ()));
253
270
logger .debug (String .format ("Status Code: %1" ,response .getStatusCode ()));
254
271
statusCode [0 ] = response .getStatusCode ();
255
- //latch.countDown();
256
272
})
257
273
.doOnError (error -> {
258
274
//logger.error(String.format("Fail: %1",error.getMessage()));
@@ -289,7 +305,6 @@ private int[] replaceDocument(JSONObject jsonObject, String idValue , Object par
289
305
logger .debug (String .format ("Replaced document- id: %1 partitionkey: %2" ,idValue ,partitionKey .toString ()));
290
306
logger .debug (String .format ("Status Code: %1" ,itemResponse .getStatusCode ()));
291
307
statusCode [0 ] = itemResponse .getStatusCode ();
292
- //latch.countDown();
293
308
})
294
309
.doOnError (error -> {
295
310
//logger.error(String.format("Fail: %1",error.getMessage()));
@@ -307,6 +322,20 @@ private int[] replaceDocument(JSONObject jsonObject, String idValue , Object par
307
322
throw new Exception ("CosmosDB Replace Execution failed. Container not found." );
308
323
}
309
324
}
325
+ private void getVarValuesFromQuery (String varName , String name , Query query )
326
+ {
327
+ varName = varName .substring (1 , varName .length () - 1 );
328
+ String varNameM = ":" + varName ;
329
+ VarValue varValue = null ;
330
+ for (Map .Entry <String , VarValue > entry : query .getVars ().entrySet ()) {
331
+ if (entry .getKey ().toString ().equals (varNameM )) {
332
+ varValue = entry .getValue ();
333
+ break ;
334
+ }
335
+ }
336
+ if (varValue != null )
337
+ keyCondition .put (name , varValue .value );
338
+ }
310
339
private JSONObject setUpJsonPayload (boolean isUpdate ) throws JSONException , SQLException {
311
340
// Setup the json payload to execute the insert or update query.
312
341
@@ -370,7 +399,7 @@ private JSONObject setUpJsonPayload(boolean isUpdate) throws JSONException, SQLE
370
399
}
371
400
372
401
private PartitionKey toPartitionKey (Object value ) throws Exception {
373
- if (Double .class .isInstance (value )) //Double.valueOf(value) instanceof Double)
402
+ if (Double .class .isInstance (value ))
374
403
return new PartitionKey ((double )value );
375
404
if (value instanceof Boolean )
376
405
return new PartitionKey ((boolean )value );
0 commit comments