SaSPartitioner is a self-adaptive stream partitioning framework that leverages deep reinforcement learning driven by real runtime metrics.
- Java 11
- Apache Flink 1.20
- Ray 2.40.0
We use a modified version of Flink 1.20 that can collect metrics at custom intervals. Compile this modified Flink and deploy it on every machine in your cluster.
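Assuming the modified Flink keeps the standard Flink Maven build, compiling and deploying it might look like the following sketch (all paths and host names are placeholders):

```shell
# Build the modified Flink from source; the standard Flink build
# produces a usable distribution under build-target/.
cd /path/to/modified-flink
mvn clean install -DskipTests

# Copy the distribution to each machine in the cluster (placeholder host).
scp -r build-target/ user@worker1:/opt/flink
```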
The system contains two main components: the Flink partitioner written in Java, and the reinforcement learning agent implemented with Ray RLlib.
- For the Java code, point `flink.source.path` in `pom.xml` to our modified Flink, then compile with `mvn package`.
- For the RL agent, install the required Python packages in `scripts/rl/requirements.txt`.
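Assuming Maven and pip are available, the two build steps above can be sketched as:

```shell
# Build the Java partitioner after pointing flink.source.path in pom.xml
# to the modified Flink.
mvn package

# Install the RL agent's Python dependencies.
pip install -r scripts/rl/requirements.txt
```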
The parameters are configured in `src/main/resources/params.yaml` and `scripts/rl/configurations.py`, respectively. Example YAML and Python configuration files are provided in `params/` and `scripts/rl/configuration_pool/`.
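For orientation, a minimal `params.yaml` sketch using only the keys mentioned in this README; whether both keys appear together, and any other keys, may differ (see `params/` for complete examples):

```yaml
# params.yaml (sketch; only keys mentioned in this README)
learningPartitioner: dalton-offline   # or saspartitioner, for training
partitioner: saspartitioner           # used for throughput tests and baselines
```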
- Set the `learningPartitioner` in `params.yaml` to `dalton-offline`.
- Offline data collection: run the Java class `cn.edu.zju.daily.metricflux.task.wordcount.WordCountStaticDistRouteTrainingExperiment` to collect the offline data.
- Configure the `log_folder` and `data_path` in `configurations.py`, and change `run_mode` to `offline`.
- Run `scripts/rl/offline_online_train_remote_n.py` to obtain the pre-trained model.
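As a sketch of the offline settings in `configurations.py` (the variable names come from the steps above; treating them as simple module-level assignments, and all paths, are assumptions):

```python
# configurations.py (sketch): settings for offline pre-training.
log_folder = "/path/to/logs"         # where training logs are written
data_path = "/path/to/offline-data"  # data collected by the Java job
run_mode = "offline"                 # switch the trainer to offline mode
```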
- Set the `learningPartitioner` in `params.yaml` to `saspartitioner`.
- Set the `checkpoint_path` in `configurations.py` to point to the offline model.
- Run `scripts/rl/offline_online_train_remote_n.py` to start the RL agent server.
- Run the Java class `cn.edu.zju.daily.metricflux.task.wordcount.WordCountStaticDistRouteTrainingExperiment` to start the online training process.
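Assuming the agent script is started directly and the Flink job is submitted with the standard `flink run` CLI, the online phase might be driven as follows (the jar path is a placeholder):

```shell
# Start the RL agent server; it reads checkpoint_path from configurations.py.
python scripts/rl/offline_online_train_remote_n.py &

# Submit the online training job to the Flink cluster (placeholder jar path).
flink run -c cn.edu.zju.daily.metricflux.task.wordcount.WordCountStaticDistRouteTrainingExperiment \
    /path/to/saspartitioner.jar
```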
To test the maximum throughput of the system:
- Set the `checkpoint_path` in `configurations.py` to point to the online model.
- Run `scripts/rl/offline_online_train_remote_n.py` to start the RL agent server.
- Set the `partitioner` in `params.yaml` to `saspartitioner`.
- Run the Java class `cn.edu.zju.daily.metricflux.task.wordcount.WordCountThroughputExperimentV2` to start the test. The source data rate increases gradually until backpressure is detected, and the maximum throughput is logged.
You can run the following baselines to compare with SaSPartitioner by setting the `partitioner` in `params.yaml` to one of these values:
- Hash: `hash`
- cAM: `cam`
- DAGreedy: `dagreedy`
- FlexD: `flexd`
- Dalton: `dalton-original`
- Dalton with collected metrics as observations: `dalton-metrics`
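For example, to run the cAM baseline, the corresponding `params.yaml` entry would be (sketch):

```yaml
partitioner: cam
```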
There are some bash scripts in `bin/` to facilitate batch experiments. You can refer to these scripts for the TDigest task.