I recently coded up several computations in differential dataflow that are meant to track the twenty-two computations in the TPC-H benchmark. I even made a page for them, with numbers and code and stuff. The computations in the benchmark are meant to be a bit more "realistic", let's say, than repeatedly doing breadth-first search over and over again. And indeed, I think I learned some things! I thought I would walk you through some of the queries and tell you some of the more interesting stories.
The TPC-H benchmark models some firm whose business appears to be the sale of "parts". Parts have suppliers, and are sold in orders, whose deliveries are recorded as line items. There are also customers, regions, and nations to think about. The relationships between these relations are not wildly surprising, and we will only get into the details if they matter for a query.
The data are randomly generated by a `dbgen` program that the benchmark makes available. I'm using a "scale-factor one" dataset, which means it intends to be about one gigabyte. It ends up being a little more, because I use things like sixty-four bit identifiers when I could be using something smaller; it might be worth economizing somewhere, in that I'm just missing the ability to process the scale-factor ten dataset on my laptop.
There are eight relations, with numbers of tuples:

| relation | tuples |
|---|---|
| lineitems | 6001215 |
| orders | 1500000 |
| parts | 200000 |
| partsupps | 800000 |
| customers | 150000 |
| suppliers | 10000 |
| nations | 25 |
| regions | 5 |
They mostly seem to have random data, generated according to some rules, but we are going to try to avoid exploiting any properties like that. The only thing I've done, initially subconsciously but now more formally, is notice how large each relation is. This is probably safe.
The streaming setting we are emulating is the one put forward in the paper "How to Win a Hot Dog Eating Contest", a recent publication on batch sizing in incremental view maintenance from the dbtoaster.org group at EPFL. The idea is to evaluate the "loading" of the relations, by alternating through the relations adding one record each and updating each of the TPC-H queries. This loads small relations like `nations` and `regions` pretty quickly, which is good because they are probably mostly static anyhow (and the computations would be weird/defective if "EUROPE" only showed up at the end).
The hot dog eating paper (I just can't help calling it this) studies among other things what I'll call "logical batching": what happens as inputs are batched into insertions that happen at the same logical time. This gives a system the opportunity to do more work at one time, and has the potential to reduce the complexity of the output of the computation: one million updates will just change an aggregate count once, rather than one million times. At the same time, logical batching changes the computation; we aren't computing the same result any more. But, it could be a good technique for dealing with load spikes, as we end up at the same state at the end, and the competition is "just drop input records lol" which isn't how we roll.
I'm also interested in understanding what I'll call "physical batching", where the logical times of updates are unchanged but the system has access to the updates of multiple logical times at once. This allows the system to do more work at a go, which can reduce various overheads, but does not change the output of the computation. Instead, it trades away the lowest of latencies for increased throughput. It is another complementary strategy for dealing with load, as long as things only get more efficient the larger the batches are (not always, unfortunately).
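To make "logical batching" concrete, here is a minimal sketch in plain Rust, with no differential dataflow involved; the `consolidate` name and the update shape are just illustrative:

```rust
use std::collections::HashMap;

// A hypothetical batch of (key, diff) updates, all at the same logical time.
// Logical batching lets us consolidate them before an operator sees them:
// many individual updates to one aggregate become a single net change.
fn consolidate(updates: &[(&'static str, isize)]) -> Vec<(&'static str, isize)> {
    let mut totals: HashMap<&'static str, isize> = HashMap::new();
    for &(key, diff) in updates {
        *totals.entry(key).or_insert(0) += diff;
    }
    // Drop keys whose net change is zero; sort for deterministic output.
    let mut result: Vec<_> = totals.into_iter().filter(|&(_, d)| d != 0).collect();
    result.sort();
    result
}

fn main() {
    // Four updates at one logical time; downstream sees one net change per key.
    let batch = [("EUROPE", 1), ("EUROPE", 1), ("EUROPE", -1), ("ASIA", 1)];
    println!("{:?}", consolidate(&batch));
}
```

A million insertions and retractions at the same logical time net out to a handful of changes, which is exactly why logical batching can shrink the output of an aggregate while leaving the final state unchanged.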
Having chatted with the hot dog authors, their benchmark loads up the data to process ahead of time (good, because parsing and such takes lots of time), and pre-batches the data so that when it comes time to compute we just need to drop the data in and watch it go. They do a few other things that I haven't done (mostly out of laziness) like treat `nations` and `regions` as static collections which can be strength-reduced in the computation; for example, a `join` with `nations` to find the name of a nation could be turned into a look-up into a small local map, which would save greatly if we spend lots of time doing that. I should say, I am interpreting some of their results, and may do so incorrectly; any restrictive statements about their work should be taken with a grain of salt.
The TPC-H queries are "analytical", meaning I guess that they are relatively large aggregates over the whole datasets, rather than look-ups of individual entries. By design, they have different characteristics and actually do seem to do a good job exercising various systems' dark corners. Also by design, I think, the data schema and queries are all tractable: there are no weird motif-finding queries, iterative graph processing, or exciting things like that. All of the queries can be maintained with work linear in the number of updates (though this may be non-obvious on a first reading of the queries).
Let's start at the beginning! A very good place to start.
Actually, this might be the worst query in the bunch, because its relative simplicity suggests a really rawking implementation, and differential dataflow does so-so. Let's look at the query as described by the TPC-H folks:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-12-01' - interval ':1' day (3)
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
I can't really read SQL all that well, so it is to my great delight that each query also has an English language description of the intent.
The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of a given date. The date is within 60 - 120 days of the greatest ship date contained in the database. The query lists totals for extended price, discounted extended price, discounted extended price plus tax, average quantity, average extended price, and average discount. These aggregates are grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS. A count of the number of lineitems in each group is included.
This query is actually pretty readable, I feel, and we get the message pretty clearly: add a bunch of things up, but not everything (because of some date constraint), and do the adding independently for different settings of return flags and statuses.
Let's imagine we just wanted to do a `count`, because that is pretty easy, and plus differential dataflow has a `count` operator. How would we write this in differential dataflow? There is actual code written, but stylized you might write

lineitems
    .map(|x| (x.return_flag, x.line_status))
    .count()

This fragment extracts the "return flag" and "line status" fields from the tuple ("projection" in database parlance), and then invokes the `count` method. If `lineitems` were a static collection we would expect this to tell us how many of each record (each pair of return flag and line status) were seen in the input. Instead, it gives us access to a stream of updates to these counts, reporting with each change to the `lineitem` input how the set of output counts has changed.
This works great. I mean, great-ish; we will see some performance shortcomings in just a moment. But it has a bigger problem: it doesn't actually compute the thing we were asked to compute, which is more than a count.
Until about a month ago, if you wanted to compute the sum of a thing other than the number of records, you had to cheat a bit. You had to create a new collection whose counts were the thing you wanted to sum. For example, if we wanted to sum `quantity` (which we do) we would need to transform the `lineitems` relation into one where each record has weight equal to its quantity, rather than "one". The resulting collection is much "larger", but this is no big deal because we just track all the counts as `isize` fields anyhow. A `count()` will now determine the accumulated quantity across all the records. We would have to repeat this for each aggregation we want to take.
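A tiny sketch of the cheat, outside of differential dataflow entirely (the names here are mine): if the only aggregator available accumulates weights, we can re-weight each record by its quantity and let the accumulation do the summing:

```rust
use std::collections::HashMap;

// The old trick: to sum `quantity` with only a counting operator available,
// re-weight each record's diff by its quantity. Accumulating the weights
// per key then yields the total quantity, not the number of records.
fn weighted_count(updates: &[((&'static str, &'static str), isize)])
    -> HashMap<(&'static str, &'static str), isize>
{
    let mut totals = HashMap::new();
    for &(key, weight) in updates {
        *totals.entry(key).or_insert(0) += weight;
    }
    totals
}

fn main() {
    // Each line item arrives with diff = +1; we multiply by its quantity.
    let items = [
        (("N", "O"), 1 * 17), // quantity 17
        (("N", "O"), 1 * 36), // quantity 36
        (("R", "F"), 1 * 8),  // quantity 8
    ];
    println!("{:?}", weighted_count(&items).get(&("N", "O"))); // prints Some(53)
}
```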
About a month ago, support for general aggregatable types landed. This allows us to replace our `isize` "changes to counts" with more general Abelian groups (things that support addition, subtraction, have a zero, and don't much care which order you do the operations in). I initially called them "rings" because `join` requires multiplication, but that was wrong.
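To make the idea concrete, here is a minimal sketch of such an aggregatable pair type, modeled loosely on differential dataflow's `DiffPair` but simplified (the real type needs a zero test and a few other traits as well):

```rust
use std::ops::{Add, Neg};

// A pair of differences that is itself an Abelian group: add component-wise,
// negate component-wise. Nesting pairs lets several aggregates ride along
// in one difference through a single count-like operator.
#[derive(Clone, Copy, Debug, PartialEq)]
struct DiffPair<A, B> { first: A, second: B }

impl<A, B> DiffPair<A, B> {
    fn new(first: A, second: B) -> Self { DiffPair { first, second } }
}

impl<A: Add<Output = A>, B: Add<Output = B>> Add for DiffPair<A, B> {
    type Output = Self;
    fn add(self, other: Self) -> Self {
        DiffPair::new(self.first + other.first, self.second + other.second)
    }
}

impl<A: Neg<Output = A>, B: Neg<Output = B>> Neg for DiffPair<A, B> {
    type Output = Self;
    fn neg(self) -> Self { DiffPair::new(-self.first, -self.second) }
}

fn main() {
    // Track (sum of quantity, record count) as one difference.
    let a = DiffPair::new(17i64, 1i64);
    let b = DiffPair::new(36i64, 1i64);
    let retraction = -DiffPair::new(17i64, 1i64);
    println!("{:?}", a + b + retraction); // the retracted record nets out
}
```

Because the pair is itself an Abelian group, pairs of pairs are too, which is how the stack of nested `DiffPair`s in the code below carries six aggregates at once.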
The more general aggregatable types allow us to bundle all of the data we want to aggregate together in one type, and then call the increasingly ill-named `count()` to promote the accumulated values to data. The actual Rust code looks like
collections
.lineitems()
.inner
.flat_map(|(item, time, diff)|
if item.ship_date < ::types::create_date(1998, 9, 1) {
Some(((item.return_flag, item.line_status), time,
DiffPair::new(diff as i64 * item.quantity,
DiffPair::new(diff as i64 * item.extended_price,
DiffPair::new(diff as i64 * item.extended_price * (100 - item.discount) / 100,
DiffPair::new(diff as i64 * item.extended_price * (100 - item.discount) * (100 + item.tax) / 10000,
DiffPair::new(diff as i64 * item.discount, diff))))))).into_iter()
}
else {
None.into_iter()
}
)
.as_collection()
.count()
.probe()
.0
There is some syntactic horror going on, and I apologize for this. Let me explain what is happening and why.
- We access the `.inner` field of the stream of updates. We are diving down to timely dataflow here. The action we want to happen is a re-writing of the difference field of an update, and I haven't figured out what the right name for that is yet. The complementary operator is `count()`, whose name also sucks.
- Even if we had a good name, we kind of want to perform `flat_map` at the same time. Flat map is the operator that allows us to simultaneously filter and project data, and we want a very efficient "first touch" on the data: we want to throw away as much as possible, both in terms of whole records (filtering) and fields of those records (projection).
- For "reasons", we can't easily use tuples as our differences (reason: tuples do not derive the `Add` and `Neg` traits). So, we wrap up the new differences in a stack of `DiffPair` types, which are just pairs of differences, nested.
- Timely's `flat_map` method requires iterators as output, where it could require `I: IntoIterator`, which would simplify things further.
In a perfect (future) world we might write:
collections
.lineitems()
.flat_map(|item|
if item.ship_date < ::types::create_date(1998, 9, 1) {
Some(((item.return_flag, item.line_status), // <-- data
(item.quantity, // <-- diffs
item.extended_price,
item.extended_price * (100 - item.discount) / 100,
item.extended_price * (100 - item.discount) * (100 + item.tax) / 10000,
item.discount,
1)))
}
else { None }
)
.re_differnate(|((flag, status), stuff)| ((flag, status), stuff))
.count()
Brilliant. Ok, so new magical differential dataflow technology realizes we can abstract "+" and "-". Prize please!
Let's run this thing and see what happens.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query01 | 4.15s | 4.75s | 5.88s | 6.34s | 1.45M/s | 1.27M/s |
Now, these aren't very good numbers. I can say that because they are mostly my numbers, but how long should it take to scan six million things and accumulate six quantities (with some multiplication to determine them)? Processing one million tuples per second means about a microsecond, a few thousand cycles, to do all this addition. Remember how I said this is probably the worst query for differential dataflow? This is it.
Differential dataflow does a bunch of things that it really doesn't need to do for this query. Mainly, it imagines all sorts of degrees of freedom that this example does not demonstrate. It imagines data that could be temporally out of order and must be sorted, data with massive numbers of keys that must be visited carefully, data that might be re-used in the future and must be thoughtfully indexed, data whose times are only partially ordered and consequently cannot be updated in-place.
This isn't to say that differential dataflow is justified in doing these things. They are important in something like breadth-first search, but pretending that people don't often want to see streams of counts is just wrong.
This does raise the question of whether people really want to see streams of counts. Obviously this is a natural output for a database query, but if you are monitoring a stream of aggregate quantities, you may or may not actually want to see the counts themselves. It is pretty easy for your UI to accumulate the changes as they arrive, and arguably the decision to turn a stream of updates into a stream of count changes (where the counts are data) is something the consumer of the stream should make. If we skip the `count()` and just hand back differences for you to accumulate, the times drop to somewhere between one quarter and one half of a second, for reference.
If you thought query 1 was easy, query 6 is meant to be really easy. At least, I think that is the intent. It looks like:
select
sum(l_extendedprice * l_discount) as revenue
from
lineitem
where
l_shipdate >= date ':1'
and l_shipdate < date ':1' + interval '1' year
and l_discount between :2 - 0.01 and :2 + 0.01
and l_quantity < :3;
This query mostly throws away its input, and then just counts up what remains. Here is how it looks in differential dataflow:
collections
.lineitems()
.inner
.flat_map(|(x, time, diff)|
if create_date(1994, 1, 1) <= x.ship_date && x.ship_date < create_date(1995, 1, 1) && 5 <= x.discount && x.discount < 7 && x.quantity < 24 {
Some(((), time, (x.extended_price * x.discount / 100) * diff as i64)).into_iter()
}
else { None.into_iter() }
)
.as_collection()
.count()
We are doing that weird `.inner` thing again, followed by a `flat_map` to simultaneously filter, project, and re-differnate the updates. It's not entirely clear we should call this "differential dataflow", but bear with me. We then just `count()` the results.
Let's see how it runs.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query06 | 0.31s | 0.17s | 0.17s | 0.17s | 36.35M/s | 138.22M/s |
Wow. While we are doing pretty well, we are also getting smoked. It wasn't always this way, though. Previously we weren't doing very well and were getting really smoked.
This query seems to exercise just how light a touch you can have on the data. Just about all of the data are discarded, and the query measures how little work you can do in discovering this. In fact, I'm not sure if any data are being passed, so we can take this as a benchmark of how efficiently differential dataflow can examine and then throw away data.
The first version of this code did several things differently, each of which had performance issues. Notice that at 36M/s we have about a 30ns budget for each record.
- Initially the data were not pre-batched, and the batches were discovered as we walked through the data. This added a substantial amount of per-record overhead. Worse, each time around the "what should we do next" loop we checked out each of the eight relations, all to produce just one `lineitem` record. Horrible. We have since pre-batched things.
- Timely dataflow's input operator only recently got the ability to accept batches of input, rather than records at a time. I think that previously I was amazed that record-at-a-time ingestion got inlined and turned into a `memcpy`, but .. now we can just accept the batches.
- Our first version of the code had a `filter` followed by a `map`. This meant that although we filtered fairly aggressively, we would still copy all the fields of the relatively large record. Fusing the two in `flat_map` prevents this.
- The `LineItem` type has a variable-length text field for "comments". I thought that a `String` would be a great way to represent this. It wasn't. Apparently it takes quite a while to de-allocate the memory for six million strings, which we spent about one second doing. This was replaced with an `arrayvec::ArrayString`, which is a statically allocated inline hunk of memory with a fixed upper bound on length. The TPC-H schema indicates an upper length of 44 bytes, and it is probably this generous thinking that is why your customer feedback form truncates to halfway through your first sentence.
If you attach a profiler to the current program, roughly half of the remaining time (about 80ms) is now spent de-allocating buffers of `LineItem` entries, the roughly 1GB of input data that we retire in the `flat_map`, in chunks of whatever our batch size is. I don't know enough to know if this is surprisingly large, or otherwise avoidable. In a "real" system we would almost certainly want to recycle these buffers rather than drop them, but since all of our input is pre-prepared, we don't have anything to do with recycled buffers. If we went columnar, we could avoid even having the other fields around in the first place, but that feels a bit like cheating (or smarts; hard to tell the difference sometimes).
Query 13 counts up the number of orders for each customer, including customers with zero orders. We can compute this pretty easily by concatenating the set of customer keys (from `customers`) with the keys in `orders`. Then we count them up for each customer, and then we count the number of customers with each count. There is some pre-filtering based on the comments which I'll just skip for the moment. The remaining fragment is
collections
.customers()
.map(|c| c.cust_key)
.concat(&orders)
.count()
.map(|(_cust_key, count)| (count-1) as usize)
.count()
This is a totally sane query, but it doesn't exploit some sweet structure we have: the keys for each of the `count` calls are themselves unsigned integers. Rather than hashing these integers all over the place (which is how we work with generic keys) we can use a special form of `count` which operates on unsigned integer keys: `count_u`.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query13 | 9.18s | 5.99s | 4.79s | 3.77s | 438.03K/s | 779.52K/s |
| query13+ | 6.24s | 4.58s | 3.83s | 3.05s | 541.68K/s | |
This is a decent improvement, and it applies pretty broadly. To be honest, I did this last because I totally forgot about it, so several of the other queries that have joins and groupings on keys, which are all nice integers in this setting, should be similarly improved with `join_u`, `semijoin_u`, `group_u`, and all these things. The final measurements at the bottom reflect these updates, but the various query vignettes do not.
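For intuition about why the `_u` variants help, here is a sketch in plain Rust (the function is illustrative, not differential dataflow code): with dense unsigned integer keys we can index a vector directly, rather than hashing keys into a more general map.

```rust
// Counting with dense unsigned integer keys: direct vector indexing,
// no hashing, no probing. Keys with zero occurrences come along for free,
// which is also handy for query 13's "customers with zero orders".
fn count_dense(keys: &[usize], num_keys: usize) -> Vec<isize> {
    let mut counts = vec![0isize; num_keys];
    for &k in keys {
        counts[k] += 1; // a single bounds-checked array write per record
    }
    counts
}

fn main() {
    // Customer keys 0..5; count orders per customer.
    let orders = [0, 2, 2, 4, 2];
    println!("{:?}", count_dense(&orders, 5)); // prints [1, 0, 3, 0, 1]
}
```

The real `count_u` maintains indexed update traces rather than a flat array, but the underlying appeal is the same: integer keys let you skip the hashing machinery.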
Query 4 does a few things, but from our point of view it (i) filters about half of the `lineitems` away, and then (ii) calls `distinct` on the remaining `order_key` fields in evidence. Roughly, we need to filter down orders to those with a line item entry violating a delivery commitment; about half of the line item records violate their delivery commitment, so perhaps we shouldn't try too hard to emulate this particular firm.
let lineitems =
collections
.lineitems()
.flat_map(|l| if l.commit_date < l.receipt_date { Some(l.order_key).into_iter() } else { None.into_iter() })
.distinct();
collections
.orders()
.flat_map(|o|
if o.order_date >= create_date(1993, 7, 1) && o.order_date < create_date(1993, 10, 1) {
Some((o.order_key, o.order_priority)).into_iter()
}
else { None.into_iter() }
)
.semijoin(&lineitems)
.map(|o| o.1)
.count()
If we run the query as written we get performance like so:
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query04 | 13.34s | 7.64s | 4.46s | 2.86s | 2.62M/s | 10.08M/s |
This query gives us a great opportunity to show off an excellent feature of differential dataflow! Let's hope that it has a noticeable (and positive) impact.
The `distinct` operator, and `group` operators generally, maintain an indexed form of their output, so that they can easily determine what it was they previously produced as output. It happens that this indexed form of output is exactly the form that `join` and `group` operators want to consume as input, although for reasons of abstraction they don't do this by default. Instead, `distinct`'s output gets turned back into a stream of updates, and the `semijoin` operator re-indexes it. What a waste!

Here is the `distinct` implementation:
fn distinct(&self) -> Collection<G, K, isize> {
self.arrange_by_self()
.group_arranged(|_k,_s,t| t.push(((), 1)), DefaultKeyTrace::new())
.as_collection(|k,_| k.item.clone())
}
And here is the `semijoin` implementation (apologies for the unclear generic parameters):
fn semijoin<R2: Diff>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Mul<R2>>::Output>
where R: Mul<R2>, <R as Mul<R2>>::Output: Diff {
let arranged1 = self.arrange_by_key_hashed();
let arranged2 = other.arrange_by_self();
arranged1.join_arranged(&arranged2, |k,v,_| (k.item.clone(), v.clone()))
}
All we need to do is stitch these together by hand, removing the `as_collection()` call from `distinct` and the `arrange_by_self()` call from `semijoin`. And add a bunch of `use` statements for all the traits that aren't normally exposed.
Fun fact: operators that can act on streams of pre-arranged data look something like a cheap-o version of operator fusion: while in principle we could in-line the `distinct` and `semijoin` code (as it works by the same key), this does something fairly similar and allows us more flexibility (or more free time for not writing the code analysis stuff).
When we do this, we see
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query04 | 13.34s | 7.64s | 4.46s | 2.86s | 2.62M/s | 10.08M/s |
| query04+ | 12.47s | 6.98s | 4.09s | 2.62s | 2.87M/s | |
This is better, but not mind-blowingly better. Part of the reason, I think, is that the output of `distinct` doesn't change all that much in this computation. We are only adding records to the input, so each `order_key` enters the set at most once and never leaves. There are lots of orders, still, but not as much churn as we might expect. Also, the work we've saved is just some fraction of the work we are doing; we still need to arrange the input to `distinct` and the other input to `semijoin` and the input to `count`.
We will see another opportunity to apply this optimization to a more churn-y `group` operator in a later query! Maybe it will work out better there.
Query 18 takes all of our trusty line items, and asks "how many orders have at least 300 units of quantity in them?". This hits `lineitems` with a `count` and then uses the results in a `join` to restrict our attention to such orders. The counts change a lot more often than the distinct order keys from up in query 4, and should be a much better example of fusing the output of `count` with the input to `semijoin`. Let's look at the code:
collections
.lineitems()
.inner
.map(|(l, t, d)| (l.order_key, t, (l.quantity as isize) * d))
.as_collection()
.count()
.filter(|x| x.1 > 300)
.join(&collections.orders().map(|o| (o.order_key, (o.cust_key, o.order_date, o.total_price))))
.map(|(okey, quantity, (custkey, date, price))| (custkey, (okey, date, price, quantity)))
.join(&collections.customers().map(|c| (c.cust_key, c.name.to_string())))
Oh hell. That `filter` lives between the count and the join. We could try moving the `filter` after the `join`, which would mean we would look up even small orders, but we would save ourselves the effort of re-organizing the stream of incoming changes to counts (roughly twelve million, as each line item changes a count, resulting in an addition and subtraction in the output for all but the first occurrence of each order).
Let's try it both ways and see how we do.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query18 | 38.28s | 23.57s | 16.99s | 12.04s | 637.39K/s | 1.13M/s |
| query18+ | 35.24s | 21.64s | 14.92s | 10.30s | 743.00K/s | |
This is definitely an improvement, and is interesting because it runs a little counter to the traditional wisdom that you should push filters as early as possible. Of course, we could wish to push the filter up, but we would need to find a way to re-use the existing index and stream of batches, which we should be able to do with a bit of friendly wrapping. There is an issue on the differential dataflow repo to start to think about exactly this sort of thing (generalized; it also helps with timestamp manipulation, and might help with some instances of `map`).
This is the query that the dbtoaster.org folks performed the worst on, so I thought it would be a good exercise. It is indeed a pretty non-standard query, or it certainly is expressed in a non-standard way. The query intent is pretty simple: report the suppliers with the largest revenue. This is awkwardly expressed as a selection where there is a nested query for the maximum sum of revenue, and the requirement that the revenue for the selected rows equal this maximum.
I find this a lot simpler in differential dataflow: from `lineitems` we define a collection whose elements are `supplier_key` with weight equal to their revenue. We then do a group that produces the elements with the largest revenue:
revenue.group(|_k, s, t| {
    let max = s.iter().map(|x| x.1).max().unwrap();
    t.extend(s.iter().filter(|x| x.1 == max).cloned());
})
How does it work? Given that we are re-forming the input to the `group` with every update, it works surprisingly well.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query15 | 35.58s | 29.46s | 31.43s | 41.29s | 204.03K/s | 17/s |
The rate looks pretty good, despite our primitive approach, but remember that there is an aggressive filter in front of all of this. It reduces the data down quite a bit first, but we still get credit for processing all of the tuples. The hot dog numbers are low because in this case (and a few others like it) they fall back to a strategy of full recomputation, which is essentially fatal for this sort of problem.
Ok, can we be smarter maybe? Yes, always!
The maximum we computed up above is "global", in that the key we use is `()` because we want the actual maximum, not the maximum by nation, say. This means that when there is a change we have to re-form the whole input, and there are something like 10,000 suppliers. Let's be smarter by doing a multi-level maximum:
revenue
    .map(|supp| (supp % 100, supp))
    .group(|_k, s, t| {
        let max = s.iter().map(|x| x.1).max().unwrap();
        t.extend(s.iter().filter(|x| x.1 == max).cloned());
    })
    .map(|(_, supp)| ((), supp))
    .group(|_k, s, t| {
        let max = s.iter().map(|x| x.1).max().unwrap();
        t.extend(s.iter().filter(|x| x.1 == max).cloned());
    });
We are just doing the `max` twice, first independently on groups of 100 suppliers, and then on all of the winners of each of those contests. When one input changes, we must re-consider two inputs (one for each `group`), but their sizes are each reduced by roughly 100x.
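The same idea can be sketched in plain Rust (illustrative names; the real version maintains these groups incrementally rather than recomputing them, and keeps all tied winners rather than one):

```rust
use std::collections::HashMap;

// Two-level maximum: group suppliers by `supp % 100`, keep each group's
// best, then take the best of the winners. When one supplier's revenue
// changes, only its small group and the winners need re-examination.
fn hierarchical_max(revenues: &[(u64, i64)]) -> (u64, i64) {
    // First level: maximum within each group of roughly 100 suppliers.
    let mut winners: HashMap<u64, (u64, i64)> = HashMap::new();
    for &(supp, rev) in revenues {
        let entry = winners.entry(supp % 100).or_insert((supp, rev));
        if rev > entry.1 { *entry = (supp, rev); }
    }
    // Second level: maximum over the group winners.
    winners.values().cloned().max_by_key(|&(_, rev)| rev).unwrap()
}

fn main() {
    let revenues = [(101, 50), (202, 90), (303, 40), (404, 120)];
    println!("{:?}", hierarchical_max(&revenues)); // prints (404, 120)
}
```

Unlike the differential version above, this sketch breaks ties arbitrarily; the point is only that a change to one supplier touches one group of roughly 100 plus the set of winners, not all 10,000 inputs.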
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query15 | 35.58s | 29.46s | 31.43s | 41.29s | 204.03K/s | 17/s |
| query15+ | 2.84s | 1.32s | 0.77s | 0.85s | 7.76M/s | |
As advertised, this is almost a 50x improvement (i.e. 100x / 2). For kicks, we can do a four-level maximum where at each we reduce the key space by a factor of ten.
| | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
|---|---|---|---|---|---|---|
| query15 | 35.58s | 29.46s | 31.43s | 41.29s | 204.03K/s | 17/s |
| query15+ | 2.84s | 1.32s | 0.77s | 0.85s | 7.76M/s | |
| query15++ | 1.76s | 0.91s | 0.48s | 0.39s | 15.24M/s | |
So that is neat. This went from one of the worst-performing queries, to one of our best performing queries.
There is something a bit dissatisfying about this approach. We've clearly done something a bit hack-y, and it might be nice to have a more plausibly robust version. We can do that, but I should warn you that it is not live yet. That being said, let me talk you through it.
We used the weight component of the `revenue` collection to hold the revenue, and we scanned all of the suppliers looking for the largest revenue. Instead, we could have first done a `count`, which would promote the revenue to data. We would have a `(supplier, revenue)` collection, and our group would look pretty similar, except it would look at a data field rather than the accumulated count.
What you may not know is that `group` presents the input data in a standard format: all values must implement `Ord`, meaning they can be sorted, and the `group` operator presents input values in that order. We want the largest revenue elements, so perhaps we could swing our tuples around and put the revenue first, and maybe negate it to make very large revenues come first:
revenue
    .count()
    .map(|(supplier, revenue)| ((), (-revenue, supplier)))
    .group(|_k, s, t| {
        let max = (s[0].0).0;
        t.extend(s.iter().take_while(|x| (x.0).0 == max).cloned());
    })
    .map(|(_, (neg_rev, supplier))| (supplier, -neg_rev));
There are two important changes from the version above, other than flipping the revenue around: (i) we determine `max` just by reading the first field of the first value, and (ii) we use `take_while` rather than `filter` to extract the relevant prefix of the input.
Hey, we probably didn't even look at most of the input data, did we?
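The order-exploiting trick can be sketched in plain Rust as a one-shot computation (illustrative names; the real `group` maintains this incrementally over already-sorted input):

```rust
// The negate-and-sort trick: with revenue negated and placed first, `sort`
// puts the largest revenues at the front, and `take_while` peels off every
// supplier tied for the maximum without scanning the rest of the input.
fn top_suppliers(mut vals: Vec<(i64, u64)>) -> Vec<u64> {
    vals.sort(); // (-revenue, supplier): most negative, i.e. largest, first
    let max = vals[0].0;
    vals.iter().take_while(|x| x.0 == max).map(|x| x.1).collect()
}

fn main() {
    // Suppliers 7 and 9 tie at revenue 120; supplier 3 trails at 80.
    let vals = vec![(-120, 7), (-80, 3), (-120, 9)];
    println!("{:?}", top_suppliers(vals)); // prints [7, 9]
}
```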
Unfortunately, at the moment the signature of the function you provide to `group` is
L: Fn(&K, &[(V1, R1)], &mut Vec<(V2, R2)>)
This has a lot of guff in it, but the main point is that in addition to a key reference, and an output vector into which to dump results, you are provided access to the input as a fully formed slice. This means that `group` actually went and did all the work of preparing the input collection for you, even the parts you aren't going to look at. It does this because it is easier than letting you interactively explore the input, but you totally could do that. In fact, that is how the version of differential dataflow that is live on crates.io works.
Rather than eagerly construct the input for you, the `group` operator could provide a cursor that lets you navigate the input, exploiting its order. It would only need to do the work of reconstructing the values you investigate (or those you pass over, if they accumulate to zero). This can be a substantial reduction in work, especially in the case of something like `min` or `max`, where you just want a few elements from the front of the list.
This is waaaaay more subtle than it looks, because when you look at the input collection you are not only communicating what bits of data you are interested in, you are also implicitly communicating to `group` which updates you would notice if they came into scope. This is only really a thing for partially ordered times, but it can be a huge optimization. At the same time, it is important to actually do it correctly, and this is worth mathing out for partial orders (even if trivial and boring for total orders).
Better still, the underlying shared collection traces are organized by the same ordering on values, so you could make the cursor be very lazy and not even load data you don't ask for. This is very appealing for several reasons, not least of which is that about half of the time in `group` is spent merging data from the underlying mergeable lists. It is also subtle and complicated, because the moment a new value goes live (when your code first looks at it) we may be halfway through a computation with some supposed invariants about the logical times we are examining. It should all work out, but again, worth doing correctly. Watch this space.
This is another neat query, in the spirit of query 15. Rather than extract the suppliers with maximum revenue, the query asks for those parts whose total value (cost times quantity) is some non-trivial fraction of the total value (for suppliers in some given nation).
This is a bit like the maximum, in that we want only the most significant folks, but we need to use a function of the total as the threshold, rather than the maximum. If we tried the approach up above, we would need to scan through all records just to get the total.
At the same time, we can compute and maintain the total relatively efficiently without using a `group`, just by using `count` with the value stashed in the count field. How can we combine the efficiently maintained count information with the constituent contributions? The only technique that leaps to mind is `join`, which matches each contribution up with the total, which we can then filter down. This has the defect that whenever the total changes we must re-consider every contribution, even though most of them are likely as insignificant as ever.
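The "value stashed in the count field" trick relies on differential updates being (data, difference) pairs that accumulate. A stdlib-only sketch of the idea, not the actual `count` operator (the `consolidate` helper and the key names are illustrative):

```rust
use std::collections::HashMap;

// Each update contributes (key, cost * quantity) as its difference, so
// accumulating the differences per key maintains the total value directly;
// keys whose differences cancel to zero vanish from the collection.
fn consolidate(updates: &[(&str, i64)]) -> HashMap<String, i64> {
    let mut totals: HashMap<String, i64> = HashMap::new();
    for &(key, diff) in updates {
        *totals.entry(key.to_string()).or_insert(0) += diff;
    }
    totals.retain(|_, v| *v != 0);
    totals
}

fn main() {
    let updates = vec![
        ("nation-a", 5 * 100),   // insertion: cost 5, quantity 100
        ("nation-a", 3 * 10),    // insertion: cost 3, quantity 10
        ("nation-a", -(3 * 10)), // retraction of the previous record
        ("nation-b", 7),
        ("nation-b", -7),        // nation-b accumulates to zero
    ];
    let totals = consolidate(&updates);
    println!("{:?}", totals.get("nation-a"));
}
```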
The correct answer is a generalization of `group` called `cogroup`, which isn't correctly implemented at the moment. It is basically just `group` with two inputs. You get to specify a function
L: Fn(&K, &[(V1, R1)], &[(V2, R2)], &mut Vec<(V3, R3)>)
The function sees two input collections, and gets to populate a third output collection. The idea would be that we have, like above, contributions ordered by magnitude on the first input, and the total on the second input. Whenever we re-evaluate, we need only explore the total, and then the prefix of the contributions that are significant enough. We ignore the less interesting contributions just as we ignored small values in the example of maximum up above.
Of course none of this works today. Instead, we recompute all over the place like we did with the first instance of maximum. We get similarly mediocre performance:
 | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
---|---|---|---|---|---|---|
query11 | 10.41s | 9.46s | 10.11s | 11.18s | 85.58K/s | 768/s |
This should probably be up in the millions per second, and will be once I get off my duff and implement the cursor-based `group` and `cogroup` operators. The main reason it might not get there is if there are in fact lots of records that are "significant fractions" of the total, in which case we'll have to look at all of them.
Query 22 is a bit like queries 15 and 11: it asks for a subset of the customers that have an above-average account balance. Unfortunately, there are probably lots of customers with an above-average account balance, and we shouldn't expect to look at just a few records only because we sorted the data. Even more unfortunately, the average isn't over the same set of customers: we want customers with a specific property (no orders present) with balance above the average of all customers; we would probably be in trouble if we just formed up all customers and worked with that set.
This query presents some problems, but it does have an efficient implementation, just not in the current differential dataflow operators. Joins with inequality constraints are not hard to incrementally update: if your customers are ordered by balance and your threshold changes, the change in output is exactly those customers between the old and new thresholds, which can be found by e.g. binary search.
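The binary-search idea can be sketched in a few lines of stdlib-only Rust; `threshold_delta` is a hypothetical helper, not an existing operator:

```rust
// With customer balances kept sorted, a threshold change from `old` to
// `new` produces exactly the customers between the two thresholds.
fn threshold_delta(balances: &[i64], old: i64, new: i64) -> (&[i64], bool) {
    // partition_point gives the index of the first balance above each bound.
    let lo_ix = balances.partition_point(|&b| b <= old.min(new));
    let hi_ix = balances.partition_point(|&b| b <= old.max(new));
    // Records in [lo_ix, hi_ix) change sides: additions to the output if
    // the threshold dropped, retractions if it rose.
    (&balances[lo_ix..hi_ix], new < old)
}

fn main() {
    let balances = vec![10, 20, 30, 40, 50, 60];
    // Threshold drops from 45 to 25: customers at 30 and 40 join the output.
    let (delta, additions) = threshold_delta(&balances, 45, 25);
    println!("{:?} additions: {}", delta, additions);
}
```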
That being said, there are some complexities that make this a bit awkward. We cannot simply use `join` as it currently exists, as we would produce too much data to consider (we want to ignore everyone outside of the old and new thresholds). We cannot use `group` or the hypothetical `cogroup` because we are not allowed to override their "difference production" computation. We also have some structural constraints that must stay true, like "the threshold collection must have only a single value"; if this ever stops being the case, with either an empty threshold or two of them, I have no idea what happens semantically.
So this is future work, in that I know what I would code up if I just needed to solve this problem once. I'd like to think more about the more general framework that would support this sort of operator, and how much can be handled automatically.
In the meantime, here are some mediocre performance numbers:
 | 1,000 | 10,000 | 100,000 | 1,000,000 | peak rate | hot dog |
---|---|---|---|---|---|---|
query22 | 33.72s | 39.73s | 46.59s | 45.11s | 49.01K/s | 189/s |
These numbers seem to have lots of room for improvement. If nothing else, they get worse with increasing physical batch size, which I think of as a bug. I haven't sorted out exactly who the culprit is; the only intended non-linear behavior is sorting, although it would be surprising if that was the most expensive part (given how expensive everything else seems to be).
How do things look as we make time more and more coarse-grained? We cut down on the number of distinct updates that have to happen, and in some computations we might expect this to be very helpful. In other computations, there may be no particular reason for things to get any better. Let's see!
We are going to hold a fixed product of logical and physical batching, so that we are holding the number of tuples introduced fixed. I decided on 100,000 because there is an awkward internal threshold around 1,000,000 where differential decides that attempting in-place compaction might be a good idea, and without lots of logical batching this does approximately zero for us (it is mostly present as insurance against overrunning memory when consolidation could happen, as is often the case in graph processing).
 | 1 x 100000 | 10 x 10000 | 100 x 1000 | 1000 x 100 | 10000 x 10 | 100000 x 1 | peak rate |
---|---|---|---|---|---|---|---|
query01 | 5.09s | 4.42s | 1.67s | 0.88s | 0.80s | 0.79s | 7.59M/s |
query02 | 0.35s | 0.32s | 0.32s | 0.31s | 0.32s | 0.40s | 3.30M/s |
query03 | 1.74s | 1.10s | 0.95s | 0.93s | 0.97s | 0.93s | 8.25M/s |
query04 | 1.94s | 1.87s | 1.45s | 1.40s | 1.32s | 1.25s | 5.99M/s |
query05 | 1.79s | 2.12s | 2.33s | 2.14s | 2.19s | 2.19s | 3.58M/s |
query06 | 0.17s | 0.17s | 0.17s | 0.17s | 0.17s | 0.16s | 36.54M/s |
query07 | 1.42s | 1.49s | 1.38s | 1.42s | 1.43s | 1.50s | 5.55M/s |
query08 | 3.07s | 3.19s | 2.95s | 3.34s | 3.32s | 3.40s | 2.60M/s |
query09 | 4.70s | 5.09s | 5.00s | 5.04s | 5.09s | 4.72s | 1.81M/s |
query10 | 2.72s | 2.66s | 2.41s | 2.51s | 2.44s | 2.39s | 3.20M/s |
query11 | 9.99s | 9.65s | 7.89s | 2.16s | 0.42s | 0.30s | 2.67M/s |
query12 | 0.58s | 0.57s | 0.59s | 0.58s | 0.58s | 0.57s | 13.08M/s |
query13 | 3.80s | 3.65s | 2.84s | 2.47s | 2.33s | 2.21s | 746.20K/s |
query14 | 0.37s | 0.36s | 0.37s | 0.37s | 0.36s | 0.38s | 17.06M/s |
query15 | 0.48s | 0.48s | 0.48s | 0.48s | 0.47s | 0.46s | 13.11M/s |
query16 | 0.91s | 0.68s | 0.55s | 0.50s | 0.51s | 0.58s | 2.04M/s |
query17 | 2.85s | 2.76s | 2.75s | 2.84s | 2.77s | 2.72s | 2.28M/s |
query18 | 10.09s | 8.93s | 3.63s | 3.26s | 3.29s | 3.15s | 2.43M/s |
query19 | 0.25s | 0.25s | 0.26s | 0.25s | 0.25s | 0.24s | 26.18M/s |
query20 | 0.69s | 0.71s | 0.70s | 0.69s | 0.69s | 0.74s | 9.88M/s |
query21 | 5.78s | 6.26s | 5.32s | 4.90s | 4.92s | 5.14s | 1.52M/s |
query22 | 36.63s | 36.12s | 29.71s | 7.74s | 1.48s | 0.99s | 1.66M/s |
There are a bunch of interesting bits in here. Many computations get no better with logical batching. This could make sense (I haven't looked yet!) when the supplied records mostly end up in the output, rather than being accumulated down. We see some pretty epic improvements, often in the more problematic queries (e.g. `q11` and `q22`). All of the queries get up to half a million updates per second, suggesting they could sustain this rate if we can coarsen time to cope with load. Queries `q05`, `q16`, and `q21` have a bit of a wobbly profile, which seems reproducible and worth looking into; any improvement with logical batching is only due to operator code being written to take advantage of it.
How do things look as we add more workers? Or, in this case, as we add one more worker! Several computations, in particular those with global aggregation, aren't going to gain a lot from a second worker. Differential dataflow doesn't currently have anything in place for pre-aggregation, though it fits perfectly well (we just need to lie about the need for partitioning the data). We currently avoid pre-aggregation because we would like to keep the per-key state linear in the number of keys, rather than keys times workers; if the user wants to opt out of this, super! We should let them.
To take an example, query 15 initially had a global aggregation to determine the maximum, which puts all of the work on one worker. As we introduced the hierarchical aggregation, with the goal of improving the incremental update times, we also introduced the opportunity for concurrent work, due to the multiple keys involved.
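The two-level shape of that aggregation can be sketched in plain Rust; the bucketing scheme here (`key % buckets`) is just an illustration of the idea, not the scheme the query actually uses:

```rust
// Hierarchical maximum: instead of one global key (all work on one worker),
// reduce within buckets, then take the maximum of the bucket maxima. An
// update touches one bucket plus the small top-level collection, and the
// buckets can be distributed across workers.
fn hierarchical_max(revenues: &[(u64, i64)], buckets: u64) -> i64 {
    let mut maxima = vec![i64::MIN; buckets as usize];
    for &(supplier, revenue) in revenues {
        let b = (supplier % buckets) as usize; // first-level grouping
        maxima[b] = maxima[b].max(revenue);
    }
    // Second level: maximum over the bucket maxima (panics on empty input).
    maxima.into_iter().max().unwrap()
}

fn main() {
    let revenues = vec![(1, 10), (2, 99), (3, 40), (17, 70)];
    println!("{}", hierarchical_max(&revenues, 4));
}
```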
 | 100000 x 1 | 100000 x 1 -w2 | peak rate | improvement |
---|---|---|---|---|
query01 | 0.79s | 0.67s | 8.91M/s | 1.13x |
query02 | 0.40s | 0.26s | 3.94M/s | 1.46x |
query03 | 0.93s | 0.67s | 11.38M/s | 1.54x |
query04 | 1.25s | 0.87s | 8.65M/s | 1.73x |
query05 | 2.19s | 1.80s | 4.26M/s | 1.31x |
query06 | 0.16s | 0.12s | 49.22M/s | 1.33x |
query07 | 1.50s | 0.96s | 7.97M/s | 1.48x |
query08 | 3.40s | 2.58s | 2.97M/s | 1.25x |
query09 | 4.72s | 4.01s | 2.12M/s | 1.16x |
query10 | 2.39s | 1.57s | 4.88M/s | 1.51x |
query11 | 0.30s | 0.22s | 3.58M/s | 1.44x |
query12 | 0.57s | 0.35s | 21.39M/s | 1.52x |
query13 | 2.21s | 1.14s | 1.45M/s | 1.94x |
query14 | 0.38s | 0.27s | 23.19M/s | 1.47x |
query15 | 0.46s | 0.29s | 20.66M/s | 1.48x |
query16 | 0.58s | 0.38s | 2.65M/s | 1.64x |
query17 | 2.72s | 2.35s | 2.64M/s | 1.15x |
query18 | 3.15s | 2.33s | 3.28M/s | 1.67x |
query19 | 0.24s | 0.16s | 39.92M/s | 1.50x |
query20 | 0.74s | 0.58s | 11.60M/s | 1.40x |
query21 | 5.14s | 3.60s | 2.08M/s | 1.41x |
query22 | 0.99s | 0.58s | 2.83M/s | 1.88x |
Oh hooray! They are all above one million tuples per second now! One million tuples piped into who knows what computations. It is possible that some of these are even correct, but let's not get ahead of ourselves. If nothing else, differential dataflow is good at moving tuples around.
Also, if you are the sort of person who notices, all of the two-worker times are better than the corresponding one-worker time. And we didn't even write the code with parallelism in mind. I swear.
It is pretty nice having a load of computations which exercise different aspects of the system. We have some that stress the internals, and how quickly we can pick up and put down data. We have others that stress the expressive power of differential dataflow, and whose performance increases by orders of magnitude when we write a more clever program. I have several more things to work on now, and specific numbers to monitor.
We may have produced approximations to the TPCH workload; I'm still not sure. If we have, then we have a suite of relatively decent numbers, showing that with multiple workers and logical batching you can accept one million updates per second for any of the queries. Running all of the queries concurrently would also be cool, but given that there is no sharing going on, it would probably drop down to one twenty-second of one million tuples per second.
If you are familiar with the How to Win a Hot Dog Eating Contest paper, this post reaches some conclusions that are similar, and some that are different. We see consistently high throughput across the queries, where they saw a mix of great and mediocre throughputs. With batching, many of their queries improve, but some still lag behind (e.g. their `q09`, `q15`, and `q16`). Scaling out to multiple workers always helps us, where it doesn't always help them.
As a meta-conclusion, Naiad (and I guess implicitly differential dataflow) got dinged in their related work section thusly:
Naiad [27] and Trill [11] support flat LINQ-style continuous queries, while for complex queries with nested aggregates the user has to encode efficient execution plans. Spark Streaming [39] allows running simple SQL queries but only for windowed data. In contrast to these systems, our approach: 1) favors declarativity as the user only needs to specify input SQL queries without execution plans; 2) can incrementally maintain queries with equality-correlated nested aggregates; 3) generates code tailored to the given workload; our compilation framework can target any scalable system with a synchronous execution model.
While it is easy to understand the advantage of declarativity, especially if it opens up the space to analysts who don't have a background in "programming", some of the numbers we saw shine some light on the current limitations of declarativity from the perspective of performance. In several cases, most notably query 15, our ability to specify a semantically equivalent (I hope!) computation with an improved implementation led to multiple orders of magnitude of performance improvement. I'm personally a fan of declarative programming idioms where you can drop in more instructive operations, down to the level of imperative programming if required. It's all in service of better performing systems, and you should use whatever works best; sometimes that is the clever database optimizer, and many times it just isn't.