Spark HyperLogLog Functions
Precise distinct counts are expensive to compute in Spark because:
1. When large cardinalities are involved, precise distinct counts require a lot of memory/IO, and
2. Every row of data has to be processed for every query, because distinct counts cannot be re-aggregated.
HyperLogLog (HLL) can address both problems. Memory use is reduced via the compact binary sketch representation, and the data can be pre-aggregated because HLL binary sketches, unlike distinct counts, are mergeable. (Two groups with 100 distinct values each may have anywhere from 100 to 200 distinct values combined, so per-group distinct counts cannot simply be summed; merged sketches, by contrast, yield the correct combined estimate.) Unfortunately, Spark's HLL implementation does not expose the binary HLL sketches, which makes its usefulness rather limited: it addresses (1) but not (2) above.
The HLL Spark native functions in spark-alchemy provide two key benefits:
- They expose HLL sketches as binary columns, enabling 1,000+x speedups in approximate distinct count computation via pre-aggregation.
- They enable interoperability at the HLL sketch level with other data processing systems. We use an open-source HLL library with an independent storage specification and built-in support for Postgres-compatible databases and even JavaScript. This allows Spark to serve as a universal data (pre-)processing platform for systems that require fast query turnaround times, e.g., portals & dashboards (see the sketch after this list for one way to ship sketches to a Postgres-compatible database).
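For example, because the sketches are plain binary columns, they can be written to a Postgres-compatible database and merged/finalized there by an HLL extension (e.g., postgresql-hll). The following is only a minimal sketch: the connection URL, table name, and user are hypothetical, it assumes the Postgres JDBC driver is on the classpath, and it relies on Spark mapping the binary sketch column to bytea.
// Minimal sketch (hypothetical connection details): persist per-group HLL sketches to Postgres
import com.swoop.alchemy.spark.expressions.hll.functions._

spark.range(100000)
  .groupBy(('id % 10).as("id_mod10"))
  .agg(hll_init_agg('id).as("hll_id"))     // binary sketch column, stored as bytea in Postgres
  .write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/analytics") // hypothetical
  .option("dbtable", "id_sketches")                       // hypothetical
  .option("user", "spark")                                // hypothetical
  .mode("overwrite")
  .save()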
spark-alchemy provides a richer set of HLL functions than either Spark or BigQuery. The full list of HLL functions is:
- hll_cardinality(hll_sketch): returns the cardinality (distinct count) of items in the set represented by the HLL sketch.
- hll_intersect_cardinality(hll_sketch, hll_sketch): computes a merged (unioned) sketch and uses the fact that |A intersect B| = (|A| + |B|) - |A union B| to estimate the intersection cardinality of the two sketches.
- hll_init(column[, precision]): creates an HLL sketch (a binary column) for each value in column. hll_init() is designed for use outside of aggregation (GROUP BY) as a privacy protection tool (because it hashes potentially sensitive IDs) and/or to prepare granular data for re-aggregation. Instead of hll_init(), you will typically use hll_init_agg().
  - precision: here and everywhere else, determines the maximum error for cardinality computation. The smaller the allowed error, the larger the binary sketch. The default value is 0.05, just as with Spark's approx_count_distinct(). See HLL in Spark for background on the relationship between precision and computation cost. Sometimes it is worth using a very small maximum error and accepting the slower computation, as long as the data can be pre-aggregated: follow-on re-aggregation will perform much faster on the existing aggregates.
- hll_init_agg(column[, precision]): like hll_init(), it creates an HLL sketch, but it is designed for use with aggregation (GROUP BY), i.e., it creates a sketch for all values in a group. Logically equivalent to hll_merge(hll_init(...)).
- hll_init_collection(array_or_map_column[, precision]): creates an HLL sketch for each value in the column, which has to be an array or a map. Having collection versions of the HLL functions is a performance optimization, eliminating the need to explode & re-group data.
- hll_init_collection_agg(array_or_map_column[, precision]): like hll_init_collection(), but designed for use with aggregation (GROUP BY), i.e., it creates a sketch for all values in the arrays/maps of a group. Logically equivalent to hll_merge(hll_init_collection(...)).
- hll_merge(hll_sketch): merges HLL sketches during an aggregation operation.
- hll_row_merge(hll_sketches*): merges multiple sketches in one row into a single sketch field (see the example after this list).
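Since hll_row_merge() and hll_intersect_cardinality() do not appear in the examples further down, here is a minimal sketch of how they can be used. It assumes the Column-based wrappers for these two functions are exposed through the same functions._ import used in the examples below, with hll_row_merge taking a varargs list of sketch columns.
// Minimal sketch: union and intersection estimates for two overlapping ID ranges
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._

val low  = spark.range(0, 75000).select(hll_init_agg('id).as("hll_low"))       // IDs 0..74,999
val high = spark.range(50000, 100000).select(hll_init_agg('id).as("hll_high")) // IDs 50,000..99,999

low.crossJoin(high) // one row containing both sketches
  .select(
    // merge the two sketches in the row into one and estimate the union cardinality (~100,000)
    hll_cardinality(hll_row_merge('hll_low, 'hll_high)).as("acntd_union"),
    // estimate the overlap between the two sets (~25,000 here)
    hll_intersect_cardinality('hll_low, 'hll_high).as("acntd_intersection")
  )
  .show(false)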
For maximum performance and flexibility, spark-alchemy implements HLL functionality as native Spark functions. Similar to user-defined functions, native functions require registration using the following Scala command.
// Register spark-alchemy HLL functions for use from SparkSQL
com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)
// Import the Column-based function wrappers for use with the DataFrame API
import com.swoop.alchemy.spark.expressions.hll.functions._
The following examples assume we are working with 100,000 distinct IDs from 0..99,999 in a dataframe with a single column id. SparkSQL examples assume this data is available as a table/view called ids. You can create it using
spark.range(100000).createOrReplaceTempView("ids")
Let's compute the distinct count of IDs using exact counting, Spark's built-in approximate counting function and spark-alchemy's functions. We'll look at the output at different precisions.
You will note that for basic approximate counts spark-alchemy requires two functions (hll_cardinality(hll_init_agg(...))) vs. Spark's single approx_count_distinct(...). The reason will become apparent in the next section.
select
-- exact distinct count
count(distinct id) as cntd,
-- Spark's HLL implementation with default 5% precision
approx_count_distinct(id) as anctd_spark_default,
-- approximate distinct count with default 5% precision
hll_cardinality(hll_init_agg(id)) as acntd_default,
-- approximate distinct counts with custom precision
map(
0.005, hll_cardinality(hll_init_agg(id, 0.005)),
0.020, hll_cardinality(hll_init_agg(id, 0.020)),
0.050, hll_cardinality(hll_init_agg(id, 0.050)),
0.100, hll_cardinality(hll_init_agg(id, 0.100))
) as acntd
from ids
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._
spark.range(100000).select(
// exact distinct count
countDistinct('id).as("cntd"),
// Spark's HLL implementation with default 5% precision
approx_count_distinct('id).as("anctd_spark_default"),
// approximate distinct count with default 5% precision
hll_cardinality(hll_init_agg('id)).as("acntd_default"),
// approximate distinct counts with custom precision
map(
Seq(0.005, 0.02, 0.05, 0.1).flatMap { error =>
lit(error) :: hll_cardinality(hll_init_agg('id, error)) :: Nil
}: _*
).as("acntd")
).show(false)
+------+-------------------+-------------+-------------------------------------------------------------+
|cntd |anctd_spark_default|acntd_default|acntd |
+------+-------------------+-------------+-------------------------------------------------------------+
|100000|95546 |98566 |[0.005 -> 99593, 0.02 -> 98859, 0.05 -> 98566, 0.1 -> 106476]|
+------+-------------------+-------------+-------------------------------------------------------------+
The following example shows how to use the pre-aggregate -> re-aggregate -> finalize pattern for high-performance distinct counting. We will calculate approximate distinct counts for odd vs. even (modulo 2) IDs in three steps:
1. Pre-aggregate the data modulo 10 using hll_init_agg().
2. Re-aggregate modulo 2 using hll_merge().
3. Produce a final result using hll_cardinality().
In a production environment, the pre-aggregates from step (1) and, in the case of very large data, re-aggregations at various granularities will be computed and persisted so that final reports can be created without having to go back to the raw rows of data (see the persistence sketch after the examples below).
with
-- pre-aggregates contain a binary HLL sketch column
pre_aggregate as (
select
id % 10 as id_mod10,
hll_init_agg(id) as hll_id
from ids
group by id % 10
),
-- HLL sketch columns can be re-aggregated using hll_merge() just like counts can be re-aggregated with sum().
-- Note that you cannot combine distinct counts with sum(); this is where HLL sketches shine.
aggregate as (
select
id_mod10 % 2 as id_mod2,
hll_merge(hll_id) as hll_id
from pre_aggregate
group by id_mod10 % 2
)
-- When a final result has to be produced, use hll_cardinality() on the HLL sketches
select
id_mod2,
hll_cardinality(hll_id) as acntd
from aggregate
order by id_mod2
spark.range(100000)
// pre-aggregate
.groupBy(('id % 10).as("id_mod10"))
.agg(hll_init_agg('id).as("hll_id"))
// reaggregate
.groupBy(('id_mod10 % 2).as("id_mod2"))
.agg(hll_merge('hll_id).as("hll_id"))
// final report
.select('id_mod2, hll_cardinality('hll_id).as("acntd"))
.orderBy('id_mod2)
.show(false)
+-------+-----+
|id_mod2|acntd|
+-------+-----+
|0 |47305|
|1 |53156|
+-------+-----+
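As an illustration of the persistence workflow mentioned above, the following sketch writes the step (1) pre-aggregates to a table and later builds the final report from that table alone. The table name is hypothetical; in practice you would choose the storage layout and partitioning to match your reporting needs.
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._

// Step (1): compute the pre-aggregates once and persist them (hypothetical table name)
spark.range(100000)
  .groupBy(('id % 10).as("id_mod10"))
  .agg(hll_init_agg('id).as("hll_id"))
  .write.mode("overwrite").saveAsTable("id_preaggregates")

// Later, reports re-aggregate the persisted sketches without touching the raw rows
spark.table("id_preaggregates")
  .groupBy(('id_mod10 % 2).as("id_mod2"))
  .agg(hll_cardinality(hll_merge('hll_id)).as("acntd"))
  .orderBy('id_mod2)
  .show(false)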
Let's change the modulo 10 pre-aggregation example such that, instead of grouping the IDs into HLL sketches, we collect them in arrays.
with
-- Rather than grouping the modulo 10 IDs into binary sketches, collect them in arrays
grouped as (
select
id % 10 as id_mod10,
collect_list(id) as ids
from ids
group by id % 10
),
-- For aggregation, use hll_init_collection_agg() to create HLL sketches
aggregate as (
select
id_mod10 % 2 as id_mod2,
hll_init_collection_agg(ids) as hll_id
from grouped
group by id_mod10 % 2
)
-- When a final result has to be produced, use hll_cardinality() as before
select
id_mod2,
hll_cardinality(hll_id) as acntd
from aggregate
order by id_mod2
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._
spark.range(100000)
// group into arrays
.groupBy(('id % 10).as("id_mod10"))
.agg(collect_list('id).as("ids"))
// aggregate
.groupBy(('id_mod10 % 2).as("id_mod2"))
.agg(hll_init_collection_agg('ids).as("hll_id"))
// final report
.select('id_mod2, hll_cardinality('hll_id).as("acntd"))
.orderBy('id_mod2)
.show(false)
+-------+-----+
|id_mod2|acntd|
+-------+-----+
|0 |47305|
|1 |53156|
+-------+-----+
Imagine yourself having to build HLL sketches for various columns using the same precision. Now imagine yourself having to do this over and over in many different cells in a notebook. Wouldn't it be nice to not have to keep typing the precision when you use hll_init_* in your transformations? Some of us at Swoop thought so and we added BoundHLL just for this purpose. In the following slightly longer example we will compute approximate distinct counts for odd vs. even IDs using different precisions.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._
import com.swoop.alchemy.spark.expressions.hll.BoundHLL
def preAggregateIds(error: Double)(ds: Dataset[_]) = {
val hll = BoundHLL(error)
import hll._ // imports hll_init_* versions bound to error
ds.toDF("id")
.groupBy(('id % 2).as("id_mod"))
.agg(hll_init_agg('id).as("hll_id"))
.withColumn("error", lit(error))
}
val ids = spark.range(100000)
Seq(0.005, 0.01, 0.02, 0.05, 0.1)
.map(error => ids.transform(preAggregateIds(error)))
.reduce(_ union _)
.groupBy('error).pivot('id_mod)
.agg(hll_cardinality(hll_merge('hll_id)).as("acntd"))
.orderBy('error)
.show(false)
+-----+-----+-----+
|error|0 |1 |
+-----+-----+-----+
|0.005|49739|49908|
|0.01 |49740|49662|
|0.02 |51024|49712|
|0.05 |47305|53156|
|0.1 |52324|56113|
+-----+-----+-----+
You can use the import hll._ pattern in a notebook cell to bind all hll_init_*() functions in all notebook cells to the precision provided to BoundHLL, but you have to keep two things in mind. First, another import, e.g., com.swoop.alchemy.spark.expressions.hll.functions._, can override the bound import. Second, if you attach the notebook to a new Spark/REPL session, you have to re-run the import. For that reason, we typically recommend that notebook-level precision binding happens in a single cell, e.g.,
import com.swoop.alchemy.spark.expressions.hll.functions._
import com.swoop.alchemy.spark.expressions.hll.BoundHLL
val hll = BoundHLL(0.02) // Spark default is 0.05
import hll._
When you bind the precision of hll_init_*(), attempting to use a version with an explicit precision will generate a compiler error, as the short sketch below illustrates. This is by design. To keep things consistent, we also bind the precision of Spark's own approx_count_distinct().
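A minimal sketch of the bound behavior. It assumes only the bound import plus a selective import of hll_cardinality are in scope (a selective import takes precedence over the wildcard, so it does not clash with the bound functions).
import com.swoop.alchemy.spark.expressions.hll.BoundHLL
import com.swoop.alchemy.spark.expressions.hll.functions.hll_cardinality

val hll = BoundHLL(0.02) // bind hll_init_* (and approx_count_distinct) to 2% error
import hll._

spark.range(100000)
  .select(hll_cardinality(hll_init_agg('id)).as("acntd")) // uses the bound 2% precision
  .show(false)

// spark.range(100000).select(hll_cardinality(hll_init_agg('id, 0.1)))
// ^ does not compile: once bound, versions with an explicit precision are rejected by design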
HLL sketch size depends on the desired precision and is independent of data size. A simple rule of thumb is that a 2x increase in HLL cardinality estimation precision requires a 4x increase in the size of HLL sketches.
spark.range(100000)
.select(map(
Seq(0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1).flatMap { error =>
lit(error) :: length(hll_init_agg('id, error)) :: Nil
}: _*
).as("lengths"))
.select(explode('lengths).as("error" :: "sketch_size_in_bytes" :: Nil))
.show(false)
+-----+--------------------+
|error|sketch_size_in_bytes|
+-----+--------------------+
|0.005|43702 |
|0.01 |10933 |
|0.02 |2741 |
|0.03 |1377 |
|0.04 |693 |
|0.05 |353 |
|0.06 |353 |
|0.07 |181 |
|0.08 |181 |
|0.09 |181 |
|0.1 |96 |
+-----+--------------------+
spark-alchemy is maintained by the team at Swoop. If you'd like to contribute to our open-source efforts, by joining our team or from your company, let us know at spark-interest at swoop dot com.