Optimizing I/O

Since Kinetica is designed to work with big data sets and many-node setups, multi-head input (multi-head ingest) and output (multi-head egress) mechanisms are provided for fast data ingestion and fast record retrieval. Most interactions are done through the single head node and, from there, parcelled out to the rest of the cluster. However, multi-head operations allow insertion and retrieval of data through transactions with cluster nodes directly, bypassing the head node and improving performance.

API Support:

Language     Multi-head Egress Mechanism    Multi-head Ingest Mechanism
----------   ---------------------------    ---------------------------
C++          X                              GPUdbIngestor
C#           X                              KineticaIngestor
Java         RecordRetriever                BulkInserter
Javascript   X                              X
Node.js      X                              X
Python       RecordRetriever                GPUdbIngestor
REST         X                              X

In order for the cluster nodes to send and receive data directly, the configuration on each node needs to be updated to allow the incoming HTTP requests that carry the data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set for both multi-head ingest and multi-head egress to work properly:

# Enable worker HTTP servers, each process runs its own server for direct ingest.
enable_worker_http_servers = true

When returning multi-head URLs for the worker nodes, both the HA and HTTPD configurations are taken into account, as well as any public URLs defined in /opt/gpudb/core/etc/gpudb.conf (which override all other settings). Unless you plan on using the Python background multi-head process, the multi-head ingest and egress objects require a worker list to distribute the workload, with one entry in the list for each node/process. This list can be auto-populated simply by using a GPUdb connection object, which will retrieve the list of available cluster nodes from the database itself. Below is a code snippet (in Java) showing an automatically populated worker list:

// Connect via the head node; the worker node list is retrieved from the database itself
GPUdb gpudb = new GPUdb("http://localhost:9191");
WorkerList workers = new WorkerList(gpudb);

// Distribute the batched inserts across the nodes in the worker list
BulkInserter<T> bulkInserter = new BulkInserter<T>(gpudb, tableName, type, bulkThreshold, null, workers);

Note that in some cases, workers may be configured to use more than one IP address, not all of which may be accessible to the client; the worker list constructor uses the first IP returned by the server for each worker. In cases where workers may use more than one IP address and public URLs are not configured, you can use an ipRegex or ipPrefix to match the correct worker IP addresses:

// Java Worker List

WorkerList(GPUdb gpudb, Pattern ipRegex);
// or
WorkerList(GPUdb gpudb, String ipPrefix);
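
For example, a worker list restricted to a particular internal subnet could be built with the ipPrefix form and handed to the multi-head ingest object; the subnet prefix used below is purely illustrative:

// Connect through the head node
GPUdb gpudb = new GPUdb("http://localhost:9191");

// Keep only worker URLs whose IP addresses begin with the given prefix
// (the 172.30.20 prefix is a placeholder; substitute your own subnet)
WorkerList workers = new WorkerList(gpudb, "172.30.20");

// Hand the filtered worker list to the multi-head ingest object
BulkInserter<T> bulkInserter = new BulkInserter<T>(gpudb, tableName, type, bulkThreshold, null, workers);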

Multi-Head Egress

Before data is read, a RecordRetriever object can be instantiated to allow multiple nodes to help serve up the data. This greatly increases the speed of retrieval by parallelizing the output of data and spreading the network traffic across multiple nodes. All of this functionality is encapsulated in the RecordRetriever object.

The RecordRetriever looks up the given shard key value in the table associated with the RecordRetriever object, filtering out any records that don't match the optional expression. Note that multi-head egress only allows lookups by shard key value; it does not support arbitrary queries against tables. Note also that multi-head egress currently does not support retrieving records using a null lookup value or using functions in the expression.

Important

The shard key and any columns used in the expression must be indexed for multi-head egress to work properly. There is an implicit index for a primary key, so an explicit column index is not required when the primary key is the shard key.

Examples

The following is a Python API code block that demonstrates the use of the background RecordRetriever for retrieving data.

import gpudb

# Establish a connection to the database
h_db = gpudb.GPUdb(encoding = 'BINARY', host = '127.0.0.1', port = '9191')

# Wrap the existing 'stocks' table, enabling background multi-head I/O
table = gpudb.GPUdbTable(None, 'stocks', db = h_db, use_multihead_io = True)

# Ensure a column index exists on the sharded column
if 'Symbol' not in table.show_table()['additional_info'][0]['attribute_indexes']:
    table.alter_table(action = 'create_index', value = 'Symbol')

# Get stock records for S&P Global
sp_records = table.get_records_by_key(key_values = {'Symbol': 'SPGI'})
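
A rough Java equivalent of this lookup is sketched below. It is a hedged sketch, not taken from this document: it assumes the Java API's Type.fromTable() helper, the alterTable() endpoint wrapper, a RecordRetriever constructor that accepts a WorkerList, and a getByKey(keyValues, expression) method returning GenericRecord results, so confirm these signatures against the Java API version in use (classes come from the com.gpudb package; Arrays comes from java.util).

// Connect and auto-populate the worker list from the database itself
GPUdb gpudb = new GPUdb("http://localhost:9191");
WorkerList workers = new WorkerList(gpudb);

// Look up the table's type and ensure the sharded column is indexed
// (skip the alterTable() call if the index already exists)
Type type = Type.fromTable(gpudb, "stocks");
gpudb.alterTable("stocks", "create_index", "Symbol", null);

// Create a retriever that contacts the worker nodes directly
RecordRetriever<GenericRecord> retriever =
    new RecordRetriever<>(gpudb, "stocks", type, workers);

// Get stock records for S&P Global by shard key value; the empty string
// means no additional filter expression is applied
GetRecordsResponse<GenericRecord> response =
    retriever.getByKey(Arrays.asList((Object) "SPGI"), "");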

Multi-Head Ingest

To fully utilize multi-head ingest, the ingestion process should use a multi-node parallel processing framework, such as MapReduce, Storm, or Spark. As data is divided up and flows through the processing nodes, a Kinetica BulkInserter / GPUdbIngestor object can be instantiated in each node to push data to the proper Kinetica cluster node. This greatly increases the speed of ingestion by both parallelizing the ingestion process and spreading the network traffic across multiple nodes. All of this functionality is encapsulated in the BulkInserter object.

Example

The following is a Java API code block that demonstrates the use of the BulkInserter for ingesting data.

// Create a bulk inserter for batch data ingestion
BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null);

// Generate data to be inserted into the table, automatically inserting
// records whenever the batchSize limit is reached
for (int i = 0; i < numRecords; i++)
{
    MyType record = new MyType();
    record.put( 0, (i + 0.1) ); // col1
    record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
    record.put( 2, "Group 1" );  // group_id
    bulkInserter.insert( record );
}

// To ensure all records are inserted, flush the bulk inserter object.
bulkInserter.flush();
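
To send these batches directly to the worker nodes instead of through the head node, the same constructor can also be given the auto-populated worker list shown earlier; the only change is the extra workers argument:

// Build the worker list from the database's own configuration and pass it in,
// so each batch is pushed straight to the responsible worker node
WorkerList workers = new WorkerList(gpudb);
BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null, workers);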

Python Background Multihead

In the Python API, if you've created a GPUdbTable object for the table, you can use the built-in use_multihead_io parameter to have the GPUdbTable interface handle all Ingestor and RecordRetriever interactions in the background:

my_table_obj = gpudb.GPUdbTable(_type=my_columns, name="my_test_table", db=h_db, use_multihead_io=True)
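
Once the flag is set, ordinary GPUdbTable calls on that object are routed through the background ingestor and record retriever. A brief hedged sketch follows; the column names, values, and shard key ('id') are illustrative and assume they match the my_columns definition above:

# Inserts are batched by the background ingestor and pushed to the worker nodes
my_table_obj.insert_records([[1, 'first record'], [2, 'second record']])

# Lookups by shard key value go through the background record retriever
matches = my_table_obj.get_records_by_key(key_values = {'id': 1})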

Cautions

  • If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag (a fuller pom.xml sketch appears after this item):
<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>
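
For context, the relocation element goes inside the shade plugin's relocations configuration in the project's pom.xml. The arrangement below is a sketch of the conventional maven-shade-plugin setup (the phase and goal shown are the plugin's usual defaults), not a Kinetica-specific requirement:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.avro</pattern>
            <shadedPattern>org.shaded.apache.avro</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>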