How to Load Data into SnappyData Tables
SnappyData relies on the Spark SQL Data Sources API to load data in parallel from a wide variety of sources. Because the loading mechanism is integrated with the query engine (the Catalyst optimizer), filters and projections can often be pushed all the way down to the data source, minimizing data transfer. Here is the list of important features:
Support for many sources: There is built-in support for many data sources as well as data formats. Data can be accessed from S3, the file system, HDFS, Hive, relational databases (RDB), etc., and the loaders have built-in support for data formats such as CSV, Parquet, ORC, Avro, JSON, and Java/Scala objects.
Access virtually any modern data store: Virtually all major data providers have a native Spark connector that complies with the Data Sources API. For example, you can load data from any RDB, or from stores such as Amazon Redshift, Cassandra, Redis, Elasticsearch, and Neo4j. While these connectors are not built in, you can easily deploy them as dependencies into a SnappyData cluster. The connectors are typically registered on spark-packages.org.
Avoid schema wrangling: Spark supports schema inference, which means all you need to do is point to the external source in your CREATE TABLE DDL (or the Spark SQL API), and the schema definition is learned by reading the data. There is no need to explicitly define each column and its type. This is extremely useful when dealing with disparate, complex, and wide data sets.
Read nested, sparse data sets: When data is accessed from a source, schema inference does not just read a header; it often reads the entire data set. For instance, when reading JSON files, the structure can change from document to document. The inference engine builds up the schema as it reads each record and keeps unioning the schemas to create a unified schema. This approach makes developers very productive with disparate data sets.
Load using the Spark API or SQL: You can use SQL to point to any data source, or use the native Spark Scala/Java API to load the data. For instance, you can first create an external table:
CREATE EXTERNAL TABLE <tablename> USING <any-data-source-supported> OPTIONS <options>
Next, use it in any SQL query or DDL. For example,
CREATE EXTERNAL TABLE STAGING_CUSTOMER USING parquet
  OPTIONS (path 'quickstart/src/main/resources/customerparquet');

CREATE TABLE CUSTOMER USING column OPTIONS (buckets '8')
  AS (SELECT * FROM STAGING_CUSTOMER);
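The filter and projection pushdown mentioned at the start of this section can be pictured with a toy, in-memory model. The sketch below is plain Scala: `PushdownDemo` and all of its members are hypothetical and are not part of Spark or SnappyData. In Spark, a data source opts into this through Data Sources API interfaces such as `PrunedFilteredScan`; the sketch only imitates the effect by comparing how many values leave the "source" with and without pushdown for the query `SELECT id, name FROM source WHERE segment = 'AUTO'`.

```scala
// Toy model of filter and projection pushdown (hypothetical; not Spark or SnappyData code).
object PushdownDemo {
  // In this sketch a "row" is simply a map from column name to value.
  type Row = Map[String, Any]

  // Rows held by a hypothetical external data source.
  val sourceRows: Vector[Row] = Vector(
    Map("id" -> 1, "name" -> "alice", "balance" -> 120.0, "segment" -> "AUTO"),
    Map("id" -> 2, "name" -> "bob", "balance" -> 80.0, "segment" -> "BUILDING"),
    Map("id" -> 3, "name" -> "carol", "balance" -> 300.0, "segment" -> "AUTO")
  )

  // Keep only the requested columns of a row.
  def project(row: Row, columns: Set[String]): Row =
    row.filter { case (column, _) => columns.contains(column) }

  // Without pushdown the source ships every column of every row;
  // the engine filters and projects only after the transfer.
  def scanWithoutPushdown(): Vector[Row] = sourceRows

  // With pushdown the source evaluates the predicate and the projection itself,
  // so only matching rows and requested columns are transferred.
  def scanWithPushdown(columns: Set[String], predicate: Row => Boolean): Vector[Row] =
    sourceRows.filter(predicate).map(row => project(row, columns))

  // Number of individual values that leave the source.
  def valuesTransferred(rows: Vector[Row]): Int = rows.map(_.size).sum

  // Query: SELECT id, name FROM source WHERE segment = 'AUTO'
  val wanted: Set[String] = Set("id", "name")
  val isAuto: Row => Boolean = row => row.get("segment").contains("AUTO")

  val shippedNaive: Vector[Row] = scanWithoutPushdown()
  val resultNaive: Vector[Row] = shippedNaive.filter(isAuto).map(row => project(row, wanted))
  val shippedPushed: Vector[Row] = scanWithPushdown(wanted, isAuto)
}

// Both plans produce the same answer, but pushdown transfers fewer values.
println(PushdownDemo.resultNaive == PushdownDemo.shippedPushed)           // true
println(PushdownDemo.valuesTransferred(PushdownDemo.shippedNaive))        // 12
println(PushdownDemo.valuesTransferred(PushdownDemo.shippedPushed))       // 4
```

The answer is identical either way; what changes is where the work happens. Here 12 values cross the boundary without pushdown versus 4 with it, and the gap grows with wider tables and more selective filters.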
Example - Load from CSV
You can either define the schema explicitly or let Spark infer the column names and data types. Inferring the column names requires a header row in the CSV file. The file in this example has no header, so we define the schema explicitly.
import org.apache.spark.sql.{SnappySession, SparkSession}

// Get a SnappySession in a local cluster
val spark: SparkSession = SparkSession
  .builder
  .appName("CreateColumnTable")
  .master("local[*]")
  .getOrCreate

val snSession = new SnappySession(spark.sparkContext)
First, we explicitly define the table:
snSession.sql("CREATE TABLE CUSTOMER ( " +
  "C_CUSTKEY INTEGER NOT NULL," +
  "C_NAME VARCHAR(25) NOT NULL," +
  "C_ADDRESS VARCHAR(40) NOT NULL," +
  "C_NATIONKEY INTEGER NOT NULL," +
  "C_PHONE VARCHAR(15) NOT NULL," +
  "C_ACCTBAL DECIMAL(15,2) NOT NULL," +
  "C_MKTSEGMENT VARCHAR(10) NOT NULL," +
  "C_COMMENT VARCHAR(117) NOT NULL) " +
  "USING COLUMN OPTIONS (PARTITION_BY 'C_CUSTKEY')")
Then load data into the CUSTOMER table from a CSV file by using the Data Sources API:
// Reuse the schema of the CUSTOMER table, because the CSV file has no header row.
// `dataFolder` is assumed to be the directory that contains customer.csv.
val tableSchema = snSession.table("CUSTOMER").schema
val customerDF = snSession.read.schema(tableSchema).csv(s"$dataFolder/customer.csv")
customerDF.write.insertInto("CUSTOMER")
The Spark SQL programming guide provides a full description of the Data Sources API.
Example - Load from Parquet files
// `dataDir` is assumed to be the directory that contains the customer_parquet files.
val customerDF = snSession.read.parquet(s"$dataDir/customer_parquet")
customerDF.write.insertInto("CUSTOMER")
Inferring schema from data file
A schema for the table can be inferred from the data file. The data is first introspected to learn the schema (column names and types) without requiring this input from the user. The example below reads a Parquet data source and creates a new column table in SnappyData. The schema is defined automatically when the Parquet data files are read.
val customerDF = snSession.read.parquet("quickstart/src/main/resources/customerparquet")

// props1 map specifies the properties for the table to be created
// "PARTITION_BY" attribute specifies partitioning key for CUSTOMER table (C_CUSTKEY)
val props1 = Map("PARTITION_BY" -> "C_CUSTKEY")

customerDF.write
  .format("column")
  .mode("append")
  .options(props1)
  .saveAsTable("CUSTOMER")
In the code snippet below, a schema is inferred from a CSV file. Column names are derived from the header in the file, and the `inferSchema` option infers the column types from the data.
val customer_csv_DF = snSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("quickstart/src/main/resources/customer_with_headers.csv")

// props1 map specifies the properties for the table to be created
// "PARTITION_BY" attribute specifies partitioning key for CUSTOMER table (C_CUSTKEY).
// For the complete list of attributes, refer to the documentation.
val props1 = Map("PARTITION_BY" -> "C_CUSTKEY")

customer_csv_DF.write
  .format("column")
  .mode("append")
  .options(props1)
  .saveAsTable("CUSTOMER")
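The idea behind `header` plus `inferSchema` can be approximated in a few lines of plain Scala: names come from the header line, and each column gets the narrowest type that fits every value in that column. This is a simplified, hypothetical sketch (`CsvInferenceDemo`, `IntCol`, `DoubleCol`, and `StringCol` are illustrative names, not Spark APIs). Spark's real CSV inference considers more types and has its own widening rules, and it naively splits on commas, so quoted fields are not handled here.

```scala
// Simplified, hypothetical model of header-based CSV schema inference (not Spark code).
object CsvInferenceDemo {
  // Toy column types; Spark's real inference distinguishes more types.
  sealed trait ColType
  case object IntCol extends ColType
  case object DoubleCol extends ColType
  case object StringCol extends ColType

  // The narrowest toy type that can hold one (trimmed) value.
  def typeOf(value: String): ColType =
    if (value.toIntOption.isDefined) IntCol
    else if (value.toDoubleOption.isDefined) DoubleCol
    else StringCol

  // Combine the types seen for one column across rows: identical types stay,
  // Int and Double widen to Double, and anything else falls back to String.
  def widen(a: ColType, b: ColType): ColType = (a, b) match {
    case (x, y) if x == y                          => x
    case (IntCol, DoubleCol) | (DoubleCol, IntCol) => DoubleCol
    case _                                         => StringCol
  }

  // Column names come from the header line; each column's type is the
  // widening of the types of all its values in the data rows.
  def inferSchema(lines: Seq[String]): Seq[(String, ColType)] = {
    val header = lines.head.split(",").map(_.trim).toSeq
    val rows = lines.tail.map(_.split(",", -1).map(_.trim).toSeq)
    header.indices.map { i =>
      val types = rows.map(row => typeOf(row.lift(i).getOrElse("")))
      header(i) -> types.reduceOption(widen).getOrElse(StringCol)
    }
  }
}

val sample = Seq(
  "C_CUSTKEY,C_NAME,C_ACCTBAL",
  "1,Customer#000000001,711.56",
  "2,Customer#000000002,121"
)

// Prints C_CUSTKEY: IntCol, C_NAME: StringCol, C_ACCTBAL: DoubleCol (one per line)
CsvInferenceDemo.inferSchema(sample).foreach { case (name, colType) => println(s"$name: $colType") }
```

Note how `C_ACCTBAL` becomes `DoubleCol` even though one value (`121`) is an integer: the column type must hold every value, which is also why inference has to look beyond the first row.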
The source code to load the data from CSV/Parquet files is in CreateColumnTable.scala.
Example - Reading JSON documents
As mentioned before, when dealing with JSON you face two challenges: (1) the data can be highly nested, and (2) the structure of the documents can keep changing.
WorkingWithJson.scala is a simple example that loads multiple JSON records and shows how to deal with schema changes across documents.
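Both challenges come down to the schema unioning described under "Read nested, sparse data sets". The toy model below takes already-parsed JSON records (represented as nested `Map`s), infers a schema for each record, and folds them into one unified schema: nested objects become nested structs, fields missing from some records are still kept, Long and Double widen to Double, and other conflicts fall back to String. `JsonSchemaUnionDemo` and its types are hypothetical. Spark's JSON reader does its own parsing and has a richer type system, so treat this as an illustration of the idea, not as the code in WorkingWithJson.scala.

```scala
// Simplified, hypothetical model of schema inference by union across JSON records (not Spark code).
object JsonSchemaUnionDemo {
  // Toy schema types; Spark's JSON inference supports many more.
  sealed trait JType
  case object JLong extends JType
  case object JDouble extends JType
  case object JBool extends JType
  case object JString extends JType
  final case class JStruct(fields: Map[String, JType]) extends JType

  // Schema of one already-parsed JSON value; nested objects become nested structs.
  def infer(value: Any): JType = value match {
    case _: Int | _: Long => JLong
    case _: Double        => JDouble
    case _: Boolean       => JBool
    case m: Map[_, _]     => JStruct(m.map { case (k, v) => k.toString -> infer(v) })
    case _                => JString
  }

  // Merge two schemas: structs take the union of their fields (recursively),
  // Long and Double widen to Double, and any other conflict falls back to String.
  def union(a: JType, b: JType): JType = (a, b) match {
    case (x, y) if x == y                    => x
    case (JLong, JDouble) | (JDouble, JLong) => JDouble
    case (JStruct(f1), JStruct(f2)) =>
      val merged = (f1.keySet ++ f2.keySet).map { key =>
        val fieldType = (f1.get(key), f2.get(key)) match {
          case (Some(t1), Some(t2)) => union(t1, t2)
          case (Some(t1), None)     => t1
          case (None, Some(t2))     => t2
          case (None, None)         => JString // unreachable: key comes from f1 or f2
        }
        key -> fieldType
      }
      JStruct(merged.toMap)
    case _ => JString
  }

  // Fold every record's schema into one unified schema, starting from an empty struct.
  def inferAll(records: Seq[Map[String, Any]]): JType =
    records.map(infer).foldLeft(JStruct(Map.empty): JType)(union)

  // Three documents whose structure changes from record to record.
  val records: Seq[Map[String, Any]] = Seq(
    Map("id" -> 1, "name" -> "alice"),
    Map("id" -> 2, "score" -> 9.5, "address" -> Map("city" -> "Pune")),
    Map("id" -> 3, "score" -> 7, "address" -> Map("zip" -> "411001"))
  )

  val unified: JType = inferAll(records)
}

println(JsonSchemaUnionDemo.unified)
```

For the three sample records, the unified schema has `id` as `JLong`, `name` as `JString`, `score` as `JDouble` (the values 9.5 and 7 widen to Double), and `address` as a nested struct containing both `city` and `zip`, even though no single record has all of these fields.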
When loading data from file-based sources such as CSV or Parquet, the files must be accessible from all members of the SnappyData cluster. Make sure the location is NFS-mounted or accessible through a cloud storage solution (shared storage such as S3).