Table Monitor Tutorial

The following tutorial is excerpted from a complete example that uses the Python Table Monitor API. It demonstrates the basic use of the three table monitor types:

  • table insert monitor - watches a table for newly inserted records and can make use of those records elsewhere
  • table update monitor - watches a table for updates and reports the number of records updated in each update request issued
  • table delete monitor - watches a table for deletes and reports the number of records deleted in each delete request issued

The table monitors will be managed via instances of the GPUdbTableMonitor.Client class, which can create, process, & remove table monitors.

Prerequisites

The prerequisites for running the table monitor example are listed below:

Python API Installation

The native Kinetica Python API is accessible through the following means:

  • For development on the Kinetica server: Kinetica RPM
  • For development not on the Kinetica server: Git or PyPI

Kinetica RPM

In default Kinetica installations, the native Python API is located in the /opt/gpudb/api/python directory. The /opt/gpudb/bin/gpudb_python wrapper script is provided, which sets the execution environment appropriately.

Test the installation:

/opt/gpudb/bin/gpudb_python /opt/gpudb/api/python/examples/example.py

Important

When developing on the Kinetica server, use /opt/gpudb/bin/gpudb_python to run Python programs and /opt/gpudb/bin/gpudb_pip to install dependent libraries.

Git

  1. In the desired directory, run the following, replacing <kinetica-version> with the installed Kinetica version, e.g., v7.1:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-api-python.git
    
  2. Change directory into the newly downloaded repository:

    cd kinetica-api-python
    
  3. In the root directory of the cloned repository, install the Kinetica API:

    sudo python setup.py install
    
  4. Test the installation (Python 2.7 (or greater) is necessary for running the API example):

    python examples/example.py
    

PyPI

The Python package manager, pip, is required to install the API from PyPI.

  1. Install the API:

    pip install gpudb --upgrade
    
  2. Test the installation:

    python -c "import gpudb;print('Import Successful')"
    

    If Import Successful is displayed, the API has been installed and is ready for use.

Script Detail

In this example, a set of batches of randomly-generated weather data for a fixed set of cities will be inserted into a "history" table. A table monitor client (via GPUdbTableMonitor.Client) will be created to watch for those newly inserted records and update a "status" table of those cities with their latest randomly-generated temperature values.

Afterwards, a set of updates will be made to the "status" table, followed by a set of deletes. A second table monitor client will be created to watch for these two event types and output the numbers of records updated & deleted to the console.

There are four main components involved in this example:

  • The "history" table, into which weather records will be inserted, in batch
  • The "status" table, which reflects the latest weather data for each city
  • The table monitor client for the "history" table, which will watch for inserts into the table and update the "status" table with those records
  • The table monitor client for the "status" table, which will watch for updates & deletes on the table and output the corresponding record counts
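The data flow among these four components can be sketched in plain Python, with no Kinetica calls. This is only an illustration: the list and dict stand in for the two tables, the function names and sample readings are hypothetical, and keying the "status" stand-in by city alone is an assumption made to keep the sketch short.

```python
history = []  # stand-in for the "history" table
status = {}   # stand-in for the "status" table, keyed by city (an assumption)
events = []   # what the "status" monitor client would report to the console

def on_history_insert(record):
    """Role of the first client: upsert the latest reading into status."""
    status[record["city"]] = record["temperature"]

def insert_history_batch(records):
    """Insert a batch and fire the insert callback once per new record."""
    for record in records:
        history.append(record)
        on_history_insert(record)

def update_status(cities, temperature):
    """Update matching rows, then record the count as the second client would."""
    matched = [city for city in cities if city in status]
    for city in matched:
        status[city] = temperature
    events.append(("update", len(matched)))

def delete_status(cities):
    """Delete matching rows, then record the count as the second client would."""
    matched = [city for city in cities if city in status]
    for city in matched:
        del status[city]
    events.append(("delete", len(matched)))

# Hypothetical sample readings; Paris appears twice, so its status is overwritten
insert_history_batch([
    {"city": "Paris", "temperature": 18.5},
    {"city": "Tokyo", "temperature": 22.0},
    {"city": "Paris", "temperature": 19.1},
])
update_status(["Paris", "Tokyo"], 0.0)
delete_status(["Tokyo", "Oslo"])
print(status, events)
```

Every inserted record lands in the history stand-in, while the status stand-in keeps only one row per city. The update and delete steps report how many rows each request touched, not the rows themselves.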

The key parts of table monitor processing via Python are:

Imports & Aliases

The first step is to import Kinetica libraries, aliasing some of the classes for ease of use. Other supporting libraries need to be imported, as well.

import gpudb
from gpudb import GPUdbRecordColumn as GRC
from gpudb import GPUdbColumnProperty as GCP
from gpudb import GPUdbTableMonitor as GTM

import json
from tabulate import tabulate

Creating a Table Monitor Client

Next, create a connection to Kinetica.

# Establish connection with an instance of Kinetica on port 9191
kinetica = gpudb.GPUdb(host = ['http://' + args.host + ':9191'], username = args.username, password = args.password)
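The connection call reads its settings from an args object that this excerpt does not define. A minimal sketch of how such an object could be built with argparse follows, assuming the --host, --username, and --password flags shown in the run commands at the end of this tutorial. The default values and the helper names build_parser and kinetica_url are hypothetical.

```python
import argparse

def build_parser():
    """Command-line options matching the flags used to run the example."""
    parser = argparse.ArgumentParser(description="Table monitor example")
    # Defaults below are assumptions for illustration only
    parser.add_argument("--host", default="127.0.0.1", help="Kinetica host")
    parser.add_argument("--username", default="", help="login name")
    parser.add_argument("--password", default="", help="login password")
    return parser

def kinetica_url(host, port=9191):
    """Build the URL string that is passed to the connection call."""
    return "http://{}:{}".format(host, port)

# An explicit argument list is parsed here; a script would call parse_args()
args = build_parser().parse_args(["--host", "10.0.0.5", "--username", "admin"])
url = kinetica_url(args.host)
print(url)
```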

Then, subclass GPUdbTableMonitor.Client, the table monitor client class, and configure it to monitor the "history" table. The client will use the connection to manage the table's monitor and to process the insert events on it.

class StatusUpdater(GTM.Client):

    def __init__(self, kinetica):

Create a list of callback types to use when monitoring the "history" table.

        callbacks = [
            GTM.Callback(
                GTM.Callback.Type.INSERT_DECODED,
                self.on_insert_decoded,
                event_options = GTM.Callback.InsertDecodedOptions(
                    GTM.Callback.InsertDecodedOptions.DecodeFailureMode.ABORT
                )
            )
        ]

Initialize the class via the parent, passing the name of the table to monitor, the callback type list, and a list of options. Also, grab a handle to the "status" table for issuing updates.

        super(StatusUpdater, self).__init__(
            kinetica,
            HISTORY_TABLE,
            callback_list = callbacks,
            options = GTM.Options(dict(inactivity_timeout=0.1))
        )

        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)

Define the callback function that will be used by the table monitor client, which will update the "status" table with new "history" table records.

    def on_insert_decoded(self, history_record):

        status_update_record = [
            history_record["city"],
            history_record["state_province"],
            history_record["country"],
            history_record["temperature"],
            history_record["ts"]
        ]

        self.status_table.insert_records(status_update_record, options = {"update_on_existing_pk": "true"})

Define a second table monitor client, this time for monitoring updates & deletes on the "status" table.

class StatusReporter(GTM.Client):

    def __init__(self, kinetica):

Create a list of callback types to use when monitoring the "status" table.

        callbacks = [
            GTM.Callback(
                GTM.Callback.Type.UPDATED,
                self.on_update
            ),
            GTM.Callback(
                GTM.Callback.Type.DELETED,
                self.on_delete
            )
        ]

Initialize the class via the parent, passing the name of the table to monitor, the callback type list, and a list of options. Also, grab a handle to the "status" table for outputting records after each update/delete.

        super(StatusReporter, self).__init__(
            kinetica,
            STATUS_TABLE,
            callback_list = callbacks,
            options = GTM.Options(dict(inactivity_timeout=0.1))
        )

        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)

Define one callback function for updates and one for deletes that will be used by the table monitor client. Each will output its event count, followed by the full set of records from the "status" table.

    def on_update(self, count):

        print("[TM/SR]  Updated <%s> city temperature statuses:" % count)
        self.show_status()

    def on_delete(self, count):

        print("[TM/SR]  Deleted <%s> city temperature statuses:" % count)
        self.show_status()

Lastly, define the show_status function that will be called by each event handler to output the records in the "status" table.

    def show_status(self):

        self.status_table.get_records_by_column(
            column_names = ["city", "state_province", "country", "temperature", "last_update_ts"],
            options = {"sort_by":"city"},
            print_data = True
        )

Displaying a Table's Monitors

After the table monitor clients have been created, confirm this by looking up the table monitors attached to the "history" & "status" tables and display the monitored event and event topic ID for each table monitor the tables have.

table_monitor_headers = ["Table Name", "Monitor Type", "Topic ID"]
table_monitor_records = []

# Show table monitors on all the tables
for table_name in [
    HISTORY_TABLE,
    STATUS_TABLE
]:
    table_info = kinetica.show_table(table_name)['additional_info'][0]
    
    if 'table_monitor' in table_info:

        table_monitor_info = json.loads(table_info['table_monitor'])

        for monitor_type in table_monitor_info:
            table_monitor_records.append([table_name, monitor_type, table_monitor_info[monitor_type]])

print( tabulate( table_monitor_records, headers = table_monitor_headers, tablefmt = 'grid' ) )

This produces output similar to the following:

+----------------------------------------------+----------------+--------------------------+
| Table Name                                   | Monitor Type   | Topic ID                 |
+==============================================+================+==========================+
| tutorial_table_monitor.table_monitor_history | insert         | 3Rq3COxirg3YZk4Daqnl1w== |
+----------------------------------------------+----------------+--------------------------+
| tutorial_table_monitor.table_monitor_status  | update         | YOAIeVIBwJ1CFK+UTFW/DQ== |
+----------------------------------------------+----------------+--------------------------+
| tutorial_table_monitor.table_monitor_status  | delete         | zS+skKevnKMZZGT94uzpig== |
+----------------------------------------------+----------------+--------------------------+
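The parsing step in the loop above can be tried without a database connection. The sketch below assumes, based on how the loop reads it, that the table_monitor entry is a JSON object mapping each monitor type to its topic ID. The sample_info dict is a hypothetical response fragment that reuses the topic IDs from the output above, and monitor_rows is an illustrative helper name.

```python
import json

def monitor_rows(table_name, additional_info):
    """Return [table name, monitor type, topic ID] rows for one table."""
    if "table_monitor" not in additional_info:
        return []  # the table has no monitors attached
    monitors = json.loads(additional_info["table_monitor"])
    # Sort by monitor type so the row order is predictable
    return [[table_name, kind, topic_id]
            for kind, topic_id in sorted(monitors.items())]

# Hypothetical fragment shaped like the 'additional_info' entry read above
sample_info = {
    "table_monitor": json.dumps({
        "update": "YOAIeVIBwJ1CFK+UTFW/DQ==",
        "delete": "zS+skKevnKMZZGT94uzpig==",
    })
}

rows = monitor_rows("tutorial_table_monitor.table_monitor_status", sample_info)
for row in rows:
    print(" | ".join(row))
```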

Starting/Stopping the Table Monitor Client

To start the table monitor clients, instantiate each client class with a handle to the database and call their respective start functions.

status_updater = StatusUpdater(kinetica)
status_reporter = StatusReporter(kinetica)

status_updater.start_monitor()
status_reporter.start_monitor()

To stop and remove the table monitor clients (and their corresponding managed table monitors), call their respective stop functions.

status_updater.stop_monitor()
status_reporter.stop_monitor()
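If the work between starting and stopping raises an exception, the stop calls are skipped and the monitors may be left in place. One way to guard against this, sketched here as a suggestion and not as part of the example script, is a small context manager. The running_monitors helper and the FakeClient stand-in are hypothetical; the only assumption about real clients is that they expose the start_monitor and stop_monitor methods used above.

```python
from contextlib import contextmanager

@contextmanager
def running_monitors(*clients):
    """Start each client, then stop the started ones in reverse order on exit."""
    started = []
    try:
        for client in clients:
            client.start_monitor()
            started.append(client)
        yield clients
    finally:
        for client in reversed(started):
            client.stop_monitor()

class FakeClient(object):
    """Hypothetical stand-in exposing only start_monitor and stop_monitor."""
    def __init__(self, name, log):
        self.name = name
        self.log = log

    def start_monitor(self):
        self.log.append("start " + self.name)

    def stop_monitor(self):
        self.log.append("stop " + self.name)

log = []
try:
    with running_monitors(FakeClient("updater", log), FakeClient("reporter", log)):
        raise RuntimeError("simulated failure while loading data")
except RuntimeError:
    pass  # both clients were still stopped by the context manager

print(log)
```

With real clients, the body of the with block would hold the inserts, updates, and deletes that the monitors are meant to observe.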

Download & Run

Included below is a complete example of the Python Table Monitor API, containing all of the above code samples, as well as the loading of data into the "history" & "status" tables and the verification output of records.

To run the complete sample, ensure the table_monitor.py file is in the current directory, and do the following:

  • If on the Kinetica host:

    /opt/gpudb/bin/gpudb_python table_monitor.py [--username <username> --password <password>]
    
  • If running after using PyPI or GitHub to install the Python API:

    python table_monitor.py [--host <target_host_ip>] [--username <username> --password <password>]
    

Note

As this script creates a schema and several database objects within it, system admin permission is required to run it.