Kinetica in Motion
The Kinetica Blog
Blog »
Matt Brown

Analyzing Meetup RSVPs with Kinetica – Part One

Storing streaming data in the database
Share
Tweet about this on TwitterShare on LinkedIn0Share on Facebook0Share on Reddit0Share on Google+0

Introduction

Geospatial data is everywhere. Mapping directions, tracking a package, and reading a weather report are all examples of how we use geospatial data in our daily lives. In recent years, the nature of geospatial data has changed. Data is generated from different sources, such as technologies like IoT, drones, and autonomous vehicles, as well as from individuals and social networks. This data is also very large. This presents a number of challenges for developers that are not easy to solve with traditional databases. Accessing, moving, modifying, and storing this data is very complicated.

In June 2018, Profiq published a blog post about Kinetica’s high-performance analytics database. In this article, the first in a series of four, I’m going to take it from there and dive deeper into Kinetica’s capabilities by showing you how to store large amounts of streaming geospatial data from Meetup.com into Kinetica’s GPU-accelerated database using Kinetica’s Python API. Meetup is a well-known platform for organizing in-person events for people with similar interests. It is used globally by millions of people. Every minute, users send hundreds of responses to invitations (RSVPs). The service provides an API to access all of these actions.

Future articles in the series will focus on visualizing the data, using APIs for creating custom apps, training, and using simple prediction models. All of the code is available in public repositories. Now, without further ado, let’s get to work.

Installing and running Kinetica

The easiest way to install Kinetica’s database engine is to use Docker. Two variants are available on the Docker Hub. The kinetica/kinetica-intel image contains a version that is executable on standard CPUs. kinetica/cuda-91 requires the nvidia-docker runtime and uses CUDA to accelerate many database operations on an NVIDIA GPU. I’ve used the CPU version for this application, as the server doesn’t have a GPU installed.

The Kinetica Active Analytics Platform is designed to utilize your server resources as efficiently as possible. It automatically determines whether a query should be executed on a CPU or on a GPU. Depending on the queries you run, the database performs well even on a CPU-only server, so there is no need to upgrade your infrastructure. However, if you are processing a lot of data or would like to see great improvements in speed, I’d recommend deploying it on a system with CUDA GPU.

I prepared a modified image which automatically registers a product key from an environment variable, imports a demo Reveal dashboard (more about Reveal in future articles), and starts the database engine with the container. By default, only the database manager starts. You need to start the engine itself from the web interface or by executing a specific command in a terminal. If you are running the database on your local machine, I also recommend limiting available memory, either by setting the container’s memory limit (such as --memory=6g), or by modifying the database configuration.

Let’s start by cloning our GIT repository:

git clone git@github.com:profiq/meetup-analysis-kinetica.git

If you have docker-compose installed, paste your product key (you can start a free trial and get one here) into the docker-compose.yml file and start the container:

docker-compose up -d kinetica

We limited the amount of memory the container can use to 4GB. You can modify this setting by changing the value of the mem_limit attribute in docker-compose.yml. Please also keep in mind that the size of Kinetica’s Docker image is around 5GB.

After you start the container, you can check the web interface for database management running onhttp://localhost:8080. The default username and password is admin.

If you do not specify the container name, run:

docker-compose up -d

You will also start a second container running a Python script that creates the required database tables and starts collecting data from the Meetup.com API. Let’s have a look at the first of these two tasks.

Creating database tables

We want our application to store information about responses to Meetup events. For each response we need the following:

  • The unique ID of the event
  • The name of the event
  • The event page URL
  • The time of the event
  • The event location: latitude, longitude, and city name
  • The unique ID of the response
  • The time of the response
  • Whether the response was positive or negative
  • The city and country where the event is taking place
  • The number of Meetup group members and number of previous events

You could define a table with all of these columns using the web interface mentioned above. Since we want to have everything automated, we decided to write a Python script to create the table for us.

We used Kinetica’s Python API to communicate with the database. Python scripts are executed inside the python container which is also part of our repository. Creation of required tables is handled by the src/deploy.py script. Let’s have a look at the most important parts of the code:

To initiate a connection, run:

db = gpudb.GPUdb(host=config.GPUDB_HOST, port=config.GPUDB_PORT)

For this project, we didn’t activate user authentication, which is the database’s default state. For real-world setups, this option can be enabled in the database configuration.

After the connection opens, we can create a table for storing RSVPs from Meetup.com:

gpudb.GPUdbTable(
_type=config.EVENT_RSVP_TYPE,
db=db,
name=config.EVENT_RSVP_TABLE_NAME,
options={'collection_name': config.EVENT_RSVP_COLLECTION}
)

The table definition provided as the _type argument can be found in meetupmap/config.py:

EVENT_RSVP_TYPE = [
['event_id', 'string', 'char32', 'primary_key', 'shard_key'],
['name', 'string', 'char128', 'text_search'],
['url', 'string', 'char256'],
['event_timestamp', 'long', 'nullable', 'timestamp'],
['lat', 'double', 'nullable'],
['lon', 'double', 'nullable'],
['rsvp_id', 'long', 'primary_key'],
['response', 'int', 'int8'],
['rsvp_timestamp', 'long', 'timestamp'],
['city', 'string', 'char64', 'nullable', 'text_search'],
['country', 'string', 'char4', 'nullable'],
['group_members', 'long', 'nullable'],
['group_events', 'int', 'nullable']
]

The definition is a simple list of lists in which each of the lower-level lists represents one column of the table. The first element defines the column name, and the second determines the data type. Other elements describe additional properties such as subtypes, index definitions, or the possibility to store as null. For example, we defined a primary key consisting of event_id and rsvp_id columns and told the database to handle event_timestamp and rsvp_timestamp columns as timestamps. This lets us perform date-time operations on them, and when exploring the table, the data displays in a human-readable date and time format.

We stored event coordinates in two columns of type double. I recommend naming these columns lat and lon. This convention tells Reveal, Kinetica’s visualization tool, to automatically mark the columns as containing geospatial data.

This approach lets you store single points. But what if you need to work with more complex data; for example, a path of an autonomous vehicle? You can save various types of shapes as WKT strings simply in a string column with the subtype set to wkt.

Be careful when defining a database table. At the time of this writing, there were some settings you can’t just change later. These include primary/shard keys and certain data type changes. You can’t, for example, change a string column to long.

When we created a table, we also provided an optional collection name. Collections are used to logically separate data for different projects stored in one database.

You can also notice the try-except block surrounding the table creation code. The meetupmap/deploy.py script is executed on every python container start, and we don’t want to cause the container to stop the if the table already exists.

The file also contains a add_db_column() function which modifies the existing table by adding new column using the alter_table() method. Note that we replaced some variables with real parameter values in this example:


response = db.alter_table(
table_name=config.EVENT_RSVP_TABLE_NAME,
action='add_column',
value='city',
options={
'column_type': 'string',
'column_properties': 'char64,nullable,text_search'
})

We added city, country, group_members, and group_events columns. You might have noticed that they are already present in the table definition above. This was not the case for some older instances of the database we had already running. The alter_table() method allowed us to modify the table without recreating the container and losing the data we already collected. This is just one of many features provided by Kinetica’s Python API. Check the official documentation and tutorials if you want to know more.

Processing RSVPs From Meetup.com

Let’s start by having a look at the main() function inside the meetupmap/meetup.py file responsible for processing RSVPs from Meetup API:


db = gpudb.GPUdb(host=config.GPUDB_HOST,
port=config.GPUDB_PORT)
rsvp_queue = multiprocessing.Queue()
event_info_provider = apiutils.EventInfoProvider(db)

storer = threading.Thread(
target=store_rsvps,
kwargs={'db': db, 'queue': rsvp_queue,
'event_info_provider': event_info_provider})
storer.start()

websocket.enableTrace(False)
on_message = functools.partial(add_to_storing_queue,
queue=rsvp_queue)
ws = websocket.WebSocketApp(config.MEETUP_API_RSVP_ENDPOINT,
on_message=on_message)
ws.run_forever()

First, we opened a DB connection and created a Queue for storing RSVPs received from Meetup. The EventInfoProvider gives us additional information about the event that is not present in the RSVP received from the stream. This includes city name, country, ISO code, number of group members, and number of previous events organized by the same group.

Next, we created and started a thread to manage processing and storing the RSVPs. Finally, we started to listen to the RSVPs stream provided by Meetup using websockets. The websocket-client library is used to handle the socket. The library calls the add_to_storing_queue() function every time a new RSVP arrives.

You are probably wondering why we’re using a second thread to process data from the stream. EventInfoProvider often needs to call the Meetup API to receive the information it needs (it tries to find the information in our database from previously stored RSVPs first). Meetup limits the number of calls to 30 in 10 seconds, so a short 0.35s sleep is added before each API call to comply. Because of this pause, we aren’t able to process all the RSVPs as they are read from the stream; the main thread just puts them into a queue:


def add_to_storing_queue(_, rsvp_json, queue):
rsvp = json.loads(rsvp_json)
queue.put(rsvp)
print('[stream] RSVP ID: %d in queue' % rsvp['rsvp_id'])

RSVPs are received as JSON strings. The function parses them and puts the resulting dictionary into a queue.

Now, on to the storing part:


def store_rsvps(db, queue, event_info_provider):
rsvp_record_bases = []
while True:
rsvp = queue.get()
rsvp_record_base = record_base_from_rsvp(rsvp)
rsvp_record_bases.append(rsvp_record_base)

if len(rsvp_record_bases) == 10:
event_ids = [r['event_id'] for r in rsvp_record_bases]
event_info = event_info_provider.get_info(event_ids)
rsvp_records = add_event_info_to_record_bases(
event_info, rsvp_record_bases)
save_records_to_db(db, rsvp_records)
rsvp_record_bases = []

The function executed in a separate thread gets a RSVP from the queue and transforms it to a OrderedDict instance matching the table structure. This process is implemented by the record_base_from_rsvp() function:


def record_base_from_rsvp(rsvp):
rsvp_record = collections.OrderedDict()
rsvp_record['event_id'] = rsvp['event']['event_id']
rsvp_record['name'] = rsvp['event']['event_name']
rsvp_record['url'] = rsvp['event']['event_url']
rsvp_record['event_timestamp'] = rsvp['event']['time'] \
if 'time' in rsvp['event'] else None
rsvp_record['lat'] = rsvp['venue']['lat'] \
if 'venue' in rsvp else None
rsvp_record['lon'] = rsvp['venue']['lon'] \
if 'venue' in rsvp else None
rsvp_record['rsvp_id'] = rsvp['rsvp_id']
rsvp_record['response'] = 1 if rsvp['response'] == 'yes' else 0
rsvp_record['rsvp_timestamp'] = rsvp['mtime']
return rsvp_record

As mentioned earlier, we are still missing some data- city, country, group_members, and group_events. We wait until 10 RSVPs are collected and transformed, and then try to find the info using EventInfoProvider defined in meetupmap/apiutils.py for all RSVPs at once.

The Meetup API lets us request data about multiple events at once and we want to reduce the number of requests. The method returns a dictionary in which a unique is used as a key and the value assigned is a dictionary containing data about the event:


{
'123456’: {
'city': 'London',
'country’: ‘uk’,
'group_members': 1234,
'group_events’: 56
},
'654321’: {
...
}

To combine this additional event data with RSVPs, we use the dict.update() method provided by Python. If event info is not found, we simply leave the fields empty by setting their values to None.


def add_event_info_to_record_bases(event_info, rsvp_record_bases):
rsvp_records = []
for rsvp_base in rsvp_record_bases:
rsvp = rsvp_base.copy()
if rsvp['event_id'] in event_info:
rsvp.update(event_info[rsvp['event_id']])
else:
rsvp['city'] = rsvp['country'] = \
rsvp['group_members'] = rsvp['group_events'] = None
rsvp_records.append(rsvp)
return rsvp_records

Finally, we inserted the records into the database table:


def save_records_to_db(db, rsvp_records):
dumped_records = [
json.dumps(r, ensure_ascii=False) for r in rsvp_records]
response = db.insert_records('event_rsvp', dumped_records,
list_encoding='json')

if response['status_info']['status'] == 'OK':
print('[store] Record inserted')
else:
print('[store] Error while storing')
print(response)

We serialized every record separately into a JSON string and then we called the db.insert_records() to store them. Notice the ensure_ascii=False parameter. By default, json.dumps() encodes all unicode characters. This behavior seems not to work well with the GPU-accelerated database, and the strings received from the database seem to be damaged. Setting this parameter’s value to False solves the problem.

Conclusion

In this article, I showed you how to store large amounts of streaming geospatial data containing geospatial information into Kinetica’s Active Analytics Platform using the Python API. The structure of the code might look familiar: open a connection, create a table with a specific structure, and then call a method for inserting new data. This approach is used by many other database libraries. If you already possess knowledge about some of these solutions, you will master Kinetica in no time.

You are, of course, not limited to using Python. Kinetica provides libraries for Java, C++, and Javascript as well as raw a REST API and an ODBC connector, which lets you manipulate your data using SQL.

In the next article, I will show you how the data we collected can be visualized using Reveal—a powerful visualization applicatievent_rsvpon that is part of Kinetica’s Active Analytics Platform.

Check out the other blogs in this series!:

Analyzing Meetup RSVPs with Kinetica Part 2

Analyzing Meetup RSVPs with Kinetica Part 3 – Visualizing & Analyzing Data in Custom Web Applications with Kinetica

Leave a Comment