Kinetica Spark Guide

This guide provides step-by-step instructions for getting started with Spark and Kinetica.

Spark via JDBC

SQL can be issued against Kinetica through the Spark JDBC interface. This not only allows data ingest and egress, but also provides access to native Kinetica functions in queries, including geospatial operations.

See Connecting via JDBC for instructions on obtaining the JDBC driver.

Note

Unlike queries run through the Egress Processor of the Legacy Spark Connector, JDBC queries are not partitioned.


Spark Egress

The following example shows how to execute queries against Kinetica. It uses JDBC as the read format, which requires the Kinetica JDBC driver to be accessible so that Spark can load it, and runs the specified query. The query result is loaded into a DataFrame, and the schema and result set are output to the console.

This example makes use of the NYC taxi trip table, which can be loaded using GAdmin from the Demo Data page, under Cluster > Demo.

  1. Launch Spark Shell:

    $ spark-shell --jars kinetica-jdbc-*-fullshaded.jar
    
  2. Configure the JDBC options for the source database, specifying the query as the value of the dbtable map key. Provide an appropriate url value for the target system, as well as a username & password if the database is configured to require authentication.

    Note

    If connecting over SSL, see JDBC Secure Connections for the modified URL to use.

    val url = "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1[[;<parameter>=<value>]*]"
    val username = ""
    val password = ""
    val options = Map(
       "url" -> url,
       "driver" -> "com.kinetica.jdbc.Driver",
       "UID" -> username,
       "PWD" -> password,
       "dbtable" -> s"""(
          SELECT
             vendor_id,
             DECIMAL(MIN(geo_miles)) AS min_geo_miles,
             DECIMAL(AVG(geo_miles)) AS avg_geo_miles,
             DECIMAL(MAX(geo_miles)) AS max_geo_miles
          FROM
          (
             SELECT
                vendor_id,
                DECIMAL(GEODIST(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude) * 0.000621371) AS geo_miles
             FROM demo.nyctaxi
          )
          WHERE geo_miles BETWEEN .01 AND 100
          GROUP BY vendor_id
       )"""
    )
    
  3. Read queried data from Kinetica into DataFrame:

    val df = spark.read.format("jdbc").options(options).load()
    
  4. Output DataFrame schema for query:

    df.printSchema
    
  5. Verify output:

    root
     |-- vendor_id: string (nullable = true)
     |-- min_geo_miles: decimal(18,4) (nullable = true)
     |-- avg_geo_miles: decimal(18,4) (nullable = true)
     |-- max_geo_miles: decimal(18,4) (nullable = true)
    
  6. Output query result set:

    df.orderBy("vendor_id").show
    
  7. Verify output:

    +---------+-------------+-------------+-------------+
    |vendor_id|min_geo_miles|avg_geo_miles|max_geo_miles|
    +---------+-------------+-------------+-------------+
    |      CMT|       0.0100|       2.0952|      80.8669|
    |      DDS|       0.0148|       2.7350|      64.2944|
    |      NYC|       0.0101|       2.1548|      36.9236|
    |      VTS|       0.0100|       2.0584|      94.5213|
    |     YCAB|       0.0100|       2.1049|      36.0565|
    +---------+-------------+-------------+-------------+
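
The options map from step 2 can also be assembled by small helpers, so that other queries reuse the same connection settings. The following is a minimal sketch: asDerivedTable and kineticaOptions are hypothetical names introduced here for illustration, not part of Spark or the Kinetica JDBC driver, and the count query is only an example against the same demo.nyctaxi table.

```scala
// Hypothetical helpers (assumptions for illustration only)

/** Wraps a SELECT statement in parentheses so it can be passed as "dbtable". */
def asDerivedTable(query: String): String =
  s"(${query.trim.stripSuffix(";")})"

/** Builds the same JDBC options map used in step 2 of this example. */
def kineticaOptions(
    url: String,
    username: String,
    password: String,
    dbtable: String
): Map[String, String] =
  Map(
    "url" -> url,
    "driver" -> "com.kinetica.jdbc.Driver",
    "UID" -> username,
    "PWD" -> password,
    "dbtable" -> dbtable
  )

// Example query: trips per vendor; the trailing semicolon is stripped by the helper
val countQuery = asDerivedTable(
  "SELECT vendor_id, COUNT(*) AS trips FROM demo.nyctaxi GROUP BY vendor_id;"
)

// <db.host> is a placeholder, as in step 2
val countOptions = kineticaOptions(
  "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1",
  "",
  "",
  countQuery
)

// In spark-shell, the map is then used exactly as in step 3:
// val counts = spark.read.format("jdbc").options(countOptions).load()
```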
    

Spark Ingest

The following example shows how to ingest data into Kinetica. It uses JDBC as the write format, which requires the Kinetica JDBC driver to be accessible so that Spark can load it, and ingests the given DataFrame into a new table.

This example makes use of the DataFrame populated in the Spark Egress section, referencing it as df.

  1. Run the Spark Egress example.

  2. Update the options map, specifying the table to ingest into for map key dbtable.

    val options = Map(
       "url" -> url,
       "driver" -> "com.kinetica.jdbc.Driver",
       "UID" -> username,
       "PWD" -> password,
       "dbtable" -> "demo.nyctaxi_copy"
    )
    
  3. Write data from DataFrame into Kinetica:

    df.write.format("jdbc").options(options).mode("append").save()
    
  4. Verify the ingestion into the demo.nyctaxi_copy table using GAdmin or Workbench.
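
Because only the dbtable entry differs between the egress and ingest maps, the map in step 2 could also be derived from the earlier one instead of being retyped. A minimal sketch, using a stand-in egressOptions map (a hypothetical name, with placeholder values) in place of the one built in the Spark Egress example:

```scala
// Stand-in for the egress options map from the Spark Egress example;
// the url and query here are placeholders, not values from a real system
val egressOptions = Map(
  "url" -> "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1",
  "driver" -> "com.kinetica.jdbc.Driver",
  "UID" -> "",
  "PWD" -> "",
  "dbtable" -> "(SELECT vendor_id FROM demo.nyctaxi)"
)

// Adding an existing key to an immutable Map returns a copy with that key
// replaced; all other connection settings are carried over unchanged
val ingestOptions = egressOptions + ("dbtable" -> "demo.nyctaxi_copy")
```

The original map is left untouched, so the egress query can still be rerun with it.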


Spark Logging

Logging of Kinetica JDBC operations can be configured by appending a LogLevel parameter to the JDBC URL. Valid log levels are listed under JDBC Client Parameters.

For instance, to enable DEBUG logging:

jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000;LogLevel=5
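
When the URL is built in code, the parameter can be appended programmatically. The sketch below assumes that URL parameters are separated by semicolons, as in the example above, and that no parameter value itself contains a semicolon; withUrlParam is a hypothetical helper, not part of the Kinetica driver.

```scala
/** Returns `url` with `key=value` appended, replacing any existing `key` setting.
  * Hypothetical helper; assumes ';'-separated parameters and case-sensitive keys.
  */
def withUrlParam(url: String, key: String, value: String): String = {
  val parts = url.split(";").toList
  // Keep the leading "jdbc:kinetica:URL=..." segment and drop any old value of `key`
  val kept = parts.head :: parts.tail.filterNot(_.startsWith(s"$key="))
  (kept :+ s"$key=$value").mkString(";")
}

val baseUrl =
  "jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000"

// Produces the DEBUG-logging URL shown above
val debugUrl = withUrlParam(baseUrl, "LogLevel", "5")
```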

To enable DEBUG logging for both Spark and the JDBC driver, without having to modify the JDBC URL:

sc.setLogLevel("DEBUG")

Mapping Spark to Kinetica

Some Spark data types and functions may need custom mappings to Kinetica.

The following dialect snippet defines a custom mapping, which maps:

  • Spark's CLOB/VARCHAR types to the Kinetica VARCHAR type
  • Spark's BLOB type to the Kinetica BLOB type
  • Spark's boolean type to the Kinetica TINYINT type
  • The truncate command (which does DROP/CREATE, by default) to Kinetica's TRUNCATE TABLE command

import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

val KineticaDialect = new JdbcDialect {
    // Apply this dialect to any Kinetica JDBC URL
    override def canHandle(url: String): Boolean =
        url.toLowerCase(Locale.ROOT).startsWith("jdbc:kinetica")

    // Map Spark types to Kinetica column types; None defers to Spark's defaults
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("TEXT", Types.VARCHAR))
        case BinaryType => Some(JdbcType("VARBINARY", Types.VARBINARY))
        case BooleanType => Some(JdbcType("TINYINT", Types.TINYINT))
        case _ => None
    }

    // Declare that TRUNCATE TABLE does not cascade to other tables
    override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

    // Issue TRUNCATE TABLE instead of dropping and re-creating the table
    override def getTruncateQuery(
        table: String,
        cascade: Option[Boolean] = isCascadingTruncateTable()
    ): String = s"TRUNCATE TABLE $table"
}

// Register the dialect so that Spark uses it for matching JDBC URLs
JdbcDialects.registerDialect(KineticaDialect)

After running the application code, the dialect can be unregistered as follows:

JdbcDialects.unregisterDialect(KineticaDialect)
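
One way to make sure the dialect is always unregistered, even if the application code throws, is to scope the registration with try/finally. The following is a minimal sketch: withDialect is a hypothetical helper, not part of Spark, built only on the registerDialect and unregisterDialect calls shown above. The commented usage assumes the df, options, and KineticaDialect values from the earlier steps, and relies on Spark's JDBC truncate write option, which only applies in overwrite mode.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

/** Registers `dialect`, runs `body`, and unregisters the dialect afterwards,
  * whether `body` completes normally or throws. Hypothetical helper.
  */
def withDialect[A](dialect: JdbcDialect)(body: => A): A = {
  JdbcDialects.registerDialect(dialect)
  try body
  finally JdbcDialects.unregisterDialect(dialect)
}

// Usage in spark-shell, assuming df, options, and KineticaDialect from above:
// withDialect(KineticaDialect) {
//   df.write.format("jdbc")
//     .options(options)
//     .option("truncate", "true") // ask Spark to truncate rather than drop/create
//     .mode("overwrite")
//     .save()
// }
```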