# About

This document is an updated version of the original design documents by Spencer Kimball from early 2014. It may not always be completely up to date. For a more approachable explanation of how CockroachDB works, consider reading the [Architecture docs](https://www.cockroachlabs.com/docs/stable/architecture/overview.html).

# Overview

CockroachDB is a distributed SQL database. The primary design goals are **scalability**, **strong consistency** and **survivability** (hence the name). CockroachDB aims to tolerate disk, machine, rack, and even **datacenter failures** with minimal latency disruption and **no manual intervention**. CockroachDB nodes are symmetric; a design goal is **homogeneous deployment** (one binary) with minimal configuration and no required external dependencies.

The entry point for database clients is the SQL interface. Every node in a CockroachDB cluster can act as a client SQL gateway. A SQL gateway transforms client SQL statements into key-value (KV) operations and executes them, distributing the operations across the cluster as necessary and returning results to the client. CockroachDB implements a **single, monolithic sorted map** from key to value where both keys and values are byte strings.

The KV map is logically composed of smaller segments of the keyspace called ranges. Each range is backed by data stored in a local KV storage engine (we use [RocksDB](http://rocksdb.org/), a variant of [LevelDB](https://github.com/google/leveldb)). Range data is replicated to a configurable number of additional CockroachDB nodes. Ranges are merged and split to maintain a target size, by default `64M`. The relatively small size facilitates quick repair and rebalancing to address node failures, new capacity and even read/write load. However, the size must be balanced against the pressure on the system from having more ranges to manage.
CockroachDB achieves horizontal scalability:

- adding more nodes increases the capacity of the cluster by the amount of storage on each node (divided by a configurable replication factor), theoretically up to 4 exabytes (4E) of logical data;
- client queries can be sent to any node in the cluster, and queries can operate independently (without conflicts), meaning that overall throughput is a linear factor of the number of nodes in the cluster;
- queries are distributed (ref: distributed SQL) so that the overall throughput of single queries can be increased by adding more nodes.

CockroachDB achieves strong consistency:

- uses a distributed consensus protocol for synchronous replication of data in each key value range. We’ve chosen to use the [Raft consensus algorithm](https://raftconsensus.github.io); all consensus state is stored in RocksDB.
- single or batched mutations to a single range are mediated via the range's Raft instance. Raft guarantees ACID semantics.
- logical mutations which affect multiple ranges employ distributed transactions for ACID semantics. CockroachDB uses an efficient **non-locking distributed commit** protocol.

CockroachDB achieves survivability:

- range replicas can be co-located within a single datacenter for low latency replication and survive disk or machine failures. They can be distributed across racks to survive some network switch failures.
- range replicas can be located in datacenters spanning increasingly disparate geographies to survive ever-greater failure scenarios from datacenter power or networking loss to regional power failures (e.g. `{ US-East-1a, US-East-1b, US-East-1c }`, `{ US-East, US-West, Japan }`, `{ Ireland, US-East, US-West}`, `{ Ireland, US-East, US-West, Japan, Australia }`).
CockroachDB provides [snapshot isolation](http://en.wikipedia.org/wiki/Snapshot_isolation) (SI) and serializable snapshot isolation (SSI) semantics, allowing **externally consistent, lock-free reads and writes**--both from a historical snapshot timestamp and from the current wall clock time. SI provides lock-free reads and writes but still allows write skew. SSI eliminates write skew, but introduces a performance hit in the case of a contentious system. SSI is the default isolation; clients must consciously decide to trade correctness for performance. CockroachDB implements [a limited form of linearizability](#strict-serializability-linearizability), providing ordering for any observer or chain of observers.

Similar to [Spanner](http://static.googleusercontent.com/media/research.google.com/en/us/archive/spanner-osdi2012.pdf) directories, CockroachDB allows configuration of arbitrary zones of data. This allows replication factor, storage device type, and/or datacenter location to be chosen to optimize performance and/or availability. Unlike Spanner, zones are monolithic and don’t allow movement of fine-grained data on the level of entity groups.

# Architecture

CockroachDB implements a layered architecture. The highest level of abstraction is the [*SQL layer*](#sql), which provides familiar relational concepts such as schemas, tables, columns, and indexes. The SQL layer in turn depends on the [distributed key value store](#key-value-api), which handles the details of range addressing to provide the abstraction of a single, monolithic key value store. The distributed KV store communicates with any number of physical cockroach nodes. Each node contains one or more stores, one per physical device.

![Architecture](media/architecture.png)

Each store contains potentially many ranges, the lowest-level unit of key-value data. Ranges are replicated using the Raft consensus protocol.
The diagram below is a blown-up version of stores from four of the five nodes in the previous diagram. Each range is replicated three ways using Raft. The color coding shows associated range replicas.

![Ranges](media/ranges.png)

Each physical node exports two RPC-based key value APIs: one for external clients and one for internal clients (exposing sensitive operational features). Both services accept batches of requests and return batches of responses. Nodes are symmetric in capabilities and exported interfaces; each has the same binary and may assume any role.

Nodes and the ranges they provide access to can be arranged with various physical network topologies to make trade-offs between reliability and performance. For example, a triplicated (3-way replica) range could have each replica located on different:

- disks within a server to tolerate disk failures.
- servers within a rack to tolerate server failures.
- servers on different racks within a datacenter to tolerate rack power/network failures.
- servers in different datacenters to tolerate large scale network or power outages.

Up to `F` failures can be tolerated, where the total number of replicas `N = 2F + 1` (e.g. with 3x replication, one failure can be tolerated; with 5x replication, two failures, and so on).

# Keys

Cockroach keys are arbitrary byte arrays. Keys come in two flavors: system keys and table data keys. System keys are used by Cockroach for internal data structures and metadata. Table data keys contain SQL table data (as well as index data). System and table data keys are prefixed in such a way that all system keys sort before any table data keys.

System keys come in several subtypes:

- **Global** keys store cluster-wide data such as the "meta1" and "meta2" keys as well as various other system-wide keys such as the node and store ID allocators.
- **Store local** keys are used for unreplicated store metadata (e.g. the `StoreIdent` structure).
"Unreplicated" indicates that these values are not replicated across multiple stores because the data they hold is tied to the lifetime of the store they are present on. - **Range local** keys store range metadata that is associated with a global key. Range local keys have a special prefix followed by a global key and a special suffix. For example, transaction records are range local keys which look like: `\x01ktxn-`. - **Replicated Range ID local** keys store range metadata that is present on all of the replicas for a range. These keys are updated via Raft operations. Examples include the range lease state and abort span entries. - **Unreplicated Range ID local** keys store range metadata that is local to a replica. The primary examples of such keys are the Raft state and Raft log. Table data keys are used to store all SQL data. Table data keys contain internal structure as described in the section on [mapping data between the SQL model and KV](#data-mapping-between-the-sql-model-and-kv). # Versioned Values Cockroach maintains historical versions of values by storing them with associated commit timestamps. Reads and scans can specify a snapshot time to return the most recent writes prior to the snapshot timestamp. Older versions of values are garbage collected by the system during compaction according to a user-specified expiration interval. In order to support long-running scans (e.g. for MapReduce), all versions have a minimum expiration. Versioned values are supported via modifications to RocksDB to record commit timestamps and GC expirations per key. # Lock-Free Distributed Transactions Cockroach provides distributed transactions without locks. Cockroach transactions support two isolation levels: - snapshot isolation (SI) and - *serializable* snapshot isolation (SSI). *SI* is simple to implement, highly performant, and correct for all but a handful of anomalous conditions (e.g. write skew). 
*SSI* requires just a touch more complexity, is still highly performant (less so with contention), and has no anomalous conditions. Cockroach’s SSI implementation is based on ideas from the literature and some possibly novel insights. SSI is the default level, with SI provided for application developers who are certain enough of their need for performance and the absence of write skew conditions to consciously elect to use it. In a lightly contended system, our implementation of SSI is just as performant as SI, requiring no locking or additional writes. With contention, our implementation of SSI still requires no locking, but will end up aborting more transactions. Cockroach’s SI and SSI implementations prevent starvation scenarios even for arbitrarily long transactions. See the [Cahill paper](https://ses.library.usyd.edu.au/bitstream/handle/2123/5353/michael-cahill-2009-thesis.pdf) for one possible implementation of SSI. This is another [great paper](http://cs.yale.edu/homes/thomson/publications/calvin-sigmod12.pdf). For a discussion of SSI implemented by preventing read-write conflicts (in contrast to detecting them, called write-snapshot isolation), see the [Yabandeh paper](https://courses.cs.washington.edu/courses/cse444/10sp/544M/READING-LIST/fekete-sigmod2008.pdf), which is the source of much inspiration for Cockroach’s SSI. Both SI and SSI require that the outcome of reads must be preserved, i.e. a write of a key at a lower timestamp than a previous read must not succeed. To this end, each range maintains a bounded *in-memory* cache from key range to the latest timestamp at which it was read. Most updates to this *timestamp cache* correspond to keys being read, though the timestamp cache also protects the outcome of some writes (notably range deletions) which consequently must also populate the cache. The cache’s entries are evicted oldest timestamp first, updating the low water mark of the cache appropriately. 
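The timestamp cache described above can be sketched as follows. This is a minimal illustration under stated assumptions, not CockroachDB's implementation: it tracks single keys rather than key ranges, timestamps are plain integers, and the names `TimestampCache`, `record_read`, `latest_read` and `write_timestamp` are hypothetical.

```python
class TimestampCache:
    """Bounded in-memory map from key to the latest timestamp at which it was read.

    Hypothetical sketch: the real cache covers key ranges; this one tracks single keys.
    """

    def __init__(self, max_entries):
        self.max_entries = max_entries
        self.low_water = 0  # lower bound on the read timestamp of any key without an entry
        self.reads = {}     # key -> latest read timestamp

    def record_read(self, key, ts):
        # Keep only the latest read timestamp for each key.
        self.reads[key] = max(ts, self.reads.get(key, self.low_water))
        while len(self.reads) > self.max_entries:
            # Evict the entry with the oldest timestamp and raise the low water
            # mark so that the evicted read stays protected.
            oldest = min(self.reads, key=self.reads.get)
            self.low_water = max(self.low_water, self.reads.pop(oldest))

    def latest_read(self, key):
        # Keys without an entry are treated as read at the low water mark.
        return self.reads.get(key, self.low_water)

    def write_timestamp(self, key, candidate_ts):
        # A write below a previous read must not succeed at its candidate
        # timestamp; the later timestamp is returned with the write instead.
        return max(candidate_ts, self.latest_read(key))


cache = TimestampCache(max_entries=2)
cache.record_read("a", 10)
cache.record_read("b", 20)
cache.record_read("c", 30)  # evicts "a" and raises the low water mark to 10
```

After the eviction, a write to `"a"` with candidate timestamp 5 is returned timestamp 10, because the low water mark stands in for the evicted entry.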
Each Cockroach transaction is assigned a random priority and a "candidate timestamp" at start. The candidate timestamp is the provisional timestamp at which the transaction will commit, and is chosen as the current clock time of the node coordinating the transaction. This means that a transaction without conflicts will usually commit with a timestamp that, in absolute time, precedes the actual work done by that transaction. In the course of coordinating a transaction between one or more distributed nodes, the candidate timestamp may be increased, but will never be decreased. The core difference between the two isolation levels SI and SSI is that the former allows the transaction's candidate timestamp to increase and the latter does not. **Hybrid Logical Clock** Each cockroach node maintains a hybrid logical clock (HLC) as discussed in the [Hybrid Logical Clock paper](http://www.cse.buffalo.edu/tech-reports/2014-04.pdf). HLC time uses timestamps which are composed of a physical component (thought of as and always close to local wall time) and a logical component (used to distinguish between events with the same physical component). It allows us to track causality for related events similar to vector clocks, but with less overhead. In practice, it works much like other logical clocks: When events are received by a node, it informs the local HLC about the timestamp supplied with the event by the sender, and when events are sent a timestamp generated by the local HLC is attached. For a more in depth description of HLC please read the paper. Our implementation is [here](https://github.com/cockroachdb/cockroach/blob/master/pkg/util/hlc/hlc.go). Cockroach picks a Timestamp for a transaction using HLC time. Throughout this document, *timestamp* always refers to the HLC time which is a singleton on each node. The HLC is updated by every read/write event on the node, and the HLC time >= wall time. 
A read/write timestamp received in a cockroach request from another node is not only used to version the operation, but also updates the HLC on the node. This is useful in guaranteeing that all data read/written on a node is at a timestamp < next HLC time. **Transaction execution flow** Transactions are executed in two phases: 1. Start the transaction by selecting a range which is likely to be heavily involved in the transaction and writing a new transaction record to a reserved area of that range with state "PENDING". In parallel write an "intent" value for each datum being written as part of the transaction. These are normal MVCC values, with the addition of a special flag (i.e. “intent”) indicating that the value may be committed after the transaction itself commits. In addition, the transaction id (unique and chosen at txn start time by client) is stored with intent values. The txn id is used to refer to the transaction record when there are conflicts and to make tie-breaking decisions on ordering between identical timestamps. Each node returns the timestamp used for the write (which is the original candidate timestamp in the absence of read/write conflicts); the client selects the maximum from amongst all write timestamps as the final commit timestamp. 2. Commit the transaction by updating its transaction record. The value of the commit entry contains the candidate timestamp (increased as necessary to accommodate any latest read timestamps). Note that the transaction is considered fully committed at this point and control may be returned to the client. In the case of an SI transaction, a commit timestamp which was increased to accommodate concurrent readers is perfectly acceptable and the commit may continue. For SSI transactions, however, a gap between candidate and commit timestamps necessitates transaction restart (note: restart is different than abort--see below). 
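The commit decision in the two phases above can be sketched as follows. This is an illustrative sketch only: the function names, the string outcomes and the integer timestamps are assumptions, and the real protocol involves the transaction record and Raft, which are omitted here.

```python
SI, SSI = "SI", "SSI"


def final_commit_timestamp(candidate_ts, write_timestamps):
    """The client selects the maximum of all timestamps returned by its writes."""
    return max([candidate_ts, *write_timestamps])


def commit_outcome(isolation, candidate_ts, write_timestamps):
    """Decide whether the transaction may commit (hypothetical helper).

    Returns (outcome, timestamp). For a restart, the timestamp is the pushed
    timestamp; retrying at that timestamp is an assumption of this sketch.
    """
    commit_ts = final_commit_timestamp(candidate_ts, write_timestamps)
    if isolation == SSI and commit_ts > candidate_ts:
        # A gap between candidate and commit timestamps forces an SSI restart.
        return ("RESTART", commit_ts)
    # SI tolerates a pushed commit timestamp; SSI commits only when there is no gap.
    return ("COMMITTED", commit_ts)
```

For example, with a candidate timestamp of 10 and write timestamps `[10, 12]`, the SI transaction commits at 12 while the SSI transaction restarts.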
After the transaction is committed, all written intents are upgraded in parallel by removing the “intent” flag. The transaction is considered fully committed before this step and does not wait for it to return control to the transaction coordinator. In the absence of conflicts, this is the end. Nothing else is necessary to ensure the correctness of the system. **Conflict Resolution** Things get more interesting when a reader or writer encounters an intent record or newly-committed value in a location that it needs to read or write. This is a conflict, usually causing either of the transactions to abort or restart depending on the type of conflict. ***Transaction restart:*** This is the usual (and more efficient) type of behaviour and is used except when the transaction was aborted (for instance by another transaction). In effect, that reduces to two cases; the first being the one outlined above: An SSI transaction that finds upon attempting to commit that its commit timestamp has been pushed. The second case involves a transaction actively encountering a conflict, that is, one of its readers or writers encounter data that necessitate conflict resolution (see transaction interactions below). When a transaction restarts, it changes its priority and/or moves its timestamp forward depending on data tied to the conflict, and begins anew reusing the same txn id. The prior run of the transaction might have written some write intents, which need to be deleted before the transaction commits, so as to not be included as part of the transaction. These stale write intent deletions are done during the reexecution of the transaction, either implicitly, through writing new intents to the same keys as part of the reexecution of the transaction, or explicitly, by cleaning up stale intents that are not part of the reexecution of the transaction. 
Since most transactions will end up writing to the same keys, the explicit cleanup run just before committing the transaction is usually a NOOP. ***Transaction abort:*** This is the case in which a transaction, upon reading its transaction record, finds that it has been aborted. In this case, the transaction can not reuse its intents; it returns control to the client before cleaning them up (other readers and writers would clean up dangling intents as they encounter them) but will make an effort to clean up after itself. The next attempt (if applicable) then runs as a new transaction with **a new txn id**. ***Transaction interactions:*** There are several scenarios in which transactions interact: - **Reader encounters write intent or value with newer timestamp far enough in the future**: This is not a conflict. The reader is free to proceed; after all, it will be reading an older version of the value and so does not conflict. Recall that the write intent may be committed with a later timestamp than its candidate; it will never commit with an earlier one. - **Reader encounters write intent or value with newer timestamp in the near future:** In this case, we have to be careful. The newer intent may, in absolute terms, have happened in our read's past if the clock of the writer is ahead of the node serving the values. In that case, we would need to take this value into account, but we just don't know. Hence the transaction restarts, using instead a future timestamp (but remembering a maximum timestamp used to limit the uncertainty window to the maximum clock offset). In fact, this is optimized further; see the details under "choosing a time stamp" below. - **Reader encounters write intent with older timestamp**: the reader must follow the intent’s transaction id to the transaction record. If the transaction has already been committed, then the reader can just read the value. If the write transaction has not yet been committed, then the reader has two options. 
If the write conflict is from an SI transaction, the reader can *push that transaction's commit timestamp into the future* (and consequently not have to read it). This is simple to do: the reader just updates the transaction’s commit timestamp to indicate that when/if the transaction does commit, it should use a timestamp *at least* as high. However, if the write conflict is from an SSI transaction, the reader must compare priorities. If the reader has the higher priority, it pushes the transaction’s commit timestamp (that transaction will then notice its timestamp has been pushed, and restart). If it has the lower or same priority, it retries itself using as a new priority `max(new random priority, conflicting txn’s priority - 1)`. - **Writer encounters uncommitted write intent**: If the other write intent has been written by a transaction with a lower priority, the writer aborts the conflicting transaction. If the write intent has a higher or equal priority the transaction retries, using as a new priority *max(new random priority, conflicting txn’s priority - 1)*; the retry occurs after a short, randomized backoff interval. - **Writer encounters newer committed value**: The committed value could also be an unresolved write intent made by a transaction that has already committed. The transaction restarts. On restart, the same priority is reused, but the candidate timestamp is moved forward to the encountered value's timestamp. - **Writer encounters more recently read key**: The *read timestamp cache* is consulted on each write at a node. If the write’s candidate timestamp is earlier than the low water mark on the cache itself (i.e. its last evicted timestamp) or if the key being written has a read timestamp later than the write’s candidate timestamp, this later timestamp value is returned with the write. A new timestamp forces a transaction restart only if it is serializable. 
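The priority rules for the "writer encounters uncommitted write intent" case above can be sketched as follows. The integer priority range `MAX_PRIORITY`, the function names and the string outcomes are assumptions made for illustration; the randomized backoff before the retry is omitted.

```python
import random

MAX_PRIORITY = 2**31 - 1  # hypothetical priority range; the text does not specify one


def retry_priority(conflicting_priority, rng=random):
    """New priority for a retrying transaction: max(new random priority, conflicting - 1)."""
    return max(rng.randint(0, MAX_PRIORITY), conflicting_priority - 1)


def writer_meets_intent(writer_priority, intent_priority, rng=random):
    """Resolve a write/write conflict between a writer and an uncommitted intent."""
    if intent_priority < writer_priority:
        # The intent's transaction has the lower priority, so the writer aborts it.
        return ("abort_other", writer_priority)
    # Higher or equal priority on the intent: the writer retries with a new priority.
    return ("retry_self", retry_priority(intent_priority, rng))
```

Whatever random value is drawn, the retrying transaction ends up at most one below the priority of the transaction it lost to.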
**Transaction management** Transactions are managed by the client proxy (or gateway in SQL Azure parlance). Unlike in Spanner, writes are not buffered but are sent directly to all implicated ranges. This allows the transaction to abort quickly if it encounters a write conflict. The client proxy keeps track of all written keys in order to resolve write intents asynchronously upon transaction completion. If a transaction commits successfully, all intents are upgraded to committed. In the event a transaction is aborted, all written intents are deleted. The client proxy doesn’t guarantee it will resolve intents. In the event the client proxy restarts before the pending transaction is committed, the dangling transaction would continue to "live" until aborted by another transaction. Transactions periodically heartbeat their transaction record to maintain liveness. Transactions encountered by readers or writers with dangling intents which haven’t been heartbeat within the required interval are aborted. In the event the proxy restarts after a transaction commits but before the asynchronous resolution is complete, the dangling intents are upgraded when encountered by future readers and writers and the system does not depend on their timely resolution for correctness. An exploration of retries with contention and abort times with abandoned transaction is [here](https://docs.google.com/document/d/1kBCu4sdGAnvLqpT-_2vaTbomNmX3_saayWEGYu1j7mQ/edit?usp=sharing). **Transaction Records** Please see [pkg/roachpb/data.proto](https://github.com/cockroachdb/cockroach/blob/master/pkg/roachpb/data.proto) for the up-to-date structures, the best entry point being `message Transaction`. **Pros** - No requirement for reliable code execution to prevent stalled 2PC protocol. - Readers never block with SI semantics; with SSI semantics, they may abort. 
- Lower latency than traditional 2PC commit protocol (w/o contention) because second phase requires only a single write to the transaction record instead of a synchronous round to all transaction participants. - Priorities avoid starvation for arbitrarily long transactions and always pick a winner from between contending transactions (no mutual aborts). - Writes not buffered at client; writes fail fast. - No read-locking overhead required for *serializable* SI (in contrast to other SSI implementations). - Well-chosen (i.e. less random) priorities can flexibly give probabilistic guarantees on latency for arbitrary transactions (for example: make OLTP transactions 10x less likely to abort than low priority transactions, such as asynchronously scheduled jobs). **Cons** - Reads from non-lease holder replicas still require a ping to the lease holder to update the *read timestamp cache*. - Abandoned transactions may block contending writers for up to the heartbeat interval, though average wait is likely to be considerably shorter (see [graph in link](https://docs.google.com/document/d/1kBCu4sdGAnvLqpT-_2vaTbomNmX3_saayWEGYu1j7mQ/edit?usp=sharing)). This is likely considerably more performant than detecting and restarting 2PC in order to release read and write locks. - Behavior different than other SI implementations: no first writer wins, and shorter transactions do not always finish quickly. Element of surprise for OLTP systems may be a problematic factor. - Aborts can decrease throughput in a contended system compared with two phase locking. Aborts and retries increase read and write traffic, increase latency and decrease throughput. **Choosing a Timestamp** A key challenge of reading data in a distributed system with clock offset is choosing a timestamp guaranteed to be greater than the latest timestamp of any committed transaction (in absolute time). No system can claim consistency and fail to read already-committed data. 
Accomplishing consistency for transactions (or just single operations) accessing a single node is easy. The timestamp is assigned by the node itself, so it is guaranteed to be at a greater timestamp than all the existing timestamped data on the node.

For multiple nodes, the timestamp of the node coordinating the transaction `t` is used. In addition, a maximum timestamp `t+ε` is supplied to provide an upper bound on timestamps for already-committed data (`ε` is the maximum clock offset). As the transaction progresses, any data read which have timestamps greater than `t` but less than `t+ε` cause the transaction to abort and retry with the conflicting timestamp `t_c`, where `t_c > t`. The maximum timestamp `t+ε` remains the same. This implies that transaction restarts due to clock uncertainty can only happen on a time interval of length `ε`.

We apply another optimization to reduce the restarts caused by uncertainty. Upon restarting, the transaction not only takes into account `t_c`, but also the timestamp of the node at the time of the uncertain read, `t_node`. The larger of those two timestamps `t_c` and `t_node` (likely equal to the latter) is used to increase the read timestamp. Additionally, the conflicting node is marked as “certain”. Then, for future reads to that node within the transaction, we set `MaxTimestamp = Read Timestamp`, preventing further uncertainty restarts.

Correctness follows from the fact that we know that at the time of the read, there exists no version of any key on that node with a higher timestamp than `t_node`. Upon a restart caused by the node, if the transaction encounters a key with a higher timestamp, it knows that in absolute time, the value was written after `t_node` was obtained, i.e. after the uncertain read. Hence the transaction can move forward reading an older version of the data (at the transaction's timestamp). This limits the time uncertainty restarts attributed to a node to at most one.
The tradeoff is that we might pick a timestamp larger than the optimal one (> highest conflicting timestamp), resulting in the possibility of a few more conflicts. We expect retries will be rare, but this assumption may need to be revisited if retries become problematic. Note that this problem does not apply to historical reads. An alternate approach which does not require retries makes a round to all node participants in advance and chooses the highest reported node wall time as the timestamp. However, knowing which nodes will be accessed in advance is difficult and potentially limiting. Cockroach could also potentially use a global clock (Google did this with [Percolator](https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Peng.pdf)), which would be feasible for smaller, geographically-proximate clusters. # Strict Serializability (Linearizability) Roughly speaking, the gap between strict serializability (which we use interchangeably with linearizability) and CockroachDB's default isolation level (serializable) is that with linearizable transactions, causality is preserved. That is, if one transaction (say, creating a posting for a user) waits for its predecessor (creating the user in the first place) to complete, one would hope that the logical timestamp assigned to the former is larger than that of the latter. In practice, in distributed databases this may not hold, the reason typically being that clocks across a distributed system are not perfectly synchronized and the "later" transaction touches a part disjoint from that on which the first transaction ran, resulting in clocks with disjoint information to decide on the commit timestamps. In practice, in CockroachDB many transactional workloads are actually linearizable, though the precise conditions are too involved to outline them here. Causality is typically not required for many transactions, and so it is advantageous to pay for it only when it *is* needed. 
CockroachDB implements this via causality tokens: When committing a transaction, a causality token can be retrieved and passed to the next transaction, ensuring that these two transactions get assigned increasing logical timestamps.

Additionally, as better synchronized clocks become a standard commodity offered by cloud providers, CockroachDB can provide global linearizability by doing much the same as [Google's Spanner](http://research.google.com/archive/spanner.html) does: wait out the maximum clock offset after committing, but before returning to the client.

See the blog post below for much more in-depth information.

https://www.cockroachlabs.com/blog/living-without-atomic-clocks/

# Logical Map Content

Logically, the map contains a series of reserved system key/value pairs preceding the actual user data (which is managed by the SQL subsystem).

- `\x02<key1>`: Range metadata for range ending `\x03<key1>`. This is a "meta1" key.
- ...
- `\x02<keyN>`: Range metadata for range ending `\x03<keyN>`. This is a "meta1" key.
- `\x03<key1>`: Range metadata for range ending `<key1>`. This is a "meta2" key.
- ...
- `\x03<keyN>`: Range metadata for range ending `<keyN>`. This is a "meta2" key.
- `\x04{desc,node,range,store}-idegen`: ID generation oracles for various component types.
- `\x04status-node-`: Store runtime metadata.
- `\x04tsd`: Time-series data key.
- `<key>`: A user key. In practice, these keys are managed by the SQL subsystem, which employs its own key anatomy.

# Stores and Storage

Nodes contain one or more stores. Each store should be placed on a unique disk. Internally, each store contains a single instance of RocksDB with a block cache shared amongst all of the stores in a node. Each store in turn holds a collection of range replicas. More than one replica for a range will never be placed on the same store or even the same node.
Early on, when a cluster is first initialized, the few default starting ranges will only have a single replica, but as soon as other nodes are available they will replicate to them until they've reached their desired replication factor, the default being 3.

Zone configs can be used to control a range's replication factor and add constraints as to where the range's replicas can be located. When there is a change in a range's zone config, the range will up or down replicate to the appropriate number of replicas and move its replicas to the appropriate stores based on the zone config's constraints.

# Self Repair

If a store has not been heard from (has not gossiped its descriptors) in some time, the default setting being 5 minutes, the cluster will consider this store to be dead. When this happens, the replicas that ranges have on that store are determined to be unavailable and are removed. These ranges will then upreplicate themselves to other available stores until their desired replication factor is again met. If 50% or more of the replicas are unavailable at the same time, there is no quorum and the whole range will be considered unavailable until more than 50% of the replicas are available again.

# Rebalancing

As more data are added to the system, some stores may grow faster than others. To combat this and to spread the overall load across the full cluster, replicas will be moved between stores while maintaining the desired replication factor. The heuristics used to perform this rebalancing include:

- the number of replicas per store
- the total size of the data used per store
- free space available per store

In the future, some other factors that might be considered include:

- cpu/network load per store
- ranges that are used together often in queries
- number of active ranges per store
- number of range leases held per store

# Range Metadata

The default approximate size of a range is 64M (2\^26 B).
In order to support 1P (2\^50 B) of logical data, metadata is needed for roughly 2\^(50 - 26) = 2\^24 ranges. A reasonable upper bound on range metadata size is roughly 256 bytes (3\*12 bytes for the triplicated node locations and 220 bytes for the range key itself). 2\^24 ranges \* 2\^8 B would require roughly 4G (2\^32 B) to store--too much to duplicate between machines. Our conclusion is that range metadata must be distributed for large installations. To keep key lookups relatively fast in the presence of distributed metadata, we store all the top-level metadata in a single range (the first range). These top-level metadata keys are known as *meta1* keys, and are prefixed such that they sort to the beginning of the key space. Given the metadata size of 256 bytes given above, a single 64M range would support 64M/256B = 2\^18 ranges, which gives a total storage of 64M \* 2\^18 = 16T. To support the 1P quoted above, we need two levels of indirection, where the first level addresses the second, and the second addresses user data. With two levels of indirection, we can address 2\^(18 + 18) = 2\^36 ranges; each range addresses 2\^26 B, and altogether we address 2\^(36+26) B = 2\^62 B = 4E of user data. For a given user-addressable `key1`, the associated *meta1* record is found at the successor key to `key1` in the *meta1* space. Since the *meta1* space is sparse, the successor key is defined as the next key which is present. The *meta1* record identifies the range containing the *meta2* record, which is found using the same process. The *meta2* record identifies the range containing `key1`, which is again found the same way (see examples below). Concretely, metadata keys are prefixed by `\x02` (meta1) and `\x03` (meta2); the prefixes `\x02` and `\x03` provide for the desired sorting behaviour. Thus, `key1`'s *meta1* record will reside at the successor key to `\x02`. 
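
The arithmetic and the successor-key lookup described above can be checked with a short sketch. This is only an illustration: the function names, the tiny three-range index, and the treatment of range end keys as exclusive are assumptions made for this example, not CockroachDB APIs.

```python
import bisect

RANGE_SIZE = 2**26        # default range size from the text: 64M
META_RECORD_SIZE = 2**8   # upper bound per metadata record from the text: 256 B


def ranges_per_meta_range():
    """Number of range records that fit in a single metadata range."""
    return RANGE_SIZE // META_RECORD_SIZE


def addressable_bytes(levels):
    """User data addressable with the given number of metadata levels."""
    return ranges_per_meta_range() ** levels * RANGE_SIZE


def lookup(meta_index, key):
    """Return the range ID stored at the successor of `key`.

    `meta_index` is a list of (range_end_key, range_id) pairs sorted by
    end key. This sketch assumes end keys are exclusive, so it picks the
    first record whose end key is strictly greater than `key`.
    """
    end_keys = [end for end, _ in meta_index]
    i = bisect.bisect_right(end_keys, key)
    if i == len(end_keys):
        raise KeyError(key)
    return meta_index[i][1]


# Hypothetical index of three ranges, keyed by range end key.
meta2 = [(b"g", 1), (b"m", 2), (b"\xff", 3)]
```

With one level of indirection `addressable_bytes(1)` is 2^44 B (16T), and with two levels `addressable_bytes(2)` is 2^62 B (4E), matching the figures above. In the sparse index, `lookup(meta2, b"a")` lands on the record ending at `g`, because that is the next key present.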
Note: we append the end key of each range to meta{1,2} records because the RocksDB iterator only supports a Seek() interface which acts as a Ceil(). Using the start key of the range would cause Seek() to find the key *after* the meta indexing record we’re looking for, which would result in having to back the iterator up, an option which is both less efficient and not available in all cases. The following example shows the directory structure for a map with three ranges worth of data. Ellipses indicate additional key/value pairs to fill an entire range of data. For clarity, the examples use `meta1` and `meta2` to refer to the prefixes `\x02` and `\x03`. Except for the fact that splitting ranges requires updates to the range metadata with knowledge of the metadata layout, the range metadata itself requires no special treatment or bootstrapping. **Range 0** (located on servers `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`) - `meta1\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000` - `meta2`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000` - `meta2`: `dcrama4:8000`, `dcrama5:8000`, `dcrama6:8000` - `meta2\xff`: `dcrama7:8000`, `dcrama8:8000`, `dcrama9:8000` - ... - ``: `` **Range 1** (located on servers `dcrama4:8000`, `dcrama5:8000`, `dcrama6:8000`) - ... - ``: `` **Range 2** (located on servers `dcrama7:8000`, `dcrama8:8000`, `dcrama9:8000`) - ... - ``: `` Consider a simpler example of a map containing less than a single range of data. 
In this case, all range metadata and all data are located in the same range: **Range 0** (located on servers `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000`)* - `meta1\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000` - `meta2\xff`: `dcrama1:8000`, `dcrama2:8000`, `dcrama3:8000` - ``: `` - `...` Finally, a map large enough to need both levels of indirection would look like (note that instead of showing range replicas, this example is simplified to just show range indexes): **Range 0** - `meta1`: Range 0 - `meta1\xff`: Range 1 - `meta2`: Range 1 - `meta2`: Range 2 - `meta2`: Range 3 - ... - `meta2`: Range 262143 **Range 1** - `meta2`: Range 262144 - `meta2`: Range 262145 - ... - `meta2\xff`: Range 500,000 - ... - ``: `` **Range 2** - ... - ``: `` **Range 3** - ... - ``: `` **Range 262144** - ... - ``: `` **Range 262145** - ... - ``: `` Note that the choice of range `262144` is just an approximation. The actual number of ranges addressable via a single metadata range is dependent on the size of the keys. If efforts are made to keep key sizes small, the total number of addressable ranges would increase and vice versa. From the examples above it’s clear that key location lookups require at most three reads to get the value for ``: 1. lower bound of `meta1` 2. lower bound of `meta2`, 3. ``. For small maps, the entire lookup is satisfied in a single RPC to Range 0. Maps containing less than 16T of data would require two lookups. Clients cache both levels of range metadata, and we expect that data locality for individual clients will be high. Clients may end up with stale cache entries. If on a lookup, the range consulted does not match the client’s expectations, the client evicts the stale entries and possibly does a new lookup. # Raft - Consistency of Range Replicas Each range is configured to consist of three or more replicas, as specified by their ZoneConfig. The replicas in a range maintain their own instance of a distributed consensus algorithm. 
We use the [*Raft consensus algorithm*](https://raftconsensus.github.io) as it is simpler to reason about and includes a reference implementation covering important details. [ePaxos](https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf) has promising performance characteristics for WAN-distributed replicas, but it does not guarantee a consistent ordering between replicas. Raft elects a relatively long-lived leader which must be involved to propose commands. It heartbeats followers periodically and keeps their logs replicated. In the absence of heartbeats, followers become candidates after randomized election timeouts and proceed to hold new leader elections. Cockroach weights random timeouts such that the replicas with shorter round trip times to peers are more likely to hold elections first (not implemented yet). Only the Raft leader may propose commands; followers will simply relay commands to the last known leader. Our Raft implementation was developed together with CoreOS, but adds an extra layer of optimization to account for the fact that a single Node may have millions of consensus groups (one for each Range). Areas of optimization are chiefly coalesced heartbeats (so that the number of nodes dictates the number of heartbeats as opposed to the much larger number of ranges) and batch processing of requests. Future optimizations may include two-phase elections and quiescent ranges (i.e. stopping traffic completely for inactive ranges). # Range Leases As outlined in the Raft section, the replicas of a Range are organized as a Raft group and execute commands from their shared commit log. Going through Raft is an expensive operation though, and there are tasks which should only be carried out by a single replica at a time (as opposed to all of them). In particular, it is desirable to serve authoritative reads from a single Replica (ideally from more than one, but that is far more difficult). 
For these reasons, Cockroach introduces the concept of **Range Leases**: This is a lease held for a slice of (database, i.e. hybrid logical) time. A replica establishes itself as owning the lease on a range by committing a special lease acquisition log entry through Raft. The log entry contains the replica node's epoch from the node liveness table--a system table containing an epoch and an expiration time for each node. A node is responsible for continuously updating the expiration time for its entry in the liveness table. Once the lease has been committed through Raft, the replica becomes the lease holder as soon as it applies the lease acquisition command, guaranteeing that when it uses the lease it has already applied all prior writes on the replica and can see them locally.

To prevent two nodes from acquiring the lease, the requestor includes a copy of the lease that it believes to be valid at the time it requests the lease. If that lease is still valid when the new lease is applied, the new lease is granted; otherwise, another lease was granted in the interim and the requested lease is ignored. A lease can move from node A to node B only after node A's liveness record has expired and its epoch has been incremented.

Note: range leases for ranges within the node liveness table keyspace and all ranges that precede it, including meta1 and meta2, are not managed using the above mechanism, in order to prevent circular dependencies.

A replica holding a lease at a specific epoch can use the lease as long as the node epoch hasn't changed and the expiration time hasn't passed. The replica holding the lease may satisfy reads locally, without incurring the overhead of going through Raft, and is in charge of, or involved in, handling Range-specific maintenance tasks such as splitting, merging and rebalancing.

All reads and writes are generally addressed to the replica holding the lease; if none does, any replica may be addressed, causing it to try to obtain the lease synchronously.
Requests received by a non-lease holder (for the HLC timestamp specified in the request's header) fail with an error pointing at the replica's last known lease holder. These requests are retried transparently with the updated lease by the gateway node and never reach the client.

## Colocation with Raft leadership

The range lease is completely separate from Raft leadership, and so without further efforts, Raft leadership and the Range lease might not be held by the same Replica. Since it's expensive to not have these two roles colocated (the lease holder has to forward each proposal to the leader, adding costly RPC round-trips), each lease renewal or transfer also attempts to colocate them. In practice, that means that the mismatch is rare and self-corrects quickly.

## Command Execution Flow

This subsection describes how a lease holder replica processes a read/write command in more detail. Each command specifies (1) a key (or a range of keys) that the command accesses and (2) the ID of the range to which the key(s) belong. When receiving a command, a node looks up a range by the specified Range ID and checks if the range is still responsible for the supplied keys. If any of the keys do not belong to the range, the node returns an error so that the client will retry and send a request to the correct range.

When all the keys belong to the range, the node attempts to process the command. If the command is an inconsistent read-only command, it is processed immediately. If the command is a consistent read or a write, the command is executed when both of the following conditions hold:

- The range replica has a range lease.
- There are no other running commands whose keys overlap with the submitted command and cause a read/write conflict.

When the first condition is not met, the replica attempts to acquire a lease or returns an error so that the client will redirect the command to the current lease holder.
The second condition guarantees that consistent read/write commands for a given key are sequentially executed.

When the above two conditions are met, the lease holder replica processes the command. Consistent reads are processed on the lease holder immediately. Write commands are committed into the Raft log so that every replica will execute the same commands. All commands produce deterministic results so that the range replicas keep consistent states among them.

When a write command completes, all the replicas update their response caches to ensure idempotency. When a read command completes, the lease holder replica updates its timestamp cache to keep track of the latest read for a given key.

There is a chance that a range lease expires while a command is being executed. Before executing a command, each replica checks whether the replica proposing the command still has a lease. If the lease has expired, the command is rejected by the replica.

# Splitting / Merging Ranges

Nodes split or merge ranges based on whether they exceed maximum or minimum thresholds for capacity or load. Ranges exceeding maximums for either capacity or load are split; ranges below minimums for *both* capacity and load are merged.

Ranges maintain the same accounting statistics as accounting key prefixes. These boil down to a time series of data points with minute granularity, covering everything from number of bytes to read/write queue sizes. Arbitrary distillations of the accounting stats can be determined as the basis for splitting / merging. Two sensible metrics for use with split/merge are range size in bytes and IOps. A good metric for rebalancing a replica from one node to another would be total read/write queue wait times. These metrics are gossiped, with each range / node passing along relevant metrics if they’re in the bottom or top of the range it’s aware of.

A range finding itself exceeding either capacity or load threshold splits.
To this end, the range lease holder computes an appropriate split key candidate and issues the split through Raft. In contrast to splitting, merging requires a range to be below the minimum threshold for both capacity *and* load. A range being merged chooses the smaller of the ranges immediately preceding and succeeding it.

Splitting, merging, rebalancing and recovering all follow the same basic algorithm for moving data between roach nodes. New target replicas are created and added to the replica set of the source range. Then each new replica is brought up to date by either replaying the log in full or copying a snapshot of the source replica data and then replaying the log from the timestamp of the snapshot to catch up fully. Once the new replicas are fully up to date, the range metadata is updated and the old source replica(s) are deleted, if applicable.

**Coordinator** (lease holder replica)

```
if splitting
  SplitRange(split_key): splits happen locally on range replicas and
  only after being completed locally, are moved to new target replicas.
else if merging
  Choose new replicas on same servers as target range replicas;
  add to replica set.
else if rebalancing || recovering
  Choose new replica(s) on least loaded servers; add to replica set.
```

**New Replica**

*Bring replica up to date:*

```
if all info can be read from replicated log
  copy replicated log
else
  snapshot source replica
  send successive ReadRange requests to source replica
  referencing snapshot

if merging
  combine ranges on all replicas
else if rebalancing || recovering
  remove old range replica(s)
```

Nodes split ranges when the total data in a range exceeds a configurable maximum threshold. Similarly, ranges are merged when the total data falls below a configurable minimum threshold.

**TBD: flesh this out**: Especially for merges (but also rebalancing) we have a range disappearing from the local node; that range needs to disappear gracefully, with a smooth handoff of operation to the new owner of its data.
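
The split/merge rule from this section (split when *either* maximum is exceeded, merge only when *both* minimums are undercut) can be written as a small decision function. This is a minimal sketch: the `Thresholds` type, the field names, and the sample values are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Thresholds:
    """Hypothetical per-range limits for capacity (bytes) and load."""
    min_bytes: int
    max_bytes: int
    min_load: float
    max_load: float


def decide(size_bytes, load, t):
    """Return "split", "merge", or "none" for a single range."""
    # Exceeding either maximum is enough to trigger a split.
    if size_bytes > t.max_bytes or load > t.max_load:
        return "split"
    # Merging requires being below both minimums.
    if size_bytes < t.min_bytes and load < t.min_load:
        return "merge"
    return "none"


# Illustrative values only: 64M maximum size, load in arbitrary units.
limits = Thresholds(min_bytes=2**24, max_bytes=2**26, min_load=10.0, max_load=1000.0)
```

For example, a small range that is still under heavy load (`decide(2**20, 500.0, limits)`) is left alone, because only one of the two merge conditions holds.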
Ranges are rebalanced if a node determines its load or capacity is one of the worst in the cluster based on gossiped load stats. A node with spare capacity is chosen in the same datacenter and a special-case split is done which simply duplicates the data 1:1 and resets the range configuration metadata.

# Node Allocation (via Gossip)

New nodes must be allocated when a range is split. Instead of requiring every node to know about the status of all or even a large number of peer nodes --or-- alternatively requiring a specialized curator or master with sufficiently global knowledge, we use a gossip protocol to efficiently communicate only interesting information between all of the nodes in the cluster. What’s interesting information? One example would be whether a particular node has a lot of spare capacity. Each node, when gossiping, compares each topic of gossip to its own state. If its own state is somehow “more interesting” than the least interesting item in the topic it’s seen recently, it includes its own state as part of the next gossip session with a peer node. In this way, a node with capacity sufficiently in excess of the mean quickly becomes discovered by the entire cluster. To avoid piling onto outliers, nodes from the high capacity set are selected at random for allocation.

The gossip protocol itself contains two primary components:

- **Peer Selection**: each node maintains up to N peers with which it regularly communicates. It selects peers with an eye towards maximizing fanout. A peer node which itself communicates with an array of otherwise unknown nodes will be selected over one which communicates with a set containing significant overlap. Each time gossip is initiated, each node’s set of peers is exchanged. Each node is then free to incorporate the other’s peers as it sees fit. To avoid any node suffering from excess incoming requests, a node may refuse to answer a gossip exchange.
Each node is biased towards answering requests from nodes without significant overlap and refusing requests otherwise. Peers are efficiently selected using a heuristic as described in [Agarwal & Trachtenberg (2006)](https://drive.google.com/file/d/0B9GCVTp_FHJISmFRTThkOEZSM1U/edit?usp=sharing). **TBD**: how to avoid partitions? Need to work out a simulation of the protocol to tune the behavior and see empirically how well it works. - **Gossip Selection**: what to communicate. Gossip is divided into topics. Load characteristics (capacity per disk, cpu load, and state [e.g. draining, ok, failure]) are used to drive node allocation. Range statistics (range read/write load, missing replicas, unavailable ranges) and network topology (inter-rack bandwidth/latency, inter-datacenter bandwidth/latency, subnet outages) are used for determining when to split ranges, when to recover replicas vs. wait for network connectivity, and for debugging / sysops. In all cases, a set of minimums and a set of maximums is propagated; each node applies its own view of the world to augment the values. Each minimum and maximum value is tagged with the reporting node and other accompanying contextual information. Each topic of gossip has its own protobuf to hold the structured data. The number of items of gossip in each topic is limited by a configurable bound. For efficiency, nodes assign each new item of gossip a sequence number and keep track of the highest sequence number each peer node has seen. Each round of gossip communicates only the delta containing new items. # Node and Cluster Metrics Every component of the system is responsible for exporting interesting metrics about itself. These could be histograms, throughput counters, or gauges. These metrics are exported for external monitoring systems (such as Prometheus) via a HTTP endpoint, but CockroachDB also implements an internal timeseries database which is stored in the replicated key-value map. 
Time series are stored at Store granularity and allow the admin dashboard to efficiently gain visibility into a universe of information at the Cluster, Node or Store level. A [periodic background process](RFCS/20160901_time_series_culling.md) culls older timeseries data, downsampling and eventually discarding it. # Zones Zones provide a method for configuring the replication of portions of the keyspace. Zone values specify a protobuf containing the datacenters from which replicas for ranges which fall under the zone must be chosen. Please see [pkg/config/zone.proto](https://github.com/cockroachdb/cockroach/blob/master/pkg/config/zone.proto) for up-to-date data structures used, the best entry point being `message ZoneConfig`. If zones are modified in situ, each node verifies the existing zones for its ranges against the zone configuration. If it discovers differences, it reconfigures ranges in the same way that it rebalances away from busy nodes, via special-case 1:1 split to a duplicate range comprising the new configuration. # SQL Each node in a cluster can accept SQL client connections. CockroachDB supports the PostgreSQL wire protocol, to enable reuse of native PostgreSQL client drivers. Connections using SSL and authenticated using client certificates are supported and even encouraged over unencrypted (insecure) and password-based connections. Each connection is associated with a SQL session which holds the server-side state of the connection. Over the lifespan of a session the client can send SQL to open/close transactions, issue statements or queries or configure session parameters, much like with any other SQL database. ## Language support CockroachDB also attempts to emulate the flavor of SQL supported by PostgreSQL, although it also diverges in significant ways: - CockroachDB exclusively implements MVCC-based consistency for transactions, and thus only supports SQL's isolation levels SNAPSHOT and SERIALIZABLE. 
The other traditional SQL isolation levels are internally mapped to either SNAPSHOT or SERIALIZABLE. - CockroachDB implements its own [SQL type system](RFCS/20160203_typing.md) which only supports a limited form of implicit coercions between types compared to PostgreSQL. The rationale is to keep the implementation simple and efficient, capitalizing on the observation that 1) most SQL code in clients is automatically generated with coherent typing already and 2) existing SQL code for other databases will need to be massaged for CockroachDB anyway. ## SQL architecture Client connections over the network are handled in each node by a pgwire server process (goroutine). This handles the stream of incoming commands and sends back responses including query/statement results. The pgwire server also handles pgwire-level prepared statements, binding prepared statements to arguments and looking up prepared statements for execution. Meanwhile the state of a SQL connection is maintained by a Session object and a monolithic `planner` object (one per connection) which coordinates execution between the session, the current SQL transaction state and the underlying KV store. Upon receiving a query/statement (either directly or via an execute command for a previously prepared statement) the pgwire server forwards the SQL text to the `planner` associated with the connection. The SQL code is then transformed into a SQL query plan. The query plan is implemented as a tree of objects which describe the high-level data operations needed to resolve the query, for example "join", "index join", "scan", "group", etc. The query plan objects currently also embed the run-time state needed for the execution of the query plan. 
Once the SQL query plan is ready, methods on these objects then carry the execution out in the fashion of "generators" in other programming languages: each node *starts* its children nodes and from that point forward each child node serves as a *generator* for a stream of result rows, which the parent node can consume and transform incrementally and present to its own parent node also as a generator.

The top-level planner consumes the data produced by the top node of the query plan and returns it to the client via pgwire.

## Data mapping between the SQL model and KV

Every SQL table has a primary key in CockroachDB. (If a table is created without one, an implicit primary key is provided automatically.) The table identifier, followed by the value of the primary key for each row, is encoded as the *prefix* of a key in the underlying KV store.

Each remaining column or *column family* in the table is then encoded as a value in the underlying KV store, and the column/family identifier is appended as a *suffix* to the KV key.

For example:

- after table `customers` is created in a database `mydb` with a primary key column `name` and normal columns `address` and `URL`, the KV pairs to store the schema would be:

| Key                           | Values |
| ----------------------------- | ------ |
| `/system/databases/mydb/id`   | 51     |
| `/system/tables/customers/id` | 42     |
| `/system/desc/51/42/address`  | 69     |
| `/system/desc/51/42/url`      | 66     |

(The numeric values on the right are chosen arbitrarily for the example; the structure of the schema keys on the left is simplified for the example and subject to change.) Each database/table/column name is mapped to a spontaneously generated identifier, so as to simplify renames.
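
As a rough illustration of the prefix/suffix scheme described in this section, the sketch below builds human-readable keys for one row from the identifiers in the schema example. The function name and the slash-separated string format are assumptions made for readability; the actual on-disk encoding is binary and is not shown here.

```python
def row_kv_pairs(db_id, table_id, primary_key, columns):
    """Map one SQL row to KV pairs.

    `columns` maps a column (or column family) ID to its SQL value.
    The key is <table prefix>/<primary key>/<column ID>.
    """
    prefix = f"/{db_id}/{table_id}/{primary_key}"
    return {f"{prefix}/{col_id}": value for col_id, value in columns.items()}


# IDs taken from the schema example: database 51, table 42,
# address -> 69, url -> 66. The row values are illustrative.
pairs = row_kv_pairs(
    51,
    42,
    "Apple",
    {69: "1 Infinite Loop, Cupertino, CA", 66: "http://apple.com/"},
)
```

Because every key for a row shares the prefix `/51/42/Apple`, all of a row's columns sort next to each other in the KV map.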
Then for a single row in this table:

| Key               | Values                           |
| ----------------- | -------------------------------- |
| `/51/42/Apple/69` | `1 Infinite Loop, Cupertino, CA` |
| `/51/42/Apple/66` | `http://apple.com/`              |

Each key has the table prefix `/51/42` followed by the primary key prefix `/Apple` followed by the column/family suffix (`/66`, `/69`). The KV value is directly encoded from the SQL value. Efficient storage for the keys is guaranteed by the underlying RocksDB engine by means of prefix compression.

Finally, for SQL indexes, the KV key is formed using the SQL value of the indexed columns, and the KV value is the KV key prefix of the rest of the indexed row.

## Distributed SQL

Dist-SQL is a new execution framework being developed as of Q3 2016 with the goal of distributing the processing of SQL queries. See the [Distributed SQL RFC](RFCS/20160421_distributed_sql.md) for a detailed design of the subsystem; this section will serve as a summary.

Distributing the processing is desirable for multiple reasons:

- Remote-side filtering: when querying for a set of rows that match a filtering expression, instead of querying all the keys in certain ranges and processing the filters after receiving the data on the gateway node over the network, we'd like the filtering expression to be processed by the lease holder or remote node, saving on network traffic and related processing.
- For statements like `UPDATE .. WHERE` and `DELETE .. WHERE` we want to perform the query and the updates on the node which has the data (as opposed to receiving results at the gateway over the network, and then performing the update or deletion there, which involves additional round-trips).
- Parallelize SQL computation: when significant computation is required, we want to distribute it to multiple nodes, so that it scales with the amount of data involved. This applies to `JOIN`s, aggregation, sorting.
The approach we took was originally inspired by [Sawzall](https://cloud.google.com/dataflow/model/programming-model) - a project by Rob Pike et al. at Google that proposes a "shell" (high-level language interpreter) to ease the exploitation of MapReduce. It provides a clear separation between "local" processes which process a limited amount of data and distributed computations, which are abstracted away behind a restricted set of conceptual constructs.

To run SQL statements in a distributed fashion, we introduce a couple of concepts:

- _logical plan_ - similar on the surface to the `planNode` tree described in the [SQL](#sql) section, it represents the abstract (non-distributed) data flow through computation stages.
- _physical plan_ - a physical plan is conceptually a mapping of the _logical plan_ nodes to CockroachDB nodes. Logical plan nodes are replicated and specialized depending on the cluster topology. The components of the physical plan are scheduled and run on the cluster.

## Logical planning

The logical plan is made up of _aggregators_. Each _aggregator_ consumes an _input stream_ of rows (or multiple streams for joins) and produces an _output stream_ of rows. Both the input and the output streams have a set schema. The streams are a logical concept and might not map to a single data stream in the actual computation. Aggregators will be potentially distributed when converting the *logical plan* to a *physical plan*; to express what distribution and parallelization is allowed, an aggregator defines a _grouping_ on the data that flows through it, expressing which rows need to be processed on the same node (this mechanism constrains rows that match on a subset of columns to be processed on the same node). This concept is useful for aggregators that need to see some set of rows for producing output - e.g. the SQL aggregation functions.
An aggregator with no grouping is a special but important case in which we are not aggregating multiple pieces of data, but we may be filtering, transforming, or reordering individual pieces of data. Special **table reader** aggregators with no inputs are used as data sources; a table reader can be configured to output only certain columns, as needed. A special **final** aggregator with no outputs is used for the results of the query/statement. To reflect the result ordering that a query has to produce, some aggregators (`final`, `limit`) are configured with an **ordering requirement** on the input stream (a list of columns with corresponding ascending/descending requirements). Some aggregators (like `table readers`) can guarantee a certain ordering on their output stream, called an **ordering guarantee**. All aggregators have an associated **ordering characterization** function `ord(input_order) -> output_order` that maps `input_order` (an ordering guarantee on the input stream) into `output_order` (an ordering guarantee for the output stream) - meaning that if the rows in the input stream are ordered according to `input_order`, then the rows in the output stream will be ordered according to `output_order`. The ordering guarantee of the table readers along with the characterization functions can be used to propagate ordering information across the logical plan. When there is a mismatch (an aggregator has an ordering requirement that is not matched by a guarantee), we insert a **sorting aggregator**. ### Types of aggregators - `TABLE READER` is a special aggregator, with no input stream. It's configured with spans of a table or index and the schema that it needs to read. Like every other aggregator, it can be configured with a programmable output filter. - `JOIN` performs a join on two streams, with equality constraints between certain columns. The aggregator is grouped on the columns that are constrained to be equal. 
- `JOIN READER` performs point-lookups for rows with the keys indicated by the input stream. It can do so by performing (potentially remote) KV reads, or by setting up remote flows.
- `SET OPERATION` takes several inputs and performs set arithmetic on them (union, difference).
- `AGGREGATOR` is the one that does "aggregation" in the SQL sense. It groups rows and computes an aggregate for each group. The group is configured using the group key. `AGGREGATOR` can be configured with one or more aggregation functions:
  - `SUM`
  - `COUNT`
  - `COUNT DISTINCT`
  - `DISTINCT`

  An optional output filter has access to the group key and all the aggregated values (i.e. it can use even values that are not ultimately outputted).
- `SORT` sorts the input according to a configurable set of columns. This is a no-grouping aggregator, hence it can be distributed arbitrarily to the data producers. This means that it doesn't produce a global ordering; instead, it only guarantees an intra-stream ordering on each physical output stream. The global ordering, when needed, is achieved by an input synchronizer of a grouped processor (such as `LIMIT` or `FINAL`).
- `LIMIT` is a single-group aggregator that stops after reading so many input rows.
- `FINAL` is a single-group aggregator, scheduled on the gateway, that collects the results of the query. This aggregator will be hooked up to the pgwire connection to the client.

## Physical planning

Logical plans are transformed into physical plans in a *physical planning phase*. See the [corresponding section](RFCS/20160421_distributed_sql.md#from-logical-to-physical) of the Distributed SQL RFC for details. To summarize, each aggregator is planned as one or more *processors*, which we distribute starting from the data layout - `TABLE READER`s have multiple instances, split according to the ranges - each instance is planned on the lease holder of the relevant range.
From that point on, subsequent processors are generally either colocated with their inputs, or planned as singletons, usually on the final destination node.

### Processors

When turning a _logical plan_ into a _physical plan_, its nodes are turned into _processors_. Processors are generally made up of three components:

![Processor](RFCS/images/distributed_sql_processor.png?raw=true "Processor")

1. The *input synchronizer* merges the input streams into a single stream of data. Types:
   * single-input (pass-through)
   * unsynchronized: passes rows from all input streams, arbitrarily interleaved.
   * ordered: the input physical streams have an ordering guarantee (namely the guarantee of the corresponding logical stream); the synchronizer is careful to interleave the streams so that the merged stream has the same guarantee.

2. The *data processor* core implements the data transformation or aggregation logic (and in some cases performs KV operations).

3. The *output router* splits the data processor's output to multiple streams; types:
   * single-output (pass-through)
   * mirror: every row is sent to all output streams
   * hashing: each row goes to a single output stream, chosen according to a hash function applied on certain elements of the data tuples.
   * by range: the router is configured with range information (relating to a certain table) and is able to send rows to the nodes that are lease holders for the respective ranges (useful for `JoinReader` nodes (taking index values to the node responsible for the PK) and `INSERT` (taking new rows to their lease holder-to-be)).

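
The ordered input synchronizer and the hashing output router listed above can be sketched in a few lines of Go. This is a minimal illustration operating on in-memory slices, not CockroachDB's implementation: the `Row` type and the `mergeOrdered` and `routeByHash` functions are hypothetical names introduced here, and the choice of FNV as the hash function is an assumption made only for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Row is a hypothetical row type: a key the streams are ordered on, plus a value.
type Row struct {
	Key   string
	Value int
}

// mergeOrdered sketches an ordered input synchronizer: every input stream is
// already sorted by Key, and the merged stream preserves that ordering.
func mergeOrdered(streams [][]Row) []Row {
	pos := make([]int, len(streams)) // next unread index in each stream
	var out []Row
	for {
		best := -1
		for i, s := range streams {
			if pos[i] >= len(s) {
				continue // stream i is exhausted
			}
			if best == -1 || s[pos[i]].Key < streams[best][pos[best]].Key {
				best = i
			}
		}
		if best == -1 {
			return out // all streams are exhausted
		}
		out = append(out, streams[best][pos[best]])
		pos[best]++
	}
}

// routeByHash sketches a hashing output router: each row goes to exactly one
// of n output streams, chosen by hashing its key, so rows with equal keys
// always land on the same stream.
func routeByHash(rows []Row, n int) [][]Row {
	outs := make([][]Row, n)
	for _, r := range rows {
		h := fnv.New32a() // assumed hash function, chosen for illustration
		h.Write([]byte(r.Key))
		i := int(h.Sum32() % uint32(n))
		outs[i] = append(outs[i], r)
	}
	return outs
}

func main() {
	merged := mergeOrdered([][]Row{
		{{"a", 1}, {"c", 3}},
		{{"b", 2}, {"c", 4}},
	})
	fmt.Println(merged)
	for i, out := range routeByHash(merged, 2) {
		fmt.Println("stream", i, out)
	}
}
```

Routing by a hash of the grouping columns is what lets a downstream grouped processor see all rows of a group on one stream, while the ordered merge lets a sorted result survive the fan-in without a second sort.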
To illustrate with an example from the Distributed SQL RFC, the query:

```
TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)

SELECT CID, SUM(VALUE) FROM Orders
  WHERE DATE > 2015
  GROUP BY CID
  ORDER BY 1 - SUM(Value)
```

produces the following logical plan:

![Logical plan](RFCS/images/distributed_sql_logical_plan.png?raw=true "Logical Plan")

The logical plan above could be transformed into either one of the following physical plans:

![Physical plan](RFCS/images/distributed_sql_physical_plan.png?raw=true "Physical Plan")

or

![Alternate physical plan](RFCS/images/distributed_sql_physical_plan_2.png?raw=true "Alternate physical Plan")

## Execution infrastructure

Once a physical plan has been generated, the system needs to divvy it up between the nodes and send it around for execution. Each node is responsible for locally scheduling data processors and input synchronizers. Nodes also communicate with each other for connecting output routers to input synchronizers through a streaming interface.

### Creating a local plan: the `ScheduleFlows` RPC

Distributed execution starts with the gateway making a request to every node that's supposed to execute part of the plan asking the node to schedule the sub-plan(s) it's responsible for (except for "on-the-fly" flows, see design doc). A node might be responsible for multiple disparate pieces of the overall DAG - let's call each of them a *flow*. A flow is described by the sequence of physical plan nodes in it, the connections between them (input synchronizers, output routers) plus identifiers for the input streams of the top node in the plan and the output streams of the (possibly multiple) bottom nodes. A node might be responsible for multiple heterogeneous flows. More commonly, when a node is the lease holder for multiple ranges from the same table involved in the query, it will run a `TableReader` configured with all the spans to be read across all the ranges local to the node.
A node therefore implements a `ScheduleFlows` RPC which takes a set of flows, sets up the input and output [mailboxes](#mailboxes), creates the local processors and starts their execution.

### Local scheduling of flows

The simplest way to schedule the different processors locally on a node is concurrently: each data processor, synchronizer and router runs as a goroutine, with channels between them. The channels are buffered to synchronize producers and consumers to a controllable degree.

### Mailboxes

Flows on different nodes communicate with each other over gRPC streams. To allow the producer and the consumer to start at different times, `ScheduleFlows` creates named mailboxes for all the input and output streams. These mailboxes will hold some number of tuples in an internal queue until a gRPC stream is established for transporting them. From that moment on, gRPC flow control is used to synchronize the producer and consumer. A gRPC stream is established by the consumer using the `StreamMailbox` RPC, taking a mailbox id (the same one that's been already used in the flows passed to `ScheduleFlows`).

A diagram of a simple query using mailboxes for its execution:

![Mailboxes](RFCS/images/distributed_sql_mailboxes.png?raw=true)

## A complex example: Daily Promotion

To give a visual intuition of all the concepts presented, we draw the physical plan of a relatively involved query. The point of the query is to help with a promotion that goes out daily, targeting customers that have spent over $1000 in the last year. We'll insert into the `DailyPromotion` table rows representing each such customer and the number of her recent orders.
```SQL
TABLE DailyPromotion (
  Email TEXT,
  Name TEXT,
  OrderCount INT
)

TABLE Customers (
  CustomerID INT PRIMARY KEY,
  Email TEXT,
  Name TEXT
)

TABLE Orders (
  CustomerID INT,
  Date DATETIME,
  Value INT,

  PRIMARY KEY (CustomerID, Date),
  INDEX date (Date)
)

INSERT INTO DailyPromotion
(SELECT c.Email, c.Name, os.OrderCount FROM
      Customers AS c
    INNER JOIN
      (SELECT CustomerID, COUNT(*) as OrderCount FROM Orders
        WHERE Date >= '2015-01-01'
        GROUP BY CustomerID HAVING SUM(Value) >= 1000) AS os
    ON c.CustomerID = os.CustomerID)
```

A possible physical plan:

![Physical plan](RFCS/images/distributed_sql_daily_promotion_physical_plan.png?raw=true)
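
To make the inner subquery of this example concrete, here is a minimal single-node sketch in Go of an `AGGREGATOR` grouped on `CustomerID` whose output filter (the `HAVING` clause) uses `SUM(Value)` even though only `COUNT(*)` is emitted. It is an illustration under stated assumptions, not the distributed plan pictured above: the `Order`, `groupState` and `OrderCountRow` types and the `aggregateOrders` function are hypothetical names, dates are modeled as ISO strings, and the final sort exists only to make the sketch's output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// Order mirrors the Orders columns that the inner query reads.
type Order struct {
	CustomerID int
	Date       string // ISO date, so string comparison matches date order
	Value      int
}

// groupState holds both aggregates for one CustomerID group.
type groupState struct {
	count int // COUNT(*), emitted as OrderCount
	sum   int // SUM(Value), used only by the output filter
}

// OrderCountRow is the row emitted for each group that passes the filter.
type OrderCountRow struct {
	CustomerID int
	OrderCount int
}

// aggregateOrders groups orders dated on or after since by CustomerID and
// keeps only the groups whose total value is at least minSum.
func aggregateOrders(orders []Order, since string, minSum int) []OrderCountRow {
	groups := make(map[int]*groupState)
	for _, o := range orders {
		if o.Date < since {
			continue // WHERE Date >= since
		}
		g := groups[o.CustomerID]
		if g == nil {
			g = &groupState{}
			groups[o.CustomerID] = g
		}
		g.count++
		g.sum += o.Value
	}
	var out []OrderCountRow
	for id, g := range groups {
		if g.sum >= minSum { // HAVING SUM(Value) >= minSum
			out = append(out, OrderCountRow{CustomerID: id, OrderCount: g.count})
		}
	}
	// Sorting is not part of the query; it only fixes the output order here.
	sort.Slice(out, func(i, j int) bool { return out[i].CustomerID < out[j].CustomerID })
	return out
}

func main() {
	orders := []Order{
		{1, "2014-12-31", 900}, // before the cutoff, ignored
		{1, "2015-03-01", 600},
		{1, "2015-07-15", 500},
		{2, "2015-02-02", 400},
		{2, "2015-05-05", 300},
	}
	fmt.Println(aggregateOrders(orders, "2015-01-01", 1000)) // prints [{1 2}]
}
```

In the sample data, customer 1 has two qualifying orders totaling 1100 and is emitted with `OrderCount` 2, while customer 2 totals 700 and is filtered out.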