Aggregations in SQL offer a concise way to summarize extensive datasets. These “GROUP BY queries” provide a high-level perspective, enabling rapid insight extraction from large volumes of data.
Support for aggregations is now available in R2 SQL, Cloudflare’s serverless, distributed analytics query engine, which executes SQL queries on data within R2 Data Catalog. Aggregations empower R2 SQL users to identify key trends, track changes in data, generate reports, and detect anomalies in logs.
This new capability enhances existing filter queries, which are crucial for analytical tasks and help users pinpoint specific information within numerous Apache Parquet files.
This article will explore the functionality and intricacies of aggregations, detailing how R2 SQL was expanded to execute these queries across extensive datasets stored in R2 Data Catalog.
The importance of aggregations in analytics
Aggregations, also known as “GROUP BY queries,” produce a concise summary of raw data.
Generating reports is a frequent application for aggregations. For instance, a “sales” table might hold historical sales data across an organization’s countries and departments. A report on sales volume per department can be created with this aggregation query:
SELECT department, sum(value)
FROM sales
GROUP BY department
The "GROUP BY" clause segments table rows into distinct categories, each labeled by a specific department. After grouping, "sum(value)" can be computed for all rows within each category, yielding the total sales volume for that department.
When reports require focusing on departments with the highest volume, an "ORDER BY" clause is useful:
SELECT department, sum(value)
FROM sales
GROUP BY department
ORDER BY sum(value) DESC
LIMIT 10
This instructs the query engine to sort department categories by their total sales volume in descending order, returning only the top 10.
Additionally, filtering out anomalies might be necessary. For instance, to include only departments with over five total sales in a report, a "HAVING" clause can be used:
SELECT department, sum(value), count(*)
FROM sales
GROUP BY department
HAVING count(*) > 5
ORDER BY sum(value) DESC
LIMIT 10
This query incorporates "count(*)", an aggregate function that determines the number of rows in each category. This count directly reflects the number of sales per department, and a predicate in the "HAVING" clause ensures only categories with more than five rows are retained.
Two approaches to aggregation: compute sooner or later
Aggregation queries possess a unique characteristic: they can refer to columns that are not physically stored. For example, "sum(value)" is calculated dynamically by the query engine, contrasting with the "department" column, which is retrieved from Parquet files in R2. This distinction necessitates splitting queries that reference aggregates like "sum" or "count" into two phases.
The initial phase involves computing new columns. To sort data by a "count(*)" column using "ORDER BY" or filter rows with "HAVING", the values of this column must be determined first. Once these aggregate column values are established, the remaining query execution can proceed.
If a query uses aggregate functions in "SELECT" but not in "HAVING" or "ORDER BY", a simpler strategy is possible: because the aggregate values are not needed until the final output, each worker can compute them partially, and the partial results can be merged just before being presented to the user.
The core distinction between these methods lies in the timing of aggregate function computation: either in advance for subsequent calculations, or dynamically to construct results incrementally for the user.
This discussion will first explore dynamic result generation, termed "scatter-gather aggregations." Subsequently, "shuffling aggregations" will be introduced, which extend capabilities to include additional computations such as "HAVING" and "ORDER BY" on aggregate functions.
Scatter-gather aggregations
Aggregate queries lacking "HAVING" and "ORDER BY" clauses can be executed similarly to filter queries. In filter queries, R2 SQL designates a coordinator node for execution. This node analyzes the query and queries R2 Data Catalog to identify relevant Parquet row groups. Each row group is a manageable task for a single compute node. The coordinator distributes these tasks among multiple worker nodes and then gathers their results for the user.
To execute aggregate queries, the same steps are followed, distributing smaller tasks among worker nodes. However, in this case, worker nodes not only filter rows based on the "WHERE" predicate but also compute pre-aggregates.
Pre-aggregates are an intermediate state of an aggregation, representing a partially computed aggregate function on a data subset. These can be merged to determine the final aggregate function value. Decomposing aggregate functions into pre-aggregates enables horizontal scaling of aggregation computation, leveraging Cloudflare's extensive network compute resources.
For instance, a "count(*)" pre-aggregate is simply the row count within a data subset. The final "count(*)" is derived by summing these numbers. An "avg(value)" pre-aggregate comprises two numbers: "sum(value)" and "count(*)". The "avg(value)" is then calculated by summing all "sum(value)" and "count(*)" values separately, then dividing the former by the latter.
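As a toy illustration of this merging step (with made-up numbers, not R2 SQL's internal representation), an "avg(value)" pre-aggregate can be modeled as a (sum, count) pair that workers compute independently and the coordinator merges:

```python
def merge_avg_pre_aggregates(partials):
    """Merge (sum, count) pre-aggregates from workers into a final avg.

    Summing the sums and counts separately, then dividing, gives the same
    answer as computing avg(value) over all rows at once.
    """
    total_sum = sum(s for s, _ in partials)
    total_count = sum(c for _, c in partials)
    return total_sum / total_count

# Three workers each computed a partial sum and row count over their subset.
partials = [(100.0, 4), (50.0, 2), (150.0, 4)]
print(merge_avg_pre_aggregates(partials))  # 30.0, i.e. 300.0 / 10 rows
```

Note that averaging the three per-worker averages (25.0, 25.0, 37.5) would give the wrong answer, which is exactly why the pre-aggregate must carry both the sum and the count.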
Upon completing pre-aggregate computations, worker nodes stream their results to the coordinator node. The coordinator gathers these results, calculates the final aggregate function values from the pre-aggregates, and delivers the outcome to the user.
Shuffling, beyond the limits of scatter-gather
Scatter-gather proves highly effective when the coordinator can finalize results by merging small, partial states from workers. For a query such as SELECT sum(sales) FROM orders, the coordinator receives a single number from each worker and sums them. The coordinator's memory usage remains minimal, irrespective of the data volume in R2.
This method, however, loses efficiency when a query necessitates sorting or filtering based on an aggregation's outcome. For example, consider a query to identify the top two departments by sales volume:
SELECT department, sum(sales)
FROM sales
GROUP BY department
ORDER BY sum(sales) DESC
LIMIT 2
Accurately identifying the global Top 2 demands knowledge of total sales for every department across the entire dataset. Since data is randomly distributed across Parquet files, a department's sales are often fragmented across multiple workers. A department could have low sales on individual workers, thus missing local Top 2 lists, yet achieve the highest global sales volume when aggregated.
The following diagram demonstrates why a scatter-gather approach is unsuitable for this query. 'Dept A' leads global sales, but its sales are uniformly distributed across workers, causing it to be excluded from certain local Top 2 lists and subsequently discarded by the coordinator.
Therefore, when a query orders results by a global aggregate, the coordinator cannot depend on pre-filtered worker results. It must collect the partial totals for every department from every worker and merge them into global totals before sorting. Grouping by high-cardinality columns, such as IP addresses or user IDs, would force the coordinator to process and merge millions of rows, creating a single-node resource bottleneck.
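The failure mode can be reproduced in a small, self-contained sketch (the sales figures below are hypothetical): each worker forwards only its local Top 2, and the globally best department, spread thinly across all workers, never survives:

```python
# Per-worker partial totals: 'Dept A' is spread evenly across workers.
worker_data = [
    {"Dept A": 40, "Dept B": 90, "Dept C": 80},  # worker 1
    {"Dept A": 40, "Dept D": 70, "Dept E": 60},  # worker 2
    {"Dept A": 40, "Dept F": 85, "Dept G": 50},  # worker 3
]

def local_top2(totals):
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:2]

# Naive scatter-gather: each worker forwards only its local Top 2.
forwarded = {}
for w in worker_data:
    for dept, total in local_top2(w):
        forwarded[dept] = forwarded.get(dept, 0) + total
naive_top2 = [d for d, _ in sorted(forwarded.items(), key=lambda kv: kv[1], reverse=True)[:2]]

# Correct answer: merge *all* partial totals first, then take the Top 2.
global_totals = {}
for w in worker_data:
    for dept, total in w.items():
        global_totals[dept] = global_totals.get(dept, 0) + total
true_top2 = [d for d, _ in sorted(global_totals.items(), key=lambda kv: kv[1], reverse=True)[:2]]

print(naive_top2)  # ['Dept B', 'Dept F'] -- 'Dept A' never made a local Top 2
print(true_top2)   # ['Dept A', 'Dept B'] -- 'Dept A' wins globally with 120
```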
To resolve this, shuffling is necessary, which involves colocating data for specific groups prior to final aggregation.
Shuffling of aggregation data
To overcome random data distribution issues, a shuffling stage is implemented. Workers directly exchange data to colocate rows based on their grouping key, rather than sending results to the coordinator.
This routing utilizes deterministic hash partitioning. As a worker processes a row, it hashes the GROUP BY column to determine the destination worker. Due to the deterministic nature of this hash, all workers in the cluster independently agree on where to send specific data. For example, if 'Engineering' hashes to Worker 5, every worker routes 'Engineering' rows to Worker 5, eliminating the need for a central registry.
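A minimal sketch of this routing scheme, assuming a fixed cluster size and using CRC32 as a stand-in for whatever stable hash the engine actually uses (Python's built-in hash() is salted per process, so it would not let independent workers agree):

```python
import zlib

NUM_WORKERS = 8  # assumed fixed cluster size for this sketch

def route(group_key: str) -> int:
    """Map a GROUP BY key to a destination worker.

    Every worker runs the same deterministic function, so they all agree
    on the destination without consulting a central registry.
    """
    return zlib.crc32(group_key.encode("utf-8")) % NUM_WORKERS

# All rows for the same department hash to the same worker.
print(route("Engineering") == route("Engineering"))  # True
```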
The diagram below depicts this process. Observe that 'Dept A' originates on Workers 1, 2, and 3. Since the hash function maps 'Dept A' to Worker 1, all workers direct those rows to that specific destination.
While shuffling aggregates yields accurate results, this all-to-all exchange introduces a timing dependency. If Worker 1 starts calculating the final total for 'Dept A' before Worker 3 completes sending its data share, the result will be partial.
To mitigate this, a strict synchronization barrier is enforced. The coordinator monitors the cluster's progress as workers buffer and flush outgoing data via gRPC streams to their counterparts. The coordinator issues the command to proceed only after every worker confirms completion of input file processing and shuffle buffer flushing. This barrier ensures that the dataset on each worker is complete and accurate at the start of the subsequent stage.
Local finalization
After the synchronization barrier is lifted, each worker possesses the complete dataset for its allocated groups. Worker 1, for instance, now holds all sales records for 'Dept A' and can confidently compute the final total.
This enables pushing computational logic, such as filtering and sorting, to the workers, thereby reducing the coordinator's load. For instance, if a query contains HAVING count(*) > 5, the worker can filter out groups that do not satisfy this criterion immediately post-aggregation.
Upon completion of this stage, each worker generates a sorted, finalized stream of results for its assigned groups.
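This finalization stage might look like the following toy sketch, assuming each worker holds merged (sum, count) state per group after the shuffle and the query contains HAVING count(*) > 5 with ORDER BY sum(value) DESC:

```python
# Hypothetical post-shuffle state on one worker: it owns every row for its
# assigned groups, so filtering and sorting are safe to do locally.
groups = {
    "Dept A": {"sum": 120.0, "count": 9},
    "Dept B": {"sum": 40.0, "count": 3},
}

# Apply HAVING count(*) > 5, then sort by the final aggregate, descending.
finalized = sorted(
    ((dept, agg["sum"]) for dept, agg in groups.items() if agg["count"] > 5),
    key=lambda kv: kv[1],
    reverse=True,
)
print(finalized)  # [('Dept A', 120.0)] -- 'Dept B' was filtered out locally
```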
The streaming merge
The coordinator represents the final component. In the scatter-gather model, it handled the resource-intensive task of aggregating and sorting the entire dataset. Under the shuffling model, its role evolves.
Since workers have already computed and locally sorted the final aggregates, the coordinator merely executes a k-way merge. It establishes a stream with each worker, reading results row by row. It then compares the current row from each worker, selects the 'winner' according to the sort order, and adds it to the query results for the user.
This method is especially effective for LIMIT queries. If a user requests the top 10 departments, the coordinator merges streams until these 10 items are found, then ceases processing. It avoids loading or merging millions of remaining rows, enabling larger-scale operations without excessive compute resource consumption.
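This streaming k-way merge with an early stop can be sketched with Python's heapq.merge (the worker streams and totals below are hypothetical; totals are negated so an ascending merge yields descending sales):

```python
import heapq
from itertools import islice

# Each worker streams (negated_total, dept) pairs, pre-sorted by total desc.
worker_streams = [
    iter([(-120, "Dept A"), (-70, "Dept D")]),
    iter([(-90, "Dept B"), (-60, "Dept E")]),
    iter([(-85, "Dept F"), (-50, "Dept G")]),
]

LIMIT = 2
# heapq.merge lazily picks the 'winner' among the streams' current heads;
# islice stops after LIMIT rows, so the remaining rows are never read.
top = [dept for _, dept in islice(heapq.merge(*worker_streams), LIMIT)]
print(top)  # ['Dept A', 'Dept B']
```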
A powerful engine for processing massive datasets
The inclusion of aggregations transforms R2 SQL from a data filtering tool into a robust engine for processing massive datasets. This is achieved through distributed execution strategies like scatter-gather and shuffling, which push computation closer to the data, leveraging Cloudflare’s global compute and network scale.
Whether generating reports, monitoring high-volume logs for anomalies, or identifying data trends, these tasks can now be performed effortlessly within Cloudflare’s Developer Platform. This eliminates the need for managing complex OLAP infrastructure or transferring data out of R2.
Try it now
Aggregation support in R2 SQL is available now, and we look forward to seeing how these new functions are used with data in R2 Data Catalog.
- Get Started: Refer to the documentation for examples and syntax guides on executing aggregation queries.
- Join the Conversation: For questions, feedback, or to share projects, connect in the Cloudflare Developer Discord.

