Add CollaboratorSerialiser middleware for serialisation/deserialisation between collaborator and aggregator client #1476
base: develop
Conversation
The changes in tests mostly include moving tests between the now-appropriate modules and/or renaming them.
```python
def serialise(self, tensor_key, nparray, lossless=True):
    """Construct the NamedTensor Protobuf.

def find_dependencies(self, tensor_key, send_model_deltas):
```
This method has been moved to Collaborator after removing these lines:

```python
if self.compression_pipeline.is_lossy():
    new_tags = ("aggregated", "delta", "lossy_compressed")
else:
    new_tags = ("aggregated", "delta", "compressed")
tensor_key_dependencies.append(
    TensorKey(tensor_name, origin, round_number, report, new_tags)
)
```
As can be seen from the usage of the returned dependencies in Collaborator.get_data_for_tensorkey, only the first tensor dependency is used, which is always TensorKey(tensor_name, origin, round_number - 1, report, tags).
I don't think that's quite right. When use_delta_updates is set to true, there are two dependencies: the prior version of the model, and the delta that should be applied (either compressed with a lossless or lossy pipeline) to that model to bring it to the current version. The second tensor dependency (tensor_dependency[1]) is used later in the function that you reference here. If you're only seeing the first tensor dependency for every run, then what's likely happening is that db_store_rounds for the collaborator is set to 1 (it needs to be set to 2 in order to retain the prior version of the model locally). Before these lines are removed, you should add tests for use_delta_updates=True and use a lossy compression pipeline (such as this one).
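For context, here is a minimal runnable sketch of the dependency logic being discussed, reconstructed from the snippets quoted above; the actual OpenFL implementation may differ in details, and the TensorKey stand-in here is a plain namedtuple:

```python
from collections import namedtuple

# Field names match the unpacking seen in the quoted snippets.
TensorKey = namedtuple(
    "TensorKey", ["tensor_name", "origin", "round_number", "report", "tags"]
)

class LosslessPipeline:
    def is_lossy(self):
        return False

class CollaboratorSketch:
    def __init__(self, compression_pipeline):
        self.compression_pipeline = compression_pipeline

    def find_dependencies(self, tensor_key, send_model_deltas):
        tensor_name, origin, round_number, report, tags = tensor_key
        tensor_key_dependencies = []
        if send_model_deltas and round_number >= 1:
            # Dependency 0: the prior version of the model (retained
            # locally only if db_store_rounds >= 2).
            tensor_key_dependencies.append(
                TensorKey(tensor_name, origin, round_number - 1, report, tags)
            )
            # Dependency 1: the delta to apply on top of the prior model,
            # tagged according to the compression pipeline in use.
            if self.compression_pipeline.is_lossy():
                new_tags = ("aggregated", "delta", "lossy_compressed")
            else:
                new_tags = ("aggregated", "delta", "compressed")
            tensor_key_dependencies.append(
                TensorKey(tensor_name, origin, round_number, report, new_tags)
            )
        return tensor_key_dependencies

deps = CollaboratorSketch(LosslessPipeline()).find_dependencies(
    TensorKey("w0", "agg", 2, False, ("model",)), send_model_deltas=True
)
assert len(deps) == 2  # prior model + compressed delta
```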
```python
write_logs=False,
callbacks: Optional[List] = [],
secure_aggregation=False,
serialisation_middleware: CollaboratorSerialiser = None,
```
We can also consider removing client completely from the collaborator, since the middleware already has the client object. The client object is currently used by the secure aggregation callback, which could potentially also work with the middleware given the changes proposed in #1518.
On the contrary, I'm wondering whether the collaborator shouldn't just have a client object, while any "middlewares" are added as decorators to the AggregatorGRPCClient class. Those decorators could receive certain configurations via the FL plan (such as the compression algo). WDYT?
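A rough sketch of that decorator idea; all class and method names below are hypothetical stand-ins, not the actual OpenFL API:

```python
# Toy stand-ins for the real client and tensor codec.
class FakeGrpcClient:
    def get_aggregated_tensor(self, tensor_name):
        return {"proto_for": tensor_name}  # stand-in for a NamedTensor proto

class FakeCodec:
    def decode(self, proto):
        return proto["proto_for"]  # stand-in for proto -> numpy decoding

class SerialisingClientDecorator:
    """Exposes the same interface as the wrapped client, plus decoding."""

    def __init__(self, client, tensor_codec):
        self.client = client
        self.tensor_codec = tensor_codec

    def get_aggregated_tensor(self, tensor_name):
        # Forward to the wrapped client, then decode its response.
        proto = self.client.get_aggregated_tensor(tensor_name)
        return self.tensor_codec.decode(proto)

client = SerialisingClientDecorator(FakeGrpcClient(), FakeCodec())
print(client.get_aggregated_tensor("w0"))  # w0
```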
Ah, I see your point; I hadn't considered using decorators here!
I went with the "middleware" approach as I feel it is easier to "chain": multiple layers can be used, and each of them can be added or removed without affecting any of the other layers.
With the middleware approach, the only change required to switch between gRPC and REST would be in how the client is initialized. Similarly, if we decide in the future to make changes to the compression pipeline or the like, we can look to make changes only to the serialisation layer.
Gotta admit, I'm a bit biased toward this approach 😁 but totally open to other perspectives!
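A toy illustration of the chaining property (all names here are hypothetical; the point is only that each layer implements the same interface and forwards to the next):

```python
# Each layer exposes the same method and wraps the next one, so layers
# can be added or removed independently of each other.
class TransportLayer:
    def get_tasks(self):
        return ["proto_train", "proto_validate"]  # stand-in for RPC protos

class SerialisationLayer:
    def __init__(self, inner):
        self.inner = inner

    def get_tasks(self):
        # Deserialise the transport response before handing it upward.
        return [t.removeprefix("proto_") for t in self.inner.get_tasks()]

stack = SerialisationLayer(TransportLayer())
print(stack.get_tasks())  # ['train', 'validate']
```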
I am leaning towards having the entire approach encapsulated within the client (@teoparvanov's approach is one of the ways to do that).
The current implementation removes the compression pipeline from the collaborator and adds a new serializer object. From an abstraction standpoint, it is worth asking: does a collaborator need to know about serialization? Having it all in the client marks that separation very well.
I see the benefit of your middleware approach when a different use case leverages it. Could you list any planned features that may benefit from this approach?
My original point was that, once we have sorted out the secure aggregation serialisation, we can fully replace the client object with this proposed class.
A key argument for keeping this logic separate from the client is the potential addition of a REST client: with the approach proposed in this PR, the serialiser would simply reference the enabled transport client, REST or gRPC.
An argument can be made for the decorator approach mentioned by Teo, but in my opinion the middleware approach gives us more flexibility.
```python
# def test_find_dependencies(collaborator_mock, tensor_key):
#     """Test that find_dependencies works correctly."""
#     collaborator_mock.use_delta_updates = True
#     tensor_name, origin, round_number, report, tags = tensor_key
#     round_number = 2
#     tensor_key = TensorKey(
#         tensor_name, origin, round_number, report, ('model',)
#     )
#     tensor_key_dependencies = collaborator_mock._find_dependencies(tensor_key)
#
#     assert len(tensor_key_dependencies) == 2
#     tensor_key_dependency_0, tensor_key_dependency_1 = tensor_key_dependencies
#     assert tensor_key_dependency_0.round_number == round_number - 1
#     assert tensor_key_dependency_0.tags == tensor_key.tags
#     assert tensor_key_dependency_1.tags == ('aggregated', 'delta', 'compressed')


# def test_find_dependencies_is_lossy(collaborator_mock, tensor_key):
#     """Test that find_dependencies works correctly with lossy_compressed."""
#     collaborator_mock.use_delta_updates = True
#     collaborator_mock.compression_pipeline.is_lossy = mock.Mock(return_value=True)
#     tensor_name, origin, round_number, report, tags = tensor_key
#     round_number = 2
#     tensor_key = TensorKey(
#         tensor_name, origin, round_number, report, ('model',)
#     )
#     tensor_key_dependencies = collaborator_mock._find_dependencies(tensor_key)
#
#     assert len(tensor_key_dependencies) == 2
#     tensor_key_dependency_0, tensor_key_dependency_1 = tensor_key_dependencies
#     assert tensor_key_dependency_0.round_number == round_number - 1
#     assert tensor_key_dependency_0.tags == tensor_key.tags
#     assert tensor_key_dependency_1.tags == ('aggregated', 'delta', 'lossy_compressed')
```
These tests are commented out and planned to be removed; see #1476 (comment).
Hi @theakshaypant, thanks for starting this effort.
To begin with, could you add a "why" section to the PR description? In addition to the obvious encapsulation benefits, what framework capabilities do you expect this refactoring to enable down the road?
Additional comments based on the code itself:
```python
from openfl.pipelines import TensorCodec


class CollaboratorSerialiser:
```
This class indeed looks like a decorator of AggregatorGRPCClient, as it implements the same interface, but adds some orthogonal aspects like encoding/decoding via the tensor codec.
@teoparvanov Thanks for the comments! Updated the PR description.
I haven't gotten to the point of reviewing the architectural changes proposed, but I see some changes that appear to break the delta update functionality. I see some positive steps taken overall; reducing code within the collaborator is always a good thing. Second half of the review to come tomorrow.
While I understand the motivation behind adding a new component, this adds significant LOC for what is effectively adding deserialize/serialize calls into each RPC.
IMO let us migrate those calls into the gRPC client to land a minimum-API-change version.
When we realize the need for a serializer to do more than what is being done today, I clearly see value in having a dedicated component that forwards across RPC methods like we do here.
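A minimal sketch of that "minimum API change" alternative, with all names and bodies hypothetical, just to show the shape of the change: the gRPC client performs (de)serialisation inside each RPC wrapper, with no separate component.

```python
# Toy stand-ins; not the real OpenFL classes.
class StubCodec:
    def decode(self, proto):
        return proto["payload"]  # stand-in for proto -> numpy decoding

class GrpcClientWithSerialisation:
    def __init__(self, codec):
        self.codec = codec

    def _rpc_get_tensor(self, name):
        return {"payload": f"tensor:{name}"}  # stand-in for the real RPC

    def get_aggregated_tensor(self, name):
        # Deserialise right where the RPC response arrives.
        return self.codec.decode(self._rpc_get_tensor(name))

client = GrpcClientWithSerialisation(StubCodec())
print(client.get_aggregated_tensor("w0"))  # tensor:w0
```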
Summary
Moving the serialisation/deserialisation logic out of the collaborator into a separate middleware.
Why?
By clearly separating concerns across layers, this change aims to enhance code readability for contributors and improve debuggability in case of failures.
This middleware introduction is similar to the "middleware chaining" paradigm followed widely in Go, where the same function is implemented across multiple "layers", each handling a distinct aspect of the functionality. In the context of this PR, this is done such that:
- Layer 1 (Collaborator): performs operations central to FL.
- Layer 2 (CollaboratorSerialiser): performs serialisation/deserialisation of numpy arrays.
Type of Change (Mandatory)
Description (Mandatory)
- Uses TensorCodec to convert tensors to protobuf format and vice versa in order to communicate with the aggregator (see the sketch below).
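A toy round-trip sketch of that conversion boundary. Only the serialise signature comes from the diff above; the deserialise name, the codec methods, and the toy payloads are assumptions for illustration, not the PR's actual code:

```python
import numpy as np

# Toy codec standing in for openfl.pipelines.TensorCodec; the real codec
# produces NamedTensor protos, here we just round-trip raw bytes.
class ToyCodec:
    def compress(self, nparray):
        return nparray.tobytes(), nparray.shape

    def decompress(self, payload, shape):
        return np.frombuffer(payload, dtype=np.float32).reshape(shape)

class CollaboratorSerialiserSketch:
    """Shape sketch of the proposed middleware (hypothetical methods)."""

    def __init__(self, tensor_codec):
        self.tensor_codec = tensor_codec

    def serialise(self, tensor_key, nparray, lossless=True):
        # The real code builds a NamedTensor proto; we return a toy dict.
        payload, shape = self.tensor_codec.compress(nparray)
        return {"key": tensor_key, "payload": payload, "shape": shape}

    def deserialise(self, named_tensor):
        return self.tensor_codec.decompress(
            named_tensor["payload"], named_tensor["shape"]
        )

s = CollaboratorSerialiserSketch(ToyCodec())
arr = np.ones((2, 2), dtype=np.float32)
assert (s.deserialise(s.serialise("w0", arr)) == arr).all()
```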
Testing
Manual and CI.
Eden compression CI
Additional Information