3838
3939class tx_send : public proton ::messaging_handler, proton::transaction_handler {
4040 private:
41- proton::sender sender;
41+ proton::sender sender;
4242 std::string url;
4343 int total;
4444 int batch_size;
4545 int sent;
4646 int batch_index = 0 ;
4747 int current_batch = 0 ;
4848 int committed = 0 ;
49- int confirmed = 0 ;
50-
51- proton::session session;
5249
5350 public:
5451 tx_send (const std::string &s, int c, int b):
@@ -59,35 +56,34 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
5956 }
6057
6158 void on_session_open (proton::session &s) override {
62- session = s;
63- std::cout << " [on_session_open] declare_txn started..." << std::endl;
59+ std::cout << " New session is open, declaring transaction now..." << std::endl;
6460 s.declare_transaction (*this );
65- std::cout << " [on_session_open] declare_txn ended..." << std::endl;
6661 }
6762
68- void on_transaction_declare_failed (proton::session) {}
63+ void on_transaction_declare_failed (proton::session s) {
64+ std::cout << " Transaction declarion failed" << std::endl;
65+ s.connection ().close ();
66+ exit (-1 );
67+ }
68+
6969 void on_transaction_commit_failed (proton::session s) {
70- std::cout << " Transaction Commit Failed " << std::endl;
70+ std::cout << " Transaction commit failed! " << std::endl;
7171 s.connection ().close ();
7272 exit (-1 );
7373 }
7474
7575 void on_transaction_declared (proton::session s) override {
76- std::cout << " [on_transaction_declared] Session: " << (&s)
77- << std::endl;
78- std::cout << " [on_transaction_declared] txn is_empty " << (s.txn_is_empty ())
79- << " \t " << std::endl;
80- send (sender);
76+ std::cout << " Transaction is declared" << std::endl;
77+ send ();
8178 }
8279
83- void on_sendable (proton::sender &s) override {
84- std::cout << " [OnSendable] session: " << &session
85- << std::endl;
86- send (s);
80+ void on_sendable (proton::sender&) override {
81+ send ();
8782 }
8883
89- void send (proton::sender &s ) {
84+ void send () {
9085 static int unique_id = 10000 ;
86+ proton::session session = sender.session ();
9187 while (session.txn_is_declared () && sender.credit () &&
9288 (committed + current_batch) < total) {
9389 proton::message msg;
@@ -96,47 +92,42 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
9692
9793 msg.id (unique_id++);
9894 msg.body (m);
99- std::cout << " ##### [example] transaction send msg: " << msg
100- << std::endl;
95+ std::cout << " Sending: " << msg << std::endl;
10196 session.txn_send (sender, msg);
10297 current_batch += 1 ;
10398 if (current_batch == batch_size)
10499 {
105- std::cout << " >> Txn attempt commit" << std::endl;
106100 if (batch_index % 2 == 0 ) {
101+ std::cout << " Commiting transaction..." << std::endl;
107102 session.txn_commit ();
108103 } else {
104+ std::cout << " Aborting transaction..." << std::endl;
109105 session.txn_abort ();
110- }
106+ }
111107 batch_index++;
112108 }
113109 }
114110 }
115111
116- void on_tracker_accept (proton::tracker &t) override {
117- confirmed += 1 ;
118- std::cout << " [example] on_tracker_accept:" << confirmed
119- << std::endl;
120- }
121-
122112 void on_transaction_committed (proton::session s) override {
123113 committed += current_batch;
124114 current_batch = 0 ;
125- std::cout<< " [OnTxnCommitted] Committed: " << committed << std::endl;
115+ std::cout << " Transaction commited " << std::endl;
126116 if (committed == total) {
127- std::cout << " All messages committed" << std::endl;
117+ std::cout << " All messages committed, closing connection. " << std::endl;
128118 s.connection ().close ();
129119 }
130120 else {
131- std::cout << " redlcaring txn " << std::endl;
132- session .declare_transaction (*this );
121+ std::cout << " Re-declaring transaction now... " << std::endl;
122+ s .declare_transaction (*this );
133123 }
134124 }
135125
136126 void on_transaction_aborted (proton::session s) override {
137- std::cout << " Meesages Aborted ....." << std::endl;
127+ std::cout << " Transaction aborted!" << std::endl;
128+ std::cout << " Re-delaring transaction now..." << std::endl;
138129 current_batch = 0 ;
139- session .declare_transaction (*this );
130+ s .declare_transaction (*this );
140131 }
141132
142133 void on_sender_close (proton::sender &s) override {
0 commit comments