-
Notifications
You must be signed in to change notification settings - Fork 217
PROTON-1442: [Cpp] Support for local transactions #437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
74818fc to
eb4514b
Compare
3e96834 to
fa9236d
Compare
astitcher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've taken a fairly thorough look. If you have any questions about these review items we should discuss further.
cpp/include/proton/transaction.hpp
Outdated
| class | ||
| PN_CPP_CLASS_EXTERN transaction_handler { | ||
| public: | ||
| PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
|
||
| /// Called when a local transaction is declared. | ||
| PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged successfully. | ||
| PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged unsuccessfully (aborted). | ||
| PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
|
||
| /// Called when a local transaction declare fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
|
||
| /// Called when the commit of a local transaction fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
| }; | ||
|
|
||
| } // namespace proton | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in a different header file like message and messaging_handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Now they are session callbacks, transaction_declared() becomes handler->on_session.open() and on_declare_failed is now handled by on_session_error().
fa9236d to
347cf6e
Compare
astitcher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've not reviewed this comprehensively, but I think I'm now very much leaning towards not making the transaction class visible to the API user at all and having the transaction methods on the session. This is essentially the API in JMS and CMS.
cpp/include/proton/transaction.hpp
Outdated
| class | ||
| PN_CPP_CLASS_EXTERN transaction_handler { | ||
| public: | ||
| PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
|
||
| /// Called when a local transaction is declared. | ||
| PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged successfully. | ||
| PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged unsuccessfully (aborted). | ||
| PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
|
||
| /// Called when a local transaction declare fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
|
||
| /// Called when the commit of a local transaction fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
| }; | ||
|
|
||
| } // namespace proton | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
cpp/examples/tx_recv.cpp
Outdated
| transaction.accept(d); | ||
| current_batch += 1; | ||
| if(current_batch == batch_size) { | ||
| transaction = proton::transaction(); // null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).
above mentioned change:
- transaction = proton::transaction(); // null
+ transaction.commit();
works as expected (against Artemis broker) when using multicast. However, when a pre-defined anycast queue is used, the commit() call ends with client segmentation fault.
cpp/examples/tx_recv.cpp
Outdated
|
|
||
| void on_message(proton::delivery &d, proton::message &msg) override { | ||
| std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; | ||
| transaction.accept(d); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw. I'm thinking if transaction.accept() does actually have any effect? I mean, when I comment it out, the program behaves the same. btw. is this transaction accept any different from delivery accept? looks both settles the messages related to the delivery.
Does explicit delivery settling play role in transaction mode? or are they mutually exclusive and only commits and aborts applies? Can ie. single message in transaction be rejected while other messages accepted?
I tried to accept / reject the delivery and it seems to have no effect in transaction mode (while in python, doing so makes the messages to be threat outside of the transaction, just like a normal messages, https://issues.redhat.com/browse/ENTMQCL-513).
4c795ef to
eb8afbd
Compare
astitcher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lopts of detailed comments here, but the main change I'd like to see is either make coordinator a subclass of target or fold it into a simple boolean option of target.
eb8afbd to
4c795ef
Compare
578445f to
6fd1434
Compare
cpp/include/proton/session.hpp
Outdated
| struct pn_session_t; | ||
|
|
||
| namespace proton { | ||
| class transaction_impl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this actually needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transaction_impl is pirely internal and should not be in an externally visible header file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
astitcher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First batch of comments
cpp/include/proton/session.hpp
Outdated
| struct pn_session_t; | ||
|
|
||
| namespace proton { | ||
| class transaction_impl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transaction_impl is pirely internal and should not be in an externally visible header file
6eb58f4 to
2d440ef
Compare
3dcb6a7 to
f04bc43
Compare
f04bc43 to
ac1950f
Compare
DreamPearl
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased!
cpp/include/proton/transaction.hpp
Outdated
| class | ||
| PN_CPP_CLASS_EXTERN transaction_handler { | ||
| public: | ||
| PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
|
||
| /// Called when a local transaction is declared. | ||
| PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged successfully. | ||
| PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
|
||
| /// Called when a local transaction is discharged unsuccessfully (aborted). | ||
| PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
|
||
| /// Called when a local transaction declare fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
|
||
| /// Called when the commit of a local transaction fails. | ||
| PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
| }; | ||
|
|
||
| } // namespace proton | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Now they are session callbacks, transaction_declared() becomes handler->on_session.open() and on_declare_failed is now handled by on_session_error().
cpp/include/proton/transaction.hpp
Outdated
| PN_CPP_EXTERN void declare(); | ||
| PN_CPP_EXTERN void handle_outcome(proton::tracker); | ||
| PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg); | ||
| PN_CPP_EXTERN void accept(delivery &t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
cpp/include/proton/session.hpp
Outdated
| struct pn_session_t; | ||
|
|
||
| namespace proton { | ||
| class transaction_impl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
|
Running the transaction test under valgrind reveals some errors: Transaction Test Errors |
astitcher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made a bunch of suggested changes to this branch which address some of the comments here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of the changes in this file relate to transactions, I recommend removing all changes in this file:
The change is useful, but something unnecessary for a simple example, moving the code more in the direction of a generally useful utility. Having such a general receive utility would be good, but maybe should be in a different place.
| PN_CPP_EXTERN void transaction_declare(proton::messaging_handler &handler, bool settle_before_discharge = false); | ||
| PN_CPP_EXTERN bool transaction_is_declared(); | ||
| PN_CPP_EXTERN proton::binary transaction_id() const; | ||
| PN_CPP_EXTERN void transaction_commit(); | ||
| PN_CPP_EXTERN void transaction_abort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need documentation for these APIs
| /// Set the target be of type coordinator. | ||
| /// This immediately override the currently assigned type. | ||
| PN_CPP_EXTERN target_options& make_coordinator(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is needed to use the transaction API. If it's used in the implementation it needs to be purely internal. But I don't think it's actually used at all.
| #include "reconnect_options_impl.hpp" | ||
|
|
||
| #include "proton/work_queue.hpp" | ||
| #include "proton/session.hpp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't see a reason this include should be needed by the new lines.
|
|
||
| #include <proton/connection.h> | ||
| #include <proton/delivery.h> | ||
| #include <proton/disposition.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed?
| state = State::FREE; | ||
| transaction_delete(session); | ||
| // on_transaction_declare_failed | ||
| handler->on_session_error(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use after free: handler is used after the transaction_impl is deleted! Also no need to set the state as the impl is just about to be deleted anyway.
| state = State::FREE; | ||
| transaction_delete(session); | ||
| handler->on_session_transaction_commit_failed(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use after free again
cpp/src/session.cpp
Outdated
| state = State::FREE; | ||
| transaction_delete(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for state change as deleted
| state = State::FREE; | ||
| transaction_delete(session); | ||
| handler->on_session_transaction_aborted(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use after free
| state = State::FREE; | ||
| transaction_delete(session); | ||
| handler->on_session_transaction_committed(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use after free
|
My suggested changes are in branch https://github.com/astitcher/qpid-proton/commits/local-transactions |
PROTON-1442
AMQP Transaction Sequence:
Client establishes link to Broker (Transaction resource) to target with transaction coordinator type (usual types are sender/receiver) (ATTACH frame)
Client (Transaction Controller) sends a special message to that link to create transaction (TRANSFER frame)
Broker returns a disposition with the transaction id (DISPOSITION frame)