Optimizing Query Latency: Partitioning and Replication Strategies
The following topics are covered in this section:
Using Column vs Row Table
In a column table, data is stored as a sequence of columns, whereas a row table stores its records as a sequence of rows.
Using Column Tables
Analytical Queries: A column table has distinct advantages for OLAP queries, so large tables involved in such queries should be created as column tables. These tables are rarely mutated (updated/deleted). For a given query on a column table, only the required subset of columns is scanned, which gives better scan performance. Thus, aggregation queries execute faster on a column table than on a row table.
Compression of Data: Another advantage of column tables is that they allow highly efficient compression of data, which reduces the total storage footprint for large tables.
Column tables are not suitable for OLTP scenarios. In this case, row tables are recommended.
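As a sketch, a large, rarely mutated analytics table can be declared as a column table (the table and column names below are hypothetical):

```sql
-- Hypothetical fact table for sales events, created as a column table
-- since it is large, rarely mutated, and mostly queried with aggregations.
CREATE TABLE sales_fact (
    sale_id BIGINT,
    product_id INT,
    sale_date DATE,
    amount DECIMAL(10, 2)
) USING column;
```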
Using Row Tables
OLTP Queries: Row tables are designed to return entire rows efficiently and are suited for OLTP scenarios where tables are mutated frequently (that is, where rows need to be updated or deleted based on some conditions). In these cases, row tables offer distinct advantages over column tables.
Point queries: Row tables are also suitable for point queries (for example, queries that select only a few records based on certain where clause conditions).
Small Dimension Tables: Row tables are also suitable to create small dimension tables as these can be created as replicated tables (table data replicated on all data servers).
Create Index: Row tables also allow the creation of an index on selected columns of the table, which improves lookup performance on those columns.
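The points above can be sketched as follows (names are illustrative): a small dimension table created as a replicated row table, with an index on a frequently filtered column:

```sql
-- Hypothetical small dimension table. A row table without PARTITION_BY
-- is replicated to all data servers in SnappyData.
CREATE TABLE product_dim (
    product_id INT,
    product_name VARCHAR(100),
    category VARCHAR(50)
) USING row;

-- Index to speed up point lookups on product_name (row tables only).
CREATE INDEX product_name_idx ON product_dim (product_name);
```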
Using Partitioned vs Replicated Row Table
In SnappyData, row tables can be either partitioned across all servers or replicated on every server. Large fact tables should be partitioned, whereas dimension tables can be replicated.
The SnappyData architecture encourages you to denormalize “dimension” tables into fact tables when possible, and then replicate remaining dimension tables to all datastores in the distributed system.
Most databases follow the star schema design pattern where large “fact” tables store key information about the events in a system or a business process. For example, a fact table would store rows for events like product sales or bank transactions. Each fact table generally has foreign key relationships to multiple “dimension” tables, which describe further aspects of each row in the fact table.
When designing a database schema for SnappyData, the main goal with a typical star schema database is to partition the entities in fact tables. Slow-changing dimension tables should then be replicated on each data store that hosts a partitioned fact table. In this way, a join between the fact table and any number of its dimension tables can be executed concurrently on each partition, without requiring multiple network hops to other members in the distributed system.
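Under these guidelines, a star schema might be laid out as follows (the schema and option values are illustrative, not prescriptive):

```sql
-- Large fact table: partitioned on a join key across all data stores.
CREATE TABLE txn_fact (
    txn_id BIGINT,
    customer_id INT,
    amount DECIMAL(12, 2)
) USING column OPTIONS (PARTITION_BY 'customer_id');

-- Slow-changing dimension table: a replicated row table, so joins
-- with txn_fact require no network hops.
CREATE TABLE customer_dim (
    customer_id INT,
    customer_name VARCHAR(100)
) USING row;
```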
Applying Partitioning Scheme
In SQL, a JOIN clause is used to combine data from two or more tables based on a related column between them. JOINs have traditionally been expensive in distributed systems because the data for the tables involved may reside on different physical nodes, so the operation must first move/shuffle the relevant data to one node before performing the join. SnappyData offers a way to declaratively "colocate" tables to prevent or reduce shuffling when executing JOINs. When two tables are partitioned on columns and colocated, partitions having the same values for those columns in both tables are forced onto the same SnappyData member. Therefore, in a join query, the join operation is performed on each node's local data. Eliminating data shuffling improves performance significantly. For examples, refer to How to colocate tables for doing a colocated join.
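A minimal sketch of colocation (table and column names are hypothetical):

```sql
-- Two tables partitioned on the same key and colocated: matching
-- partitions land on the same member, so the join is node-local.
CREATE TABLE orders (
    order_id BIGINT,
    customer_id INT
) USING column OPTIONS (PARTITION_BY 'customer_id');

CREATE TABLE order_items (
    order_id BIGINT,
    customer_id INT,
    item_id INT
) USING column OPTIONS (PARTITION_BY 'customer_id',
                        COLOCATE_WITH 'orders');

-- This join on the partitioning column executes on each node's
-- local data, with no shuffle:
SELECT o.order_id, count(*)
FROM orders o JOIN order_items i ON o.customer_id = i.customer_id
GROUP BY o.order_id;
```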
A bucket is the smallest unit of in-memory storage for SnappyData tables. Data in a table is distributed evenly across all the buckets. For more information on BUCKETS, refer to BUCKETS.
The default number of buckets in SnappyData cluster mode is 128. In local mode, it is cores*2, subject to a maximum of 64 buckets and a minimum of 8 buckets.
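The default can be overridden per table with the BUCKETS option (the value below is only illustrative; tune it to your data volume and server count):

```sql
-- Override the default bucket count for a partitioned table.
CREATE TABLE events (
    event_id BIGINT,
    device_id INT
) USING column OPTIONS (PARTITION_BY 'device_id', BUCKETS '64');
```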
The number of buckets has an impact on query performance, storage density, and ability to scale the system as data volumes grow.
When a new server joins or an existing server leaves the cluster, buckets are moved around in order to ensure that data is balanced across the nodes where the table is defined.
The -rebalance option on the server startup command line triggers bucket rebalancing: the new server becomes the primary for some of the buckets (and secondary for some, if REDUNDANCY>0 has been specified). You can also invoke the system procedure sys.rebalance_all_buckets() to trigger a rebalance.
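The procedure call looks like this:

```sql
-- Trigger a rebalance of all buckets across the current members.
CALL sys.rebalance_all_buckets();
```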
Criteria for Column Partitioning
It is recommended to partition on a dimension that spreads the workload, so that all partitions are active and queries execute concurrently across them. If queries (especially concurrent queries) largely hit a single partition, that partition becomes a significant bottleneck: execution serializes into the single thread handling that partition while the others sit idle. Therefore, it is not recommended to use a DATE/TIMESTAMP column for partitioning, since queries typically concentrate on recent data and would keep only one partition busy.
The REDUNDANCY clause of CREATE TABLE specifies the number of secondary copies you want to maintain for your partitioned table. This keeps the table data highly available even if one of the SnappyData members fails or shuts down.
A REDUNDANCY value of 1 is recommended to maintain a secondary copy of the table data. A large value for the REDUNDANCY clause has an adverse impact on performance, network usage, and memory usage.
For an example on the REDUNDANCY clause refer to Tables in SnappyData.
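As a sketch (the table name is hypothetical), a partitioned table with one secondary copy:

```sql
-- One secondary copy of each bucket keeps the data available
-- if a single member fails or shuts down.
CREATE TABLE trades (
    trade_id BIGINT,
    symbol VARCHAR(10)
) USING column OPTIONS (PARTITION_BY 'symbol', REDUNDANCY '1');
```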
In SnappyData, row and column tables overflow to disk by default (which is equivalent to setting OVERFLOW to 'true'), based on the EVICTION_BY criteria. Users cannot set OVERFLOW to 'false', except when EVICTION_BY is not set, in which case eviction is disabled.
For example, setting EVICTION_BY to LRUHEAPPERCENT allows table data to be evicted to disk based on the current memory consumption of the server. By default, eviction is set to LRUHEAPPERCENT.
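These settings can also be stated explicitly when creating a table (table and column names are illustrative):

```sql
-- Evict table data to disk once the server's heap use crosses the
-- configured heap percentage; OVERFLOW 'true' is the default.
CREATE TABLE clickstream (
    click_id BIGINT,
    url VARCHAR(500)
) USING column OPTIONS (EVICTION_BY 'LRUHEAPPERCENT', OVERFLOW 'true');
```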
Change NOT IN queries to use NOT EXISTS if possible
NOT IN queries use an unoptimized plan that leads to a nested-loop join, which can take long if both sides are large (https://issues.apache.org/jira/browse/SPARK-16951). Change your queries to use NOT EXISTS, which uses an optimized anti-join plan.
For example a query like:
select count(*) from T1 where id not in (select id from T2)
can be changed to:
select count(*) from T1 where not exists (select 1 from T2 where T1.id = T2.id)
Be aware of the different null value semantics of the two operators as noted here for Spark.
In a nutshell, the NOT IN operator is null-aware and skips the row if the sub-query has a null value, while the NOT EXISTS operator ignores the null values in the sub-query. In other words, the following two are equivalent when dealing with null values:
select count(*) from T1 where id not in (select id from T2 where id is not null)
select count(*) from T1 where not exists (select 1 from T2 where T1.id = T2.id)