@@ -48,6 +48,8 @@ void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
    (This approach seems safer than calling pthread_cancel()) */
 static std::atomic<bool> stop_continuous_dcp_thread(false);
 
+static bool wait_started(false);
+
 struct SeqnoRange {
     uint64_t start;
     uint64_t end;
@@ -898,6 +900,85 @@ extern "C" {
     }
 }
 
+/* DCP step thread that keeps running till it reads up to 'exp_mutations'.
+   Note: the exp_mutations is cumulative across all streams in the DCP
+   connection */
+static void dcp_waiting_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                             const void *cookie, uint32_t opaque,
+                             uint64_t exp_mutations)
+{
+    bool done = false;
+    size_t bytes_read = 0;
+    bool pending_marker_ack = false;
+    uint64_t marker_end = 0;
+    uint64_t num_mutations = 0;
+    std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(h, h1));
+
+    do {
+        if (bytes_read > 512) {
+            checkeq(ENGINE_SUCCESS,
+                    h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0,
+                                                   bytes_read),
+                    "Failed to get dcp buffer ack");
+            bytes_read = 0;
+        }
+        ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers.get());
+        if (err == ENGINE_DISCONNECT) {
+            done = true;
+        } else {
+            switch (dcp_last_op) {
+                case PROTOCOL_BINARY_CMD_DCP_MUTATION:
+                    bytes_read += dcp_last_packet_size;
+                    if (pending_marker_ack && dcp_last_byseqno == marker_end) {
+                        sendDcpAck(h, h1, cookie,
+                                   PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
+                                   PROTOCOL_BINARY_RESPONSE_SUCCESS,
+                                   dcp_last_opaque);
+                    }
+                    ++num_mutations;
+                    break;
+                case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
+                    done = true;
+                    bytes_read += dcp_last_packet_size;
+                    break;
+                case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
+                    if (dcp_last_flags & 8) {
+                        pending_marker_ack = true;
+                        marker_end = dcp_last_snap_end_seqno;
+                    }
+                    bytes_read += dcp_last_packet_size;
+                    break;
+                case 0:
+                    /* No messages were ready on the last step call, so we
+                     * wait till the conn is notified of a new item.
+                     * Note that we check for 0 because we clear the
+                     * dcp_last_op value below.
+                     */
+                    testHarness.lock_cookie(cookie);
+                    /* waitfor_cookie() waits on a condition variable. But
+                       the API expects the cookie to be locked before
+                       calling it */
+                    wait_started = true;
+                    testHarness.waitfor_cookie(cookie);
+                    testHarness.unlock_cookie(cookie);
+                    break;
+                default:
+                    // Aborting ...
+                    std::string err_string("Unexpected DCP operation: " +
                                           std::to_string(dcp_last_op));
+                    check(false, err_string.c_str());
+            }
+            if (num_mutations >= exp_mutations) {
+                done = true;
+            }
+            dcp_last_op = 0;
+        }
+    } while (!done);
+
+    /* Do buffer ack of the outstanding bytes */
+    h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
+}
+
 // Testcases //////////////////////////////////////////////////////////////////
 
 static enum test_result test_dcp_vbtakeover_no_stream(ENGINE_HANDLE *h,
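
The case 0 branch in dcp_waiting_step() above parks the stepper thread with testHarness.lock_cookie() / waitfor_cookie() / unlock_cookie() until the engine signals that new items are ready. That lock/wait/unlock sequence is the standard condition-variable wait pattern; the sketch below uses only standard C++ and hypothetical names (FakeCookie, notify) to illustrate the shape, and is not the harness or engine implementation.

// Sketch only: hypothetical names, standard C++; not the engine/harness code.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

struct FakeCookie {
    std::mutex mtx;              // what lock_cookie()/unlock_cookie() guard
    std::condition_variable cv;  // what waitfor_cookie() blocks on
    bool notified = false;       // set by the "engine" when items are ready
};

// Consumer side: same shape as the case 0 branch above.
static void wait_for_notification(FakeCookie& c) {
    std::unique_lock<std::mutex> lh(c.mtx);      // lock_cookie()
    c.cv.wait(lh, [&c] { return c.notified; });  // waitfor_cookie()
    c.notified = false;
}                                                // unlock_cookie() on scope exit

// Producer side: what a notify-io-complete style wakeup amounts to.
static void notify(FakeCookie& c) {
    {
        std::lock_guard<std::mutex> lh(c.mtx);
        c.notified = true;
    }
    c.cv.notify_one();
}

int main() {
    FakeCookie c;
    std::thread waiter([&c] { wait_for_notification(c); std::puts("woken"); });
    notify(c);
    waiter.join();
}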
@@ -1531,6 +1612,55 @@ static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_dcp_producer_stream_req_open(ENGINE_HANDLE *h,
+                                                          ENGINE_HANDLE_V1 *h1)
+{
+    const void *cookie = testHarness.create_cookie();
+    const int num_items = 3;
+
+    DcpStreamCtx ctx;
+    ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    ctx.seqno = {0, static_cast<uint64_t>(-1)};
+
+    std::string name("unittest");
+    TestDcpConsumer tdc(name.c_str(), cookie);
+    tdc.addStreamCtx(ctx);
+
+    tdc.openConnection(h, h1);
+
+    /* Create a separate thread that tries to get any DCP items */
+    std::thread dcp_step_thread(dcp_waiting_step, h, h1, cookie, 0, num_items);
+
+    /* We need to wait till the 'dcp_waiting_step' thread begins its wait */
+    while (1) {
+        /* Busy wait is ok here. A non-busy wait would need another
+           condition variable, which is overkill here */
+        testHarness.lock_cookie(cookie);
+        if (wait_started) {
+            testHarness.unlock_cookie(cookie);
+            break;
+        }
+        testHarness.unlock_cookie(cookie);
+    }
+
+    /* Now create a stream */
+    tdc.openStreams(h, h1);
+
+    /* Write items */
+    write_items(h, h1, num_items, 0);
+    wait_for_flusher_to_settle(h, h1);
+    verify_curr_items(h, h1, num_items, "Wrong amount of items");
+
+    /* If the notification (to the 'dcp_waiting_step' thread upon writing an
+       item) mechanism is efficient, we must see 'dcp_waiting_step' finish
+       before the test times out */
+    dcp_step_thread.join();
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
                                                              ENGINE_HANDLE_V1 *h1) {
 
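
The busy-wait loop in test_dcp_producer_stream_req_open() above polls wait_started under the cookie lock until the stepper thread has parked itself; the comment notes that a non-busy wait would require another condition variable, which the patch considers overkill. Purely to illustrate that trade-off, a minimal sketch of the alternative (hypothetical helpers, not part of this change) could look like this:

// Sketch only: hypothetical helpers, not part of this patch.
#include <condition_variable>
#include <mutex>
#include <thread>

static std::mutex wait_started_mtx;
static std::condition_variable wait_started_cv;
static bool wait_started_flag = false;  // would replace the plain 'wait_started' bool

// The 'dcp_waiting_step' thread would call this instead of 'wait_started = true;'.
static void signal_wait_started() {
    {
        std::lock_guard<std::mutex> lh(wait_started_mtx);
        wait_started_flag = true;
    }
    wait_started_cv.notify_one();
}

// The test would call this instead of the busy-wait loop.
static void block_until_wait_started() {
    std::unique_lock<std::mutex> lh(wait_started_mtx);
    wait_started_cv.wait(lh, [] { return wait_started_flag; });
}

int main() {
    std::thread stepper(signal_wait_started);  // stands in for dcp_waiting_step
    block_until_wait_started();                // no busy wait needed
    stepper.join();
}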
@@ -5460,6 +5590,7 @@ static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
 const char *default_dbname = "./ep_testsuite_dcp";
 
 BaseTestCase testsuite_testcases[] = {
+
         TestCase("test dcp vbtakeover stat no stream",
                  test_dcp_vbtakeover_no_stream, test_setup, teardown, nullptr,
                  prepare, cleanup),
@@ -5507,6 +5638,16 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test dcp replica stream all", test_dcp_replica_stream_all,
                  test_setup, teardown, "chk_remover_stime=1;max_checkpoints=2",
                  prepare, cleanup),
+        TestCase("test dcp producer stream open",
+                 test_dcp_producer_stream_req_open, test_setup, teardown,
+                 /* Expecting the connection manager background thread to
+                    notify the connection at its default time interval is not
+                    very efficient when we have items to be sent in a DCP
+                    stream. Hence increase the default interval to a very high
+                    value, so that the test fails if we are not doing the
+                    notification correctly */
+                 "connection_manager_interval=200000000",
+                 prepare, cleanup),
         TestCase("test producer stream request (partial)",
                  test_dcp_producer_stream_req_partial, test_setup, teardown,
                  /* set chk_period to essentially infinity so it won't run