SnappyData Spark Extension API Reference Guide

This guide gives details of Spark extension APIs that are provided by SnappyData. The following APIs are included:

SnappySession APIs

| API | Description |
|-----|-------------|
| sql | Query using a cached plan |
| sqlUncached | Query using a fresh plan |
| createTable | Create a SnappyData managed table |
| createTable | Create a SnappyData managed JDBC table |
| truncateTable | Empty the contents of a table |
| dropTable | Drop a SnappyData table |
| createSampleTable | Create a stratified sample table |
| createApproxTSTopK | Create a structure to query top-K |
| setCurrentSchema | Set the current database/schema |
| getCurrentSchema | Get the current schema of the session |
| insert | Insert rows into an existing table |
| put | Upsert rows into an existing table |
| update | Update rows in a table that match a filter |
| delete | Delete rows in a table that match a filter |
| queryApproxTSTopK | Fetch the top-K entries |

DataFrameWriter APIs

| API | Description |
|-----|-------------|
| putInto | Put DataFrame content into a table |
| deleteFrom | Delete DataFrame content from a table |

SnappySessionCatalog APIs

| API | Description |
|-----|-------------|
| getKeyColumns | Get the key columns of a SnappyData table |
| getKeyColumnsAndPositions | Get the primary key or key columns with their position in the table |

SnappySession APIs

The following APIs are available for SnappySession.

sql

You can use this API to run a query with a cached plan for a given SQL.

Syntax

sql(sqlText : String)

Parameters

| Parameter | Description |
|-----------|-------------|
| sqlText | The SQL string to execute. |

Returns: DataFrame

Example

snappySession.sql("select * from t1")

sqlUncached

You can use this API to run a query using a fresh plan for a given SQL String.

Syntax

sqlUncached(sqlText : String)

Parameters

| Parameter | Description |
|-----------|-------------|
| sqlText | The SQL string to execute. |

Returns: DataFrame

Example

snappySession.sqlUncached("select * from t1")

createTable

Creates a SnappyData managed table. Any relation provider supported by SnappyData, such as row, column, etc., can be used here.

Syntax

createTable(
      tableName: String,
      provider: String,
      schema: StructType,
      options: Map[String, String],
      allowExisting: Boolean)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |
| provider | Provider name such as 'ROW', 'COLUMN' etc. |
| schema | The table schema. |
| options | Properties for table creation. For example, partition_by, buckets etc. |
| allowExisting | When set to true, a table with the same name is ignored; otherwise an AnalysisException is thrown stating that the table already exists. |

Returns: DataFrame

Example

case class Data(col1: Int, col2: Int, col3: Int)
val props = Map.empty[String, String]
val data = Seq(Seq(1, 2, 3), Seq(7, 8, 9), Seq(9, 2, 3), Seq(4, 2, 3), Seq(5, 6, 7))
val rdd = sc.parallelize(data, data.length).map(s => Data(s(0), s(1), s(2)))
val dataDF = snappySession.createDataFrame(rdd)

snappySession.createTable("t1", "column", dataDF.schema, props)
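
The example above passes an empty options map. A minimal sketch of passing table-creation options is shown below; it assumes the snappySession and dataDF from the previous example, and the table name and option values are illustrative:

// Sketch: create a partitioned column table by passing options explicitly.
val optionsWithPartitioning = Map(
  "PARTITION_BY" -> "col1",  // partition the table on col1
  "BUCKETS" -> "8"           // number of buckets for the partitioned table
)
snappySession.createTable("t1_partitioned", "column", dataDF.schema,
  optionsWithPartitioning, allowExisting = false)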

createTable

Creates a SnappyData managed JDBC table, which takes a free-format DDL string. The DDL string should adhere to the syntax of the underlying JDBC store. SnappyData ships with an inbuilt JDBC store, which can be accessed through the ROW format data store. The options parameter can take connection details.

Syntax

createTable(
      tableName: String,
      provider: String,
      schemaDDL: String,
      options: Map[String, String],
      allowExisting: Boolean)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |
| provider | Provider name such as 'ROW', 'COLUMN' etc. |
| schemaDDL | The table schema as a string interpreted by the provider. |
| options | Properties for table creation. For example, partition_by, buckets etc. |
| allowExisting | When set to true, a table with the same name is ignored; otherwise an AnalysisException is thrown stating that the table already exists. |

Example

val props = Map(
  "url" -> s"jdbc:derby:$path",
  "driver" -> "org.apache.derby.jdbc.EmbeddedDriver",
  "poolImpl" -> "tomcat",
  "user" -> "app",
  "password" -> "app"
)

val schemaDDL = "(OrderId INT NOT NULL PRIMARY KEY, ItemId INT, ITEMREF INT)"
snappySession.createTable("jdbcTable", "jdbc", schemaDDL, props)

truncateTable

Empties the contents of the table without deleting the catalog entry.

Syntax

truncateTable(tableName: String, ifExists: Boolean = false)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |
| ifExists | Attempt truncate only if the table exists. |

Returns: Unit

Example

snappySession.truncateTable("t1", true)

dropTable

Drops a SnappyData table created by a call to SnappySession.createTable, Catalog.createExternalTable, or Dataset.createOrReplaceTempView.

Syntax

dropTable(tableName: String, ifExists: Boolean = false)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |
| ifExists | Attempts drop only if the table exists. |

Returns: Unit

Example

snappySession.dropTable("t1", true)

createSampleTable

Creates a stratified sample table.

!!! Note
    This API is not supported in the Smart Connector mode.

Syntax

createSampleTable(tableName: String,
      baseTable: Option[String],
      samplingOptions: Map[String, String],
      allowExisting: Boolean)


Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | The qualified name of the table. |
| baseTable | The base table of the sample table, if any. |
| samplingOptions | Sampling options such as QCS, reservoir size etc. |
| allowExisting | When set to true, a table with the same name is ignored; otherwise a table-exists exception is thrown. |

Returns: DataFrame

Example

snappySession.createSampleTable("airline_sample", Some("airline"),
  Map("qcs" -> "UniqueCarrier ,Year_ ,Month_", "fraction" -> "0.05",
    "strataReservoirSize" -> "25", "buckets" -> "57"),
  allowExisting = false)

createApproxTSTopK

Creates an approximate structure to query top-K with time series support.

!!! Note
    This API is not supported in the Smart Connector mode.

Syntax

createApproxTSTopK(topKName: String, baseTable: Option[String],
      keyColumnName: String, inputDataSchema: StructType,
      topkOptions: Map[String, String], allowExisting: Boolean = false)

Parameters

| Parameter | Description |
|-----------|-------------|
| topKName | The qualified name of the top-K structure. |
| baseTable | The base table of the top-K structure, if any. |
| keyColumnName | The name of the key column on which the top-K structure is built. |
| inputDataSchema | The schema of the input data. |
| topkOptions | Options for the top-K structure, such as the time-series column, time interval, and size. |
| allowExisting | When set to true, a table with the same name is ignored; otherwise a table-exists exception is thrown. |

Returns: DataFrame

Example

snappySession.createApproxTSTopK("topktable", Some("hashtagTable"), "hashtag", schema, topKOption)
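
The schema and topKOption values in the example are assumed to be defined elsewhere. A minimal, self-contained sketch is given below; the column names and option keys (epoch, timeInterval, size) are illustrative and should be verified against your SnappyData version:

import org.apache.spark.sql.types._

// Schema of the input data that feeds the top-K structure.
val schema = StructType(Seq(
  StructField("hashtag", StringType, nullable = false),
  StructField("retweets", IntegerType, nullable = false)))

// Illustrative top-K options: start epoch, time interval, and size of the structure.
val topKOption = Map(
  "epoch" -> System.currentTimeMillis().toString,
  "timeInterval" -> "2000ms",
  "size" -> "10")

snappySession.createApproxTSTopK("topktable", Some("hashtagTable"),
  "hashtag", schema, topKOption)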

setCurrentSchema

Sets the current database/schema.

Syntax

setCurrentSchema(schemaName: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| schemaName | The schema name which goes into the catalog. |

Returns: Unit

Example

snappySession.setCurrentSchema("APP")

getCurrentSchema

Gets the current schema of the session.

Syntax

getCurrentSchema

Returns: String

Example

snappySession.getCurrentSchema

insert

Inserts one or more rows into an existing table.

Syntax

insert(tableName: String, rows: Row*)


Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Table name for the insert operation. |
| rows | List of rows to be inserted into the table. |

Returns: Int

Example

val row = Row(1, 2, 3)
snappySession.insert("t1", row)
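
Because rows is a varargs parameter, multiple rows can be passed in a single call. A short sketch, assuming the same table t1 with three INT columns:

import org.apache.spark.sql.Row

// insert returns the number of rows inserted.
val insertedCount: Int = snappySession.insert("t1",
  Row(1, 2, 3), Row(4, 5, 6), Row(7, 8, 9))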

put

Upserts one or more rows into an existing table. This works only for row tables.

Syntax

put(tableName: String, rows: Row*)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Table name for the put operation. |
| rows | List of rows to be put into the table. |

Returns: Int

Example

snappySession.put(tableName, dataDF.collect(): _*)
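
The snippet above relies on tableName and dataDF from earlier examples. A self-contained sketch is shown below; the table name, schema, and values are illustrative:

import org.apache.spark.sql.Row

// put requires a row table; a primary key lets existing keys be updated in place.
snappySession.sql(
  "CREATE TABLE IF NOT EXISTS items (id INT PRIMARY KEY, qty INT) USING row")

snappySession.put("items", Row(1, 10))  // inserts id = 1
snappySession.put("items", Row(1, 20))  // upserts: qty for id = 1 becomes 20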

update

Updates all the rows in the table that match the passed filter expression. This works only for row tables.

Syntax

update(tableName: String, filterExpr: String, newColumnValues: Row,  updateColumns: String*)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | The table name which needs to be updated. |
| filterExpr | SQL WHERE criteria to select the rows that will be updated. |
| newColumnValues | A single Row containing all the updated column values. They must match the updateColumns list passed. |
| updateColumns | List of all column names that are updated. |

Returns: Int

Example

snappySession.update("t1", "ITEMREF = 3", Row(99), "ITEMREF")

delete

Deletes all the rows in the table that match the passed filter expression. This works only for row tables.

Syntax

delete(tableName: String, filterExpr: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |
| filterExpr | SQL WHERE criteria to select the rows that will be deleted. |

Returns: Int

Example

snappySession.delete("t1", "col1 = 1")

queryApproxTSTopK

Fetches the topK entries in the Approx TopK synopsis for the specified time interval. The time interval specified here should not be less than the minimum time interval used when creating the TopK synopsis.

!!! Note
    This API is not supported in the Smart Connector mode.

Syntax

queryApproxTSTopK(topKName: String,
      startTime: String = null, endTime: String = null,
      k: Int = -1)

Parameters

| Parameter | Description |
|-----------|-------------|
| topKName | The top-K structure that is to be queried. |
| startTime | Start time as a string in the format yyyy-mm-dd hh:mm:ss. If null is passed, the oldest interval is taken as the start interval. |
| endTime | End time as a string in the format yyyy-mm-dd hh:mm:ss. If null is passed, the newest interval is taken as the last interval. |
| k | Optional. The number of elements to be queried. This is to be passed only for stream summary. |

Returns: DataFrame

Example

snappySession.queryApproxTSTopK("topktable")
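
A sketch of querying the same structure over an explicit time window (the timestamps below are illustrative):

// Fetch the top-K entries captured between two points in time.
val topEntries = snappySession.queryApproxTSTopK("topktable",
  startTime = "2016-01-01 00:00:00",
  endTime = "2016-01-01 01:00:00")
topEntries.show()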

DataFrameWriter APIs

The following APIs are available for DataFrameWriter:

putInto

Puts the content of the DataFrame into the specified table. The schema of the DataFrame must be the same as the schema of the table. Column names are ignored while matching the schemas, and the put operation is performed using position-based resolution. Rows that are already present in the table are updated. In addition, the table on which putInto is invoked must have key columns defined if it is a column table, or a primary key defined if it is a row table.

Syntax

putInto(tableName: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |

Returns: Unit

Example

import org.apache.spark.sql.snappy._
df.write.putInto("snappy_table")
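
A fuller, self-contained sketch follows; the table name, key_columns option value, and data are illustrative:

import org.apache.spark.sql.snappy._

// Column table with key columns defined, as putInto requires.
snappySession.sql(
  "CREATE TABLE IF NOT EXISTS snappy_table (id INT, qty INT) " +
  "USING column OPTIONS (key_columns 'id')")

// Rows whose key already exists are updated; new keys are inserted.
val df = snappySession.createDataFrame(Seq((1, 100), (2, 200))).toDF("id", "qty")
df.write.putInto("snappy_table")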

deleteFrom

The deleteFrom API deletes from the given SnappyData table all records that exist in the input DataFrame. Existence of a record is checked by comparing the key column (or primary key) values.

To use this API, key columns (for column tables) or primary keys (for row tables) must be defined in the SnappyData table.

Also, the source DataFrame must contain all the key columns or primary keys (depending on the type of SnappyData table). Column existence is checked using a case-insensitive match of column names. If the source DataFrame contains columns other than the key columns, they are ignored by the deleteFrom API.

Syntax

deleteFrom(tableName: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |

Returns: Unit

Example

import org.apache.spark.sql.snappy._
df.write.deleteFrom("snappy_table")
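
Continuing the putInto sketch above (table and data are illustrative), deleting by key might look like this:

import org.apache.spark.sql.snappy._

// Only the key column values are used; any extra columns in the DataFrame are ignored.
val keysToDelete = snappySession.createDataFrame(Seq(Tuple1(2))).toDF("id")
keysToDelete.write.deleteFrom("snappy_table")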

SnappySessionCatalog APIs

The following APIs are available for SnappySessionCatalog:

!!! Note
    These are developer APIs and are subject to change in the future.

getKeyColumns

Gets primary key or key columns of a SnappyData table.

Syntax

getKeyColumns(tableName: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |

Returns: Sequence of key columns (for column tables) or sequence of primary keys (for row tables).
Example

snappySession.sessionCatalog.getKeyColumns("t1")

getKeyColumnsAndPositions

Gets primary key or key columns of a SnappyData table along with their position in the table.

Syntax

getKeyColumnsAndPositions(tableName: String)

Parameters

| Parameter | Description |
|-----------|-------------|
| tableName | Name of the table. |

Returns: Sequence of scala.Tuple2 containing the column and the column's position in the table, for each key column (for column tables) or primary key column (for row tables).

Example

snappySession.sessionCatalog.getKeyColumnsAndPositions("t1")
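
A self-contained sketch of both catalog calls; the table name and key column below are illustrative:

// Column table with a key column so the catalog calls have something to return.
snappySession.sql(
  "CREATE TABLE IF NOT EXISTS t1 (id INT, qty INT) " +
  "USING column OPTIONS (key_columns 'id')")

val keyColumns = snappySession.sessionCatalog.getKeyColumns("t1")
val keyColumnsWithPositions = snappySession.sessionCatalog.getKeyColumnsAndPositions("t1")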