Friday, January 9, 2015

Using the Cassandra Bulk Loader, Updated

We introduced sstableloader back in 0.8.1, in order to do bulk loading data into Cassandra. When it was first introduced, we wrote a blog post about its usage along with generating SSTable to bulk load.
Now, Cassandra version 2.1.0 was released, and bulk loading has been evolved since the old blog post. Let’s see how the change makes our life easier than before.

What’s changed?

Specific changes are:
  • sstableloader no longer participates in gossip membership to get schema and ring information. Instead, it just contacts one of the nodes in the cluster and ask for it. This allows you to bulk load from the same machine where cassandra is running, since it no longer listens at the same port with cassandra.
  • Internally, streaming protocol is re-designed. You can stream data more efficiently than before.
  • New CQLSSTableWriter is introduced(CASSANDRA-5894). You can now create SSTables using familiar CQL.
In the old post, we showed two scenarios where sstableloader is used. Let’s see how the changes work in those scenes.
I use Apache Cassandra ver 2.1.0 through out this example, from cluster to running sstableloader.

Example 1 – Loading existing SSTables

Usage of sstableloader has not changed much, but because it has to contact the node to get schema for loading SSTables, you have to specify the address(es) of the node by using -d option.
So for example, you want to bulk load to
$ bin/sstableloader -d 127.0.0.1 ~/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-6-Data.db /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-5-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]
progress: [/127.0.0.1]0:2/2 100% [/127.0.0.2]0:2/2 100% [/127.0.0.3]0:2/2 100% total: 100% 0  MB/s(avg: 5 MB/s)
Summary statistics:
   Connections per host:         : 1
   Total files transferred:      : 6
   Total bytes transferred:      : 98802914
   Total duration (ms):          : 9455
   Average transfer rate (MB/s): : 5
   Peak transfer rate (MB/s):    : 11
As you can see, some stats are printed out after the bulk load.

Example 2 – Loading external data

Previously, we had example that creates SSTables from CSV using UnsortedSimpleSSTableWriter and uses sstableloader to load it to Cassandra cluster in the old post.
Schema there is created with thrift, and it has a simple, flat table structure.
For this updated post, let’s do more complex scenario with new CQLSSTableWriter.
We will create real data from Yahoo! Finance to load historical prices of stocks in time-series manner.

Schema Definition

If we take a look at CSV file for Yahoo!(YHOO), it has 7 fields in it.
Date,Open,High,Low,Close,Volume,Adj Close
2014-09-25,39.56,39.80,38.82,38.95,35859400,38.95
Let’s use ticker symbol as our partition key, and ‘Date’ field as clustering key.
So schema looks like:
We use CLUSTERING ORDER BY to query recent data easily.

Generating SSTable using CQLSSTableWriter

How do you bulk load data to such a schema? If you choose to use UnsortedSimpleSSTableWriter as we did in the old post, you have to manually construct each cell of complex type to fit to your CQL3 schema. This requires you to have deep knowledge of how CQL3 works internally.
Enter CQLSSTableWriter.
All you need is DDL for table you want to bulk load, and INSERT statement to insert data to it.
// Prepare SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// set output directory
builder.inDirectory(outputDir)
       // set target schema
       .forTable(SCHEMA)
       // set CQL statement to put data
       .using(INSERT_STMT)
       // set partitioner if needed
       // default is Murmur3Partitioner so set if you use different one.
       .withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
// …snip…
while ((line = csvReader.read()) != null)
{
    // We use Java types here based on
    // http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29
    writer.addRow(ticker,
                  DATE_FORMAT.parse(line.get(0)),
                  new BigDecimal(line.get(1)),
                  new BigDecimal(line.get(2)),
                  new BigDecimal(line.get(3)),
                  new BigDecimal(line.get(4)),
                  Long.parseLong(line.get(5)),
                  new BigDecimal(line.get(6)));
}
writer.close();
You can see complete example on my github.
After you generating SSTable, you can just use sstableloader to target cluster as described before.
There are still some limitations in CQLSSTableWriter, like you cannot use it in parallel, or user defined types are not supported yet.
But we keep improving so stay tuned to Apache JIRA.

Wrap up

Generating SSTable and bulk loading have been improved over the past release. There are many new features available to make your life easier.
Start experimenting by yourself today!

No comments:

Post a Comment