diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/sql/SqlQueryBuildIndexTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/sql/SqlQueryBuildIndexTest.java new file mode 100644 index 0000000000000..0eac0a39e8d3b --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/sql/SqlQueryBuildIndexTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.sql; + +import java.util.LinkedHashMap; +import java.util.concurrent.CountDownLatch; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.cache.query.index.Index; +import org.apache.ignite.internal.cache.query.index.IndexName; +import org.apache.ignite.internal.cache.query.index.IndexProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** */ +@RunWith(Parameterized.class) +public class SqlQueryBuildIndexTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String TBL = "Person"; + + /** */ + private static final String IDX = "TEST_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private boolean persistenceEnabled; + + /** */ + @Parameterized.Parameter + public String qryNode; + + /** */ + @Parameterized.Parameters(name = "qryNode={0}") + public static Object[] parameters() { + return new Object[] {"CRD", "CLN"}; + } + + /** + * {@inheritDoc} + */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** + * {@inheritDoc} + */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + cleanPersistenceDir(); + } + + /** + * {@inheritDoc} + */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + QueryEntity qe = new QueryEntity(Long.class.getName(), Integer.class.getName()) + .setTableName(TBL) + .setKeyFieldName("id") + .setValueFieldName("fld") + .setFields(new LinkedHashMap<>(F.asMap("id", Long.class.getName(), "fld", Integer.class.getName()))); + + CacheConfiguration ccfg = new CacheConfiguration() + .setName(CACHE) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setQueryEntities(F.asList(qe)) + .setBackups(1); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled)) + ); + + cfg.setCacheConfiguration(ccfg); + + cfg.setBuildIndexThreadPoolSize(1); + + return cfg; + } + + /** */ + @Test + public void testConcurrentCreateIndex() throws Exception { + persistenceEnabled = true; + + Ignite crd = startGrids(3); + + crd.cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = cache(); + + insertData(); + + CountDownLatch idxBuildGate = new CountDownLatch(1); + + grid(0).context().pools().buildIndexExecutorService().execute(() -> { + try { + idxBuildGate.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + multithreadedAsync(() -> { + SqlFieldsQuery ddl = new SqlFieldsQuery("CREATE INDEX " + IDX + " ON " + TBL + "(fld)"); + cache.query(ddl).getAll(); + }, 1); + + IndexProcessor ip = grid(0).context().indexProcessor(); + + IndexName name = new IndexName(cache.getName(), CACHE, TBL.toUpperCase(), IDX); + + boolean seenBuilding = GridTestUtils.waitForCondition(() -> { + Index idx = ip.index(name); + return idx != null && idx.rebuildInProgress(); + }, 10_000); + + assertTrue("Index must exist and be in rebuild state", seenBuilding); + + String sql = "SELECT id FROM " + TBL + " USE INDEX(" + IDX + ") " + + "WHERE fld BETWEEN ? AND ? ORDER BY id"; + + SqlFieldsQuery q = new SqlFieldsQuery(sql).setArgs(0, CNT - 1); + + GridTestUtils.assertThrows(null, () -> { + cache.query(q).getAll(); + }, IgniteException.class, "Failed to parse query. Index \"" + IDX + "\" not found; SQL statement"); + + idxBuildGate.countDown(); + + crd.cache(CACHE).indexReadyFuture().get(); + + boolean done = GridTestUtils.waitForCondition(() -> { + Index idx = ip.index(name); + return idx != null && !idx.rebuildInProgress(); + }, 20_000); + + assertTrue("Build must finish", done); + + assertEquals(CNT, cache.query(new SqlFieldsQuery(sql).setArgs(0, CNT - 1)).getAll().size()); + } + + /** */ + private void insertData() { + try (IgniteDataStreamer streamer = grid(0).dataStreamer(CACHE)) { + for (int i = 0; i < CNT; i++) + streamer.addData((long)i, i); + } + } + + /** */ + private IgniteCache cache() throws Exception { + Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid(); + return n.cache(CACHE); + } + +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java index b9f1914f9e10d..b0d265d3e2124 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcThinTransactionalSelfTest; import org.apache.ignite.internal.processors.query.calcite.message.CalciteCommunicationMessageSerializationTest; import org.apache.ignite.internal.processors.query.calcite.sql.SqlCustomParserTest; +import org.apache.ignite.internal.processors.query.calcite.sql.SqlQueryBuildIndexTest; import org.apache.ignite.internal.processors.query.calcite.sql.SqlReservedWordsTest; import org.apache.ignite.internal.processors.tx.SqlTransactionsIsolationTest; import org.apache.ignite.internal.processors.tx.SqlTransactionsUnsupportedModesTest; @@ -42,6 +43,7 @@ SqlCustomParserTest.class, SqlReservedWordsTest.class, + SqlQueryBuildIndexTest.class, LogicalRelImplementorTest.class, ScriptTestSuite.class, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java index 35ae189f8e7f1..317105338805c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java @@ -218,7 +218,15 @@ public Index createIndexDynamically( IndexDefinition definition, SchemaIndexCacheVisitor cacheVisitor ) { - Index idx = createIndex(cctx, factory, definition); + IndexFactory dynamicFactory = (gcctx, indexDefinition) -> { + Index idx = factory.createIndex(gcctx, indexDefinition); + + idx.markIndexRebuild(true); + + return idx; + }; + + Index idx = createIndex(cctx, dynamicFactory, definition); // Populate index with cache rows. cacheVisitor.visit(row -> { @@ -226,6 +234,8 @@ public Index createIndexDynamically( idx.onUpdate(null, row, false); }); + idx.markIndexRebuild(false); + return idx; } diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java new file mode 100644 index 0000000000000..b4ede80b3bd2f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryBuildIndexTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.LinkedHashMap; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.cache.query.index.Index; +import org.apache.ignite.internal.cache.query.index.IndexName; +import org.apache.ignite.internal.cache.query.index.IndexProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between; + +/** */ +@RunWith(Parameterized.class) +public class IndexQueryBuildIndexTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String TBL = "Person"; + + /** */ + private static final String IDX = "TEST_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private boolean persistenceEnabled; + + /** */ + @Parameterized.Parameter + public String qryNode; + + /** */ + @Parameterized.Parameters(name = "qryNode={0}") + public static Object[] parameters() { + return new Object[] { "CRD", "CLN" }; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + QueryEntity qe = new QueryEntity(Long.class.getName(), Integer.class.getName()) + .setTableName(TBL) + .setKeyFieldName("id") + .setValueFieldName("fld") + .setFields(new LinkedHashMap<>( + F.asMap("id", Long.class.getName(), "fld", Integer.class.getName())) + ); + + CacheConfiguration ccfg = new CacheConfiguration() + .setName(CACHE) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setQueryEntities(F.asList(qe)) + .setBackups(1); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled))); + + cfg.setCacheConfiguration(ccfg); + + cfg.setBuildIndexThreadPoolSize(1); + + return cfg; + } + + /** */ + @Test + public void testConcurrentCreateIndex() throws Exception { + persistenceEnabled = true; + + Ignite crd = startGrids(3); + + crd.cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = cache(); + + insertData(); + + CountDownLatch idxBuild = new CountDownLatch(1); + + grid(0).context().pools().buildIndexExecutorService().execute(() -> { + try { + idxBuild.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + multithreadedAsync(() -> { + SqlFieldsQuery idxCreate = new SqlFieldsQuery("create index " + IDX + " on " + TBL + "(fld)"); + + cache.query(idxCreate).getAll(); + }, 1); + + IndexProcessor ip = grid(0).context().indexProcessor(); + + IndexName name = new IndexName( + cache.getName(), CACHE, TBL.toUpperCase(), IDX + ); + + boolean seenBuilding = GridTestUtils.waitForCondition(() -> { + Index idx = ip.index(name); + + return idx != null && idx.rebuildInProgress(); + }, 10_000); + + assertTrue("Index must exist and be in rebuild state", seenBuilding); + + IndexQuery qry = new IndexQuery(Integer.class) + .setCriteria(between("fld", 0, CNT)); + + GridTestUtils.assertThrows(null, () -> { + cache.query(qry).getAll(); + }, IgniteException.class, "Failed to run IndexQuery due to index rebuild is in progress"); + + idxBuild.countDown(); + + crd.cache(CACHE).indexReadyFuture().get(); + + boolean done = GridTestUtils.waitForCondition(() -> { + Index idx = ip.index(name); + + return idx != null && !idx.rebuildInProgress(); + }, 20_000); + + assertTrue("Build must finish", done); + + assertEquals(CNT, cache.query(qry).getAll().size()); + } + + /** */ + private void insertData() { + try (IgniteDataStreamer streamer = grid(0).dataStreamer(CACHE)) { + for (int i = 0; i < CNT; i++) + streamer.addData((long)i, i); + } + } + + /** */ + private IgniteCache cache() throws Exception { + Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid(); + + return n.cache(CACHE); + } + +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java index ed392ae42c3a1..49609ddc40089 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java @@ -37,6 +37,7 @@ IndexQueryRangeTest.class, IndexQueryPartitionTest.class, IndexQueryRebuildIndexTest.class, + IndexQueryBuildIndexTest.class, IndexQueryCacheKeyValueFieldsTest.class, IndexQueryCacheKeyValueEscapedFieldsTest.class, IndexQueryCacheKeyValueTransformedFieldsTest.class,