Version:

Multi-Head Egress

Multi-Head Egress is a mechanism that allows sharded data to be retrieved directly from cluster nodes, bypassing the overhead of pulling the data through the head node from the cluster nodes. This greatly increases the speed of retrieval by parallelizing the output of data and spreading the network traffic across multiple nodes.

Operationally, the egress mechanism is a lookup of a given shard key value in a given table or view, filtering out any records that don't match an optional expression.

API Support

Language Multi-head Egress Mechanism
C++ RecordRetriever
C# RecordRetriever
Java RecordRetriever
Javascript X
Node.js X
Python RecordRetriever
REST X
SQL X

Configuration

In order for the cluster nodes to send data directly to an egress client, the configuration on each node needs to be updated to allow the incoming HTTP requests for that data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set for multi-head egress to work properly:

# Enable worker HTTP servers...
enable_worker_http_servers = true

Both the HA and HTTPD configurations will be taken into account, as well as any public URLs (which override all other settings) defined in /opt/gpudb/core/etc/gpudb.conf when returning multi-head URLs for the worker nodes. With the exception of the Python background multi-head process, the multi-head egress object requires a list of worker nodes to use to distribute the data requests, with one entry in the list for each node/process. This list can be autopopulated simply by using a GPUdb connection object, which can retrieve the list of available cluster nodes from the database itself. Below is a code snippet (in Java) showing an automatically populated worker list:

GPUdb gpudb = new GPUdb("http://localhost:9191");
WorkerList workers = new WorkerList(gpudb);

Note that in some cases, workers may be configured to use more than one IP address, not all of which may be accessible to the client; the worker list constructor uses the first IP returned by the server for each worker. In cases where workers may use more than one IP address and public URLs are not configured, a regular expression Pattern or prefix String can be used to match the correct worker IP addresses:

// Match 172.X.Y.Z addresses
WorkerList(GPUdb gpudb, Pattern.compile("172\\..*"));
// or
WorkerList(GPUdb gpudb, "172.");

Considerations

There are several factors to consider when using multi-head egress:

  • Only sharded tables/views can be queried.
  • Only shard key value lookups are allowed; arbitrary queries against sharded tables/views are not supported.
  • Null lookup values are not supported.
  • Lookup values cannot be expression functions.
  • The shard key and any columns used in the expression must all be indexed. There is an implicit index for a primary key, so an explicit column index is not required when the primary key is the shard key, or when the primary key is a superset of the shard key and the expression references all of the primary key columns that are not part of the shard key.

Python Background Multihead Example

In the Python API, the GPUdbTable object has a use_multihead_io parameter, which allows the GPUdbTable class to handle all RecordRetriever interactions with the associated table in the background. The following is a Python API example that demonstrates the use of the background RecordRetriever for retrieving data.

Important

This example relies on the stocks data set, which can be imported into Kinetica via GAdmin. That data will be copied to two new tables and have indexes applied, as necessary.

Setup

First, connect to the database; here, the host IP address is passed in as a parameter to the Python API call:

h_db = gpudb.GPUdb(host = ['http://' + args.host + ':9191'], username = args.username, password = args.password)

Next, prepare the target table used by the single-column multi-head key lookup examples by copying the stocks table to a new table:

SQL_CREATE_TABLE = """
    CREATE TABLE example.stocks_multihead AS
    SELECT *
    FROM demo.stocks
"""

h_db.execute_sql(SQL_CREATE_TABLE, 0, 0)

Then add an index on the sharded column, Symbol, which must be the target of any multi-head lookup on the stocks table:

h_db.alter_table("example.stocks_multihead", "create_index", "Symbol")

To prepare the target table used by the multi-column multi-head key lookup examples, copy, reshard, & index the stocks table; the new shard key will be on both the Industry & Symbol columns:

SQL_CREATE_TABLE = """
    CREATE TABLE example.stocks_multihead_multicolumn AS
    SELECT *, KI_SHARD_KEY(Industry, Symbol) 
    FROM demo.stocks
"""

h_db.execute_sql(SQL_CREATE_TABLE, 0, 0)

# Create a column index on each of the sharded columns
h_db.alter_table("example.stocks_multihead_multicolumn", "create_index", "Industry")
h_db.alter_table("example.stocks_multihead_multicolumn", "create_index", "Symbol")

Key Lookup

Grab a handle to the stocks_multihead table, passing the use_multihead_io option to enable background multi-head key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_multihead", db = h_db, use_multihead_io = True)

Then perform the multi-head key lookup, searching the shard column for the value SPGI:

response = stocks_table.get_records_by_key(["SPGI"])

Lastly, output the results:

for record in response["data"]:
    
    print("{:10s} {:8.4f} {:8.4f} {:8.4f} {:8.4f}".format(
            datetime.fromtimestamp(record["Date"]/1000.0).strftime("%Y-%m-%d"),
            record["Open"],
            record["Low"],
            record["High"],
            record["Close"]
    ))

Key Lookup with Expression

Since any column involved in a multi-head key lookup must be indexed, first add an index to the Date column, which will be used in the example filter:

h_db.alter_table("example.stocks_multihead", "create_index", "Date")

Then perform the multi-head key lookup with an extra filtering expression, searching the shard column for the value SPGI and filtering out records prior to 2017:

response = stocks_table.get_records_by_key(["SPGI"], "Date >= '2017-01-01'")

The results can be output in the same manner shown under Key Lookup.

Multi-Column Key Lookup

Grab a handle to the stocks_multihead_multicolumn table, passing the use_multihead_io option to enable background multi-head key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_multihead_multicolumn", db = h_db, use_multihead_io = True)

Then perform the multi-head key lookup, searching the sharded Industry & Symbol columns for the values Financial Exchanges & Data & SPGI, respectively:

response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"}
)

The results can be output in the same manner shown under Key Lookup.

Multi-Column Key Lookup with Expression

Since any column involved in a multi-head key lookup must be indexed, first add an index to the Date column, which will be used in the example filter:

h_db.alter_table("example.stocks_multihead_multicolumn", "create_index", "Date")

Then perform the multi-head key lookup with an extra filtering expression, searching the sharded Industry & Symbol columns for the values Financial Exchanges & Data & SPGI, respectively, and filtering out records prior to 2017:

response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"},
        "Date >= '2017-01-01'"
)

The results can be output in the same manner shown under Key Lookup.

Result

The multi-head key lookup should consistently return faster than the same query without using multi-head. The results of a sample test run bear this out:

Lookup Type Time (No Multi-Head) Time (Multi-Head) Record Count
Single-Column Key Only 0.0043 0.0020 222
Single-Column Key + Expression 0.0031 0.0009 53
Multi-Column Key 0.0046 0.0024 222
Multi-Column Key + Expression 0.0031 0.0010 53

Cautions

  • If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:
<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>