Friday, January 9, 2015

Cassandra Bulk Loader

Bulk loading into a Cassandra cluster used to be a difficult task. But if you already have all the data to be inserted in place, loading it into Cassandra is a cakewalk with a utility introduced in Cassandra 0.8.1.
I tried inserting the data into Cassandra using the Java client Pelops and the Python client Pycassa, but for the heavy data load we have (70 GB of text data) we found that neither of these libraries was up to the task.
After a lot of effort with Pycassa and Pelops we tried sstableloader, and this utility is far more robust and time efficient. The sstableloader utility loads existing or generated sstables into Cassandra.
In this blog I am going to cover:
  1. Generating sstables
  2. Loading sstables with sstableloader
  3. Improving the performance of generation and loading
  4. Tuning generation for optimal Cassandra performance
  5. Advantages of the sstableloader utility over other bulk loading techniques
Sstableloader:
Cassandra 0.8.1 introduced a new tool, sstableloader, to load sstables into Cassandra; this is the fastest way to insert data into Cassandra. (Later in this document we will see how to use sstableloader.)
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but transfers only the relevant part of the data to each, conforming to the replication strategy of the cluster. So sstableloader needs the data to be in sstable format; let's see how to generate sstables from raw or text data.

Why Use SStable Generator?
Sstableloader is the best way to insert data into Cassandra, and it requires the data in the form of sstables.
We have raw (text) data which is not in the form of sstables, so we need to create sstables from this raw data.
The simplest way to convert this raw data into sstables is the new Java class SSTableSimpleUnsortedWriter, introduced in Cassandra 0.8.2.
1) Generating SSTables

SSTableSimpleUnsortedWriter API

org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter 

a) Creating the writer:

Here we create the writer, which generates sstables for a given column family of a given keyspace:
SSTableSimpleUnsortedWriter columnFamilyWriter = new SSTableSimpleUnsortedWriter(
        directory,
        keyspace,
        "ColumnFamilyName",
        UTF8Type.instance,
        null,
        64);
  • The directory is where the sstables will be written; it is a java.io.File object.
  • keyspace is the keyspace of the column family (a String).
  • Next come the column family name, the comparator, and the sub-columns comparator; here we don't use super columns, so the sub-columns comparator is null.
  • The last parameter is a "buffer" size (in MB): SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and flushes everything into one sstable once the buffer size is reached. That is, the resulting sstables will each be approximately the size of the buffer.
b) Writing to sstables:
Here, using the above writer, we populate rows in the column family with the specified column names and values.

for (/* ...each row and its corresponding columns and values... */)
{
    columnFamilyWriter.newRow(bytes(rowKey)); // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp);
}

c) Writing to sstables with TTL:
TTL is time to live. It is given in seconds, and the column expires after that many seconds.
for (/* ...each row and its corresponding columns and values... */)
{
    columnFamilyWriter.newRow(bytes(rowKey)); // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addExpiringColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp, ttl, expirationTimestampMS);
}
Note: expirationTimestampMS is the server-time timestamp used for actually expiring the column. It should be (insertion time in milliseconds + TTL in milliseconds).
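To make that note concrete, here is a minimal plain-Java sketch (no Cassandra classes involved; the TTL value is an arbitrary example) of how expirationTimestampMS relates to the insertion time:

```java
public class ExpirationTimestamp {
    public static void main(String[] args) {
        int ttl = 3600;                                    // time to live, in seconds
        long insertionTimeMs = System.currentTimeMillis(); // insertion time in milliseconds
        // expirationTimestampMS = insertion time in ms + TTL in ms
        long expirationTimestampMS = insertionTimeMs + ttl * 1000L;
        System.out.println(expirationTimestampMS - insertionTimeMs); // prints 3600000
    }
}
```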
 
 d) Closing the sstable writer:
columnFamilyWriter.close();
 
Things to remember when compiling and running the program for sstable generation:
To compile this file, the Cassandra jar (>= 0.8.2) needs to be on the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar must be present, along with the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be on the classpath. As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in that cassandra.yaml to accessible directories, but not ones of an existing Cassandra node. (This will be fixed in 0.8.3; in the meantime, using /tmp for both is a good idea.) The only property you need to set for SSTableSimpleUnsortedWriter is the partitioner you want to use.
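The classpath setup described above can be sketched as a small launch script. The install path and the generator class name are illustrative assumptions; adjust them to your layout.

```shell
#!/bin/sh
# Illustrative launch script for the sstable generator (paths are assumptions).
CASSANDRA_HOME="${CASSANDRA_HOME:-/opt/apache-cassandra-0.8.2}"

# Cassandra jar + its bundled libraries + conf/ (for cassandra.yaml and log4j config)
CLASSPATH="$CASSANDRA_HOME/apache-cassandra-0.8.2.jar"
for jar in "$CASSANDRA_HOME"/lib/*.jar; do
    CLASSPATH="$CLASSPATH:$jar"
done
CLASSPATH="$CLASSPATH:$CASSANDRA_HOME/conf:."
echo "$CLASSPATH"

# Compile and run the generator (DataImportExample is the class from the text):
# javac -cp "$CLASSPATH" DataImportExample.java
# java -ea -cp "$CLASSPATH" -Xms3000M -Xmx3000M DataImportExample
```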
Once you have the sstables in place, the next step is to load them into the Cassandra cluster. Here sstableloader comes into the picture.

2) Loading sstables with sstableloader
Configuration :
  1. To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It thus requires a directory containing a cassandra.yaml configuration file to be on the classpath.
  2. In this config file, the listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest is ignored for the purposes of sstableloader.
  3. Note that the schema for the column families to be loaded should be defined beforehand, using your preferred method: CLI, Thrift, or CQL.
  4. Remove the initial_token setting in cassandra.yaml.
  5. Change the listen_address and rpc_address in cassandra.yaml to the private IP of the loader machine.
Running sstableloader:
We need Cassandra 0.8 or newer to run sstableloader (we are using Cassandra 0.8.2).
  1. With sstableloader, you first need the sstables; only the -Data and -Index components are required, the others (-Statistics and -Filter) will be ignored.
  2. These sstables have to be in a directory whose name is the name of their keyspace. This is how they are stored in either the main data directory or a snapshot.
    Say TestKeyspace is the name of our keyspace; then all the sstables must be in a directory named TestKeyspace.
  3. Then, assuming sstableloader is configured to talk to your multi-node cluster, go to CASSANDRA_HOME/bin and run:
        $ ./sstableloader TestKeyspace
        This starts loading the sstables to the cluster that sstableloader is configured to talk to.
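The directory convention above can be sketched like this. The paths are illustrative, and the loader invocation is shown as a comment since it needs a live cluster:

```shell
# Stage sstables in a directory named after their keyspace.
mkdir -p /tmp/bulkload/TestKeyspace
# Copy the -Data and -Index components of the generated sstables there, e.g.:
# cp gen-output/TestKeyspace-ColumnFamily-*-Data.db \
#    gen-output/TestKeyspace-ColumnFamily-*-Index.db /tmp/bulkload/TestKeyspace/
# Then, from CASSANDRA_HOME/bin:
# ./sstableloader /tmp/bulkload/TestKeyspace
ls -d /tmp/bulkload/TestKeyspace
```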
3) Improving performance of generation and loading
  1. If JVM and GC tuning is not done properly, you will not experience the power of this utility. For generation, provide appropriate -Xmx and -Xms values when running the program. Since the sstable writer keeps writing into the heap until the sstable reaches the buffer size, providing sufficient heap is essential; too small a size may result in an OutOfMemoryError.

    java -ea -cp $CLASSPATH -Xms3000M -Xmx3000M \
    -Dlog4j.configuration=log4j-tools.properties \
    your.program.for.SstableGenerator

    I have experienced better results with 3000 MB of heap for a 300 MB buffer size.
  2. It is also essential to write each row once, with all of its columns together, rather than calling newRow repeatedly for the same row with different columns. To do this, you can iterate over a number of rows and store the columns for each row in a collection, such as a Map; then call newRow() once and addColumn() for all the columns of that row. I have observed a 50% performance improvement with this approach.
    For Example:

    for (/* ...each row and corresponding columns... */)
    {
        columnFamilyWriter.newRow(bytes(uuid));
        columnFamilyWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
        columnFamilyWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
        columnFamilyWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
        columnFamilyWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
        columnFamilyWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
    }
  3. Also, while loading, allocate sufficient heap to sstableloader for better performance. I have observed better performance with 4000 MB of heap for 60 GB of sstables.
    In our case, 60 GB of sstables were loaded in 20-25 minutes.
    To set -Xmx and -Xms for sstableloader:

        a. Open the file "CASSANDRA_HOME/bin/sstableloader.sh"
        b. Search for
            $JAVA -ea -cp $CLASSPATH -Xmx256M \
            -Dlog4j.configuration=log4j-tools.properties \
            org.apache.cassandra.tools.BulkLoader "$@"
        c. In that part of the file, change -Xmx and add -Xms; use the same value for both options.
4) Tuning generation and loading for optimal Cassandra performance
  1. Deciding on an appropriate buffer value is a very important part of sstable generation.
  2. Setting the buffer size low results in faster generation and loading, but it puts load on Cassandra, so you should not set it too small.
  3. With a small buffer size, a large number of small sstables will be generated; when loaded into Cassandra, these take too much time to finish compaction and put load on the Cassandra cluster.
  4. If the size of the generated sstables is "X" before loading, then the size of the sstables you will find on each node after loading into Cassandra is
    (X * replication_factor) / (number_of_nodes_in_cluster)

    You can observe that as the number of nodes increases, the per-node sstable size gets smaller. This will help you decide the buffer size according to your cluster size.
  5. You can also turn minor compaction off while loading data into Cassandra for faster loading. (You can do this with nodetool setcompactionthreshold.)
  6. While loading is in progress, the data becomes available only after the rebuilding of indexes is completed (this does not take much time).
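As a quick worked example of the sizing formula above, here is a sketch using the 140 GB figure from this post; the replication factor and node count are assumed example values, not from the original text:

```java
public class LoadedSizeEstimate {
    public static void main(String[] args) {
        double generatedSizeGB = 140.0; // X: size of the generated sstables
        int replicationFactor = 2;      // assumed example value
        int numberOfNodes = 4;          // assumed example value
        // per-node size after loading = (X * replication_factor) / number_of_nodes
        double perNodeSizeGB = (generatedSizeGB * replicationFactor) / numberOfNodes;
        System.out.println(perNodeSizeGB); // prints 70.0
    }
}
```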
5) Advantages of the sstableloader utility over other bulk loading techniques
  1. It does not put much load on the loader machine or on the Cassandra cluster.
  2. It is, of course, faster than any other loading method.
  3. With all other bulk loading techniques you keep hitting Cassandra all the time. With sstableloader, on the other hand, generating the sstables puts no load on the Cassandra cluster (provided you are not generating them on the same machine where Cassandra is running).
  4. In our case, generating 140 GB of sstables from 70 GB of text data took 3 hours, and loading completed in just an hour. That is, we hit the Cassandra cluster for only an hour to load the data.
References:

Bulk Loading Data into Cassandra Using SSTableLoader

Why Use SSTableLoader:
When you want to move data from any database into Cassandra, the best option is sstableloader. With it, you can transfer the data very fast.

Steps to load the data into Cassandra:

  • Create a keyspace in Cassandra.
  • Create a table based on your requirements using cqlsh.
  • Create a .csv file from the existing data.
  • Then use sstableloader to move the data into Cassandra.
Step 1: Creating the keyspace
    CREATE KEYSPACE sample WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
Step 2: Creating the table based on your requirements.
    CREATE TABLE sample.users (
        key uuid,
        firstname ascii,
        lastname ascii,
        password ascii,
        age ascii,
        email ascii,
        PRIMARY KEY (key, firstname));

In the above I am creating the table users. Its primary key consists of key (the partition key) and firstname (a clustering column).
Step 3: Creating the .csv file based on your table.

How to create a CSV file using Java:

Sample program to create a CSV file:
import java.io.FileWriter;

public class CreateCsv {
    public static void main(String[] args) {
        generateCsvFile("E:/csv/records.csv");
    }

    public static void generateCsvFile(String csvName) {
        try {
            FileWriter writer = new FileWriter(csvName);
            // Write one million rows of the form "<i>,26"
            for (int i = 0; i < 1000000; i++) {
                writer.append(Integer.toString(i));
                writer.append(',');
                writer.append("26");
                writer.append('\n');
            }
            writer.flush();
            writer.close();
            System.out.println("Success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

These are the mandatory steps, after creating the project, for sstableloader:

  • Add all the Cassandra jars to the project. These jars are available in the lib and tools folders of the Cassandra tar or zip file provided by DataStax.
  • Also add the cassandra.yaml file from the conf folder of the Cassandra tar or zip file.
  • Also add the .csv file to the project. For example, I put sstable.csv in my project.

Using the Cassandra Bulk Loader, Updated

We introduced sstableloader back in 0.8.1 in order to bulk load data into Cassandra. When it was first introduced, we wrote a blog post about its usage along with generating SSTables to bulk load.
Now Cassandra 2.1.0 has been released, and bulk loading has evolved since the old blog post. Let's see how the changes make our life easier than before.

What’s changed?

Specific changes are:
  • sstableloader no longer participates in gossip membership to get schema and ring information. Instead, it just contacts one of the nodes in the cluster and asks for it. This allows you to bulk load from the same machine where Cassandra is running, since the loader no longer listens on the same port as Cassandra.
  • Internally, the streaming protocol has been re-designed. You can stream data more efficiently than before.
  • The new CQLSSTableWriter is introduced (CASSANDRA-5894). You can now create SSTables using familiar CQL.
In the old post, we showed two scenarios where sstableloader is used. Let's see how the changes work in those scenarios.
I use Apache Cassandra 2.1.0 throughout this example, from the cluster to running sstableloader.

Example 1 – Loading existing SSTables

Usage of sstableloader has not changed much, but because it has to contact a node to get the schema for the SSTables being loaded, you have to specify the address(es) of the node with the -d option.
So, for example, to bulk load existing SSTables:
$ bin/sstableloader -d 127.0.0.1 ~/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-6-Data.db /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-5-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]
progress: [/127.0.0.1]0:2/2 100% [/127.0.0.2]0:2/2 100% [/127.0.0.3]0:2/2 100% total: 100% 0  MB/s(avg: 5 MB/s)
Summary statistics:
   Connections per host:         : 1
   Total files transferred:      : 6
   Total bytes transferred:      : 98802914
   Total duration (ms):          : 9455
   Average transfer rate (MB/s): : 5
   Peak transfer rate (MB/s):    : 11
As you can see, some stats are printed out after the bulk load.

Example 2 – Loading external data

Previously, in the old post, we had an example that created SSTables from CSV using SSTableSimpleUnsortedWriter and used sstableloader to load them into the Cassandra cluster.
The schema there was created with Thrift, and it had a simple, flat table structure.
For this updated post, let's try a more complex scenario with the new CQLSSTableWriter.
We will use real data from Yahoo! Finance to load historical stock prices in a time-series manner.

Schema Definition

If we take a look at the CSV file for Yahoo! (YHOO), it has 7 fields in it:
Date,Open,High,Low,Close,Volume,Adj Close
2014-09-25,39.56,39.80,38.82,38.95,35859400,38.95
Let's use the ticker symbol as our partition key and the 'Date' field as the clustering key, with CLUSTERING ORDER BY so we can query recent data easily.
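The DDL itself is not reproduced in this excerpt; a plausible sketch of such a table, assuming column names derived from the CSV header (the keyspace and table names here are illustrative, not from the original):

```sql
CREATE TABLE yahoo.historical_prices (
    ticker ascii,
    date timestamp,
    open decimal,
    high decimal,
    low decimal,
    close decimal,
    volume bigint,
    adj_close decimal,
    PRIMARY KEY (ticker, date)
) WITH CLUSTERING ORDER BY (date DESC);
```

The DESC clustering order keeps the most recent prices first within each partition, which matches the stated goal of querying recent data easily.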

Generating SSTable using CQLSSTableWriter

How do you bulk load data into such a schema? If you chose to use SSTableSimpleUnsortedWriter as we did in the old post, you would have to manually construct each cell of a complex type to fit your CQL3 schema. This requires deep knowledge of how CQL3 works internally.
Enter CQLSSTableWriter.
All you need is the DDL for the table you want to bulk load, and an INSERT statement to insert data into it.
// Prepare SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// set output directory
builder.inDirectory(outputDir)
       // set target schema
       .forTable(SCHEMA)
       // set CQL statement to put data
       .using(INSERT_STMT)
       // set partitioner if needed
       // default is Murmur3Partitioner so set if you use different one.
       .withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
// …snip…
while ((line = csvReader.read()) != null)
{
    // We use Java types here based on
    // http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29
    writer.addRow(ticker,
                  DATE_FORMAT.parse(line.get(0)),
                  new BigDecimal(line.get(1)),
                  new BigDecimal(line.get(2)),
                  new BigDecimal(line.get(3)),
                  new BigDecimal(line.get(4)),
                  Long.parseLong(line.get(5)),
                  new BigDecimal(line.get(6)));
}
writer.close();
You can see complete example on my github.
After generating the SSTables, you can just use sstableloader to stream them to the target cluster as described before.
There are still some limitations in CQLSSTableWriter: you cannot use it in parallel, and user-defined types are not supported yet.
But we keep improving it, so stay tuned to the Apache JIRA.

Wrap up

Generating SSTables and bulk loading have been improved over the past releases. There are many new features available to make your life easier.
Start experimenting by yourself today!