Real-Time Streaming Analytics with Kinetica

Introduction

In this age of digital innovation, organizations need continuous insight into data about their services, their users, and the resources powering those services. In this article, we explore how Kinetica delivers high-performance streaming and live analytics at massive scale to today’s organizations.

Kinetica helps its customers solve a variety of problems, including dynamic inventory replenishment, mail routing optimization, shortening oil and gas discovery time, scalable ocean trash detection for environmental improvement programs, and faster new drug discovery. While these use cases span different industries and domains, they share one common theme: high-throughput streaming ingestion combined with high-performance analytics on the same data, at the same time.

To give you a better idea of the scale, consider the details of these two Kinetica customers’ use cases:

  • A retailer ingests 250 million records per day for inventory tracking on an 8-node Tesla P100 Kinetica cluster while performing 112,000 read queries per second.
  • A large mail carrier ingests a billion events per day for mail routing optimization on a 10-node P100 Kinetica cluster, while 15,000 users are simultaneously querying the data.

Kinetica’s blazingly fast streaming analytics is made possible by its native parallel ingest and optimized non-blocking data structures. Kinetica is a distributed, massively parallel platform: multiple servers work simultaneously on distributed data. To distribute a table across many servers, Kinetica uses a special key, the shard_key, which determines the worker on which each record is stored. Once a sharding scheme has been defined, data can be ingested either sequentially through the head node or in parallel by the worker nodes for maximum throughput.
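
Conceptually, sharding works like hash partitioning: every record whose shard key hashes to the same value lands on the same worker, so each worker can ingest and serve its slice of the table independently. The short sketch below is illustrative only (it is not Kinetica's actual hash function, and num_workers is a hypothetical cluster size); it simply shows why records with equal shard keys always end up on the same node.

num_workers = 8  # hypothetical cluster size, not a Kinetica setting

def route_to_worker(shard_key_value, workers=num_workers):
    # Equal shard key values always map to the same worker rank.
    return hash(shard_key_value) % workers

print(route_to_worker(4212))  # a payment_id-style integer key
print(route_to_worker(4212))  # same key, same worker
print(route_to_worker(4213))  # different key, possibly a different worker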

Sequential Ingest

When ingesting sequentially, an application sends batches of data serially to Kinetica’s head node, which forwards each record to the appropriate worker based on its shard key.

However, this means the head node can become a bottleneck for ingesting data.

Let us look at the example code for sequential ingest:

import csv

import gpudb

# Establish a connection with a locally-running instance of Kinetica
h_db = gpudb.GPUdb(
    encoding="BINARY",
    host="127.0.0.1",
    port="9191",
    username="user1",
    password="mysecret"
)

collection = "taxi_info"
table_taxi = "taxi_trip_data"

# Place the table in the "taxi_info" collection
collection_option_object = gpudb.GPUdbTableOptions.default().collection_name(collection)

# Define table columns; payment_id is the shard key used to distribute the table
taxi_columns = [
    ["transaction_id", "long", "primary_key"],
    ["payment_id", "long", "primary_key", "shard_key"],
    ["vendor_id", "string", "char4"],
    ["pickup_datetime", "long", "timestamp"],
    ["dropoff_datetime", "long", "timestamp"],
    ["passenger_count", "int", "int8"],
    ["trip_distance", "float"],
    ["pickup_longitude", "float"],
    ["pickup_latitude", "float"],
    ["dropoff_longitude", "float"],
    ["dropoff_latitude", "float"]
]

# Clear any existing table with the same name, without erroring if it is absent
no_error_option = {"no_error_if_not_exists": "true"}
h_db.clear_table(table_name=table_taxi, options=no_error_option)

# Create the table from the type and place it in the collection
try:
    table_taxi_obj = gpudb.GPUdbTable(
        _type=taxi_columns,
        name=table_taxi,
        options=collection_option_object,
        db=h_db
    )
    print("Taxi table object successfully created")
except gpudb.GPUdbException as e:
    print("Taxi table object creation failure: {}".format(str(e)))

print()
print("INSERTING DATA")
print("--------------")
print()

# Insert records from a CSV file into the taxi table
print("Inserting records into the Taxi table via CSV...")
print()
taxi_data = csv.reader(open('/tmp/data/taxi_trip_data.csv'))
next(taxi_data)  # skip the header row
taxi_records = []
for record in taxi_data:
    record_data = [
        int(record[0]),    # transaction_id
        int(record[1]),    # payment_id
        record[2],         # vendor_id
        int(record[3]),    # pickup_datetime
        int(record[4]),    # dropoff_datetime
        int(record[5]),    # passenger_count
        float(record[6]),  # trip_distance
        float(record[7]),  # pickup_longitude
        float(record[8]),  # pickup_latitude
        float(record[9]),  # dropoff_longitude
        float(record[10])  # dropoff_latitude
    ]
    taxi_records.append(record_data)
table_taxi_obj.insert_records(taxi_records)
print("Number of records inserted into the Taxi table: {}".format(
    table_taxi_obj.size()
))

Parallel Ingest

To avoid a bottleneck at the head node, the application can instead tell Kinetica’s Python, ODBC, or JDBC driver that the data should be ingested in parallel by all the worker nodes. With that switch flipped, the client library shards the data client-side and sends each record directly to its designated worker node, which streamlines communication and processing time.

Now, let us look at the example code for doing parallel ingest:

import csv

import gpudb

# Establish a connection with a locally-running instance of Kinetica
h_db = gpudb.GPUdb(
    encoding="BINARY",
    host="127.0.0.1",
    port="9191",
    username="user1",
    password="mysecret"
)

collection = "taxi_info"
table_taxi = "taxi_trip_data"

# Place the table in the "taxi_info" collection
collection_option_object = gpudb.GPUdbTableOptions.default().collection_name(collection)

# Define table columns; payment_id is the shard key used to distribute the table
taxi_columns = [
    ["transaction_id", "long", "primary_key"],
    ["payment_id", "long", "primary_key", "shard_key"],
    ["vendor_id", "string", "char4"],
    ["pickup_datetime", "long", "timestamp"],
    ["dropoff_datetime", "long", "timestamp"],
    ["passenger_count", "int", "int8"],
    ["trip_distance", "float"],
    ["pickup_longitude", "float"],
    ["pickup_latitude", "float"],
    ["dropoff_longitude", "float"],
    ["dropoff_latitude", "float"]
]

# Clear any existing table with the same name, without erroring if it is absent
no_error_option = {"no_error_if_not_exists": "true"}
h_db.clear_table(table_name=table_taxi, options=no_error_option)

# Create the table from the type and place it in the collection,
# enabling multi-head (parallel) ingest directly to the worker nodes
try:
    table_taxi_obj = gpudb.GPUdbTable(
        _type=taxi_columns,
        name=table_taxi,
        options=collection_option_object,
        db=h_db,
        use_multihead_io=True,
        multihead_ingest_batch_size=1000
    )
    print("Taxi table object successfully created")
except gpudb.GPUdbException as e:
    print("Taxi table object creation failure: {}".format(str(e)))

print()
print("INSERTING DATA")
print("--------------")
print()

# Insert records from a CSV file into the taxi table
print("Inserting records into the Taxi table via CSV...")
print()
taxi_data = csv.reader(open('/tmp/data/taxi_trip_data.csv'))
next(taxi_data)  # skip the header row
taxi_records = []
for record in taxi_data:
    record_data = [
        int(record[0]),    # transaction_id
        int(record[1]),    # payment_id
        record[2],         # vendor_id
        int(record[3]),    # pickup_datetime
        int(record[4]),    # dropoff_datetime
        int(record[5]),    # passenger_count
        float(record[6]),  # trip_distance
        float(record[7]),  # pickup_longitude
        float(record[8]),  # pickup_latitude
        float(record[9]),  # dropoff_longitude
        float(record[10])  # dropoff_latitude
    ]
    taxi_records.append(record_data)
table_taxi_obj.insert_records(taxi_records)

# Flush any remaining queued records that did not fill a complete batch
table_taxi_obj.flush_data_to_server()

print("Number of records inserted into the Taxi table: {}".format(
    table_taxi_obj.size()
))

Note that the only differences between serial and multi-head ingest are passing the use_multihead_io and multihead_ingest_batch_size options to the GPUdbTable constructor and calling flush_data_to_server() at the end. The rest of the code is identical.

table_taxi_obj = gpudb.GPUdbTable(
    _type=taxi_columns,
    name=table_taxi,
    options=collection_option_object,
    db=h_db,
    use_multihead_io=True,
    multihead_ingest_batch_size=1000
)
…
table_taxi_obj.flush_data_to_server()

A single flag and a flush call are all that is needed for the application to take advantage of Kinetica’s parallel ingest. There is no need to know how many nodes are in the cluster, where they are, or how the data is distributed across them! Best of all, this capability is available to developers in Kinetica’s native Python, Java, and C++ APIs, as well as its ODBC and JDBC drivers.

Simultaneous Read and Write

In addition to high-throughput parallel ingest, Kinetica enables high-performance simultaneous reads on live, incoming data. Kinetica stores its data in a columnar fashion: each column is stored independently of the others, enabling faster scans and projections. On top of columnar storage, Kinetica uses smart underlying data structures that allow write and read operations to access the same row’s data simultaneously without blocking each other. In many cases, these data structures can serve reads entirely from memory, without touching the persisted data on disk, for the fastest reads possible. From a developer’s perspective, there is no need to wrap read and write operations in separate transactions; each operation is atomic on its own.
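
As a minimal sketch of what this looks like from the client side, the snippet below starts a writer thread that keeps inserting the taxi_records from the parallel-ingest example through table_taxi_obj, while a reader thread on its own connection repeatedly queries the same table. It assumes the gpudb client’s get_records call (wrapping the /get/records endpoint) and the connection settings used earlier; no locks or transactions are involved on the application side.

import threading
import time

import gpudb

# Writer: keep appending batches via the multi-head table object created above.
def writer():
    for start in range(0, len(taxi_records), 1000):
        table_taxi_obj.insert_records(taxi_records[start:start + 1000])
    table_taxi_obj.flush_data_to_server()

# Reader: open an independent connection and query the table while it is
# still being written to; each read simply sees whatever has landed so far.
def reader():
    h_db_reader = gpudb.GPUdb(host="127.0.0.1", port="9191",
                              username="user1", password="mysecret")
    for _ in range(5):
        response = h_db_reader.get_records(table_name=table_taxi,
                                           offset=0, limit=10,
                                           encoding="json")
        print("Rows visible to the reader: {}".format(
            response["total_number_of_records"]))
        time.sleep(1)

w = threading.Thread(target=writer)
r = threading.Thread(target=reader)
w.start()
r.start()
w.join()
r.join()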

High-throughput streaming ingest and simultaneous high-performance query, the two pillars of active analytics, result in a supercharged analytics data platform unlike any other. Thus, Kinetica enables the enterprise to tackle the toughest streaming use cases, delivering new insights with an elegant and user-friendly platform.

See the difference for yourself! Get started with Kinetica today by signing up for a free trial or contact us to learn more.

Dipti Joshi is director of product management at Kinetica.
