Cassandra Bulk Loader
Bulk loading data into a Cassandra cluster used to be a difficult task. But if you have all the data to be inserted in place, loading it into Cassandra becomes a cakewalk with the utility introduced in Cassandra 0.8.1.
I tried inserting the data into Cassandra using the Java client Pelops and the Python client Pycassa. But for the heavy data load we have (70 GB of text data), we found that neither of these libraries was adequate.
After a lot of effort with Pycassa and Pelops we tried sstableloader, and this utility is far more robust and time efficient. The sstableloader utility loads existing or generated sstables into Cassandra.
In this blog I am going to cover:
- Generating sstables
- Loading sstables with sstableloader
- Improving performance of generation and loading
- Tuning generation for optimal Cassandra performance
- Advantages of the sstableloader utility over other bulk loading techniques
Sstableloader:
Cassandra 0.8.1 introduces a new tool, sstableloader, to load sstables into Cassandra, and this is the fastest way to insert data into Cassandra. (Later in this document we will see how to use sstableloader.)
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but transfers only the relevant part of the data to each node, conforming to the replication strategy of the cluster. So sstableloader needs the data to be in the sstable format. Let's see how to generate sstables from raw or text data.
Why Use the SSTable Generator?
sstableloader is the best way to insert data into Cassandra, and it requires the data in the form of sstables.
We have raw (text) data which is not in the form of sstables, so we need to create sstables from this raw data.
The simplest way to convert the raw data into sstables is the new Java class SSTableSimpleUnsortedWriter, introduced in Cassandra 0.8.2.
1) Generating SSTables
SSTableSimpleUnsortedWriter API
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter
a) Creating the writer:
Here we create the writer that generates sstables for a given column family of a given keyspace.
SSTableSimpleUnsortedWriter columnFamilyWriter = new SSTableSimpleUnsortedWriter(
        directory,           // java.io.File: where the sstables will be written
        keyspace,            // keyspace name (String)
        "ColumnFamilyName",
        UTF8Type.instance,   // column comparator
        null,                // sub-columns comparator (no super columns)
        64);                 // buffer size in MB
- directory is the directory where the sstables will be written; it is a java.io.File object.
- keyspace is the keyspace of the column family (a String).
- Next come the column family name, the comparator and the sub-columns comparator; here we don't use super columns, so the sub-columns comparator is null.
- The last parameter is a buffer size (in MB): SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and flushes everything into one sstable once the buffer size is reached. That is, the resulting sstables will be approximately the size of the buffer.
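The snippets in the following sections use a bytes(...) helper to turn Strings into ByteBuffers, and UTF8Type as the comparator. A minimal set of imports that these examples assume (an assumption about how your class is organized, not something spelled out in the original) is:

import java.io.File;

import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;

// bytes(String) converts a String into the ByteBuffer the writer expects
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;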
b) Writing to sstables:
Here, using the writer above, we populate rows of the column family with the specified column names and values.
for (...each row and its corresponding column and value...)
{
    columnFamilyWriter.newRow(bytes(RowKey));   // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp);
}
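The timestamp passed to addColumn() is the ordinary Cassandra column timestamp, conventionally expressed in microseconds since the epoch. A common way to obtain one (a sketch, not part of the original example) is:

// column timestamp in microseconds since the epoch (Cassandra convention)
long timestamp = System.currentTimeMillis() * 1000;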
c) Writing to sstables with TTL:
TTL is time to live. It is given in seconds, and the column will expire after that many seconds.
for (...each row and its corresponding column and value...)
{
    columnFamilyWriter.newRow(bytes(RowKey));   // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addExpiringColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp, ttl, expirationTimestampMS);
}
Note: expirationTimestampMS is the server-side timestamp used for actually expiring the column. It should be (insertion time in milliseconds + TTL in milliseconds).
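For example, it can be computed as follows (variable names are illustrative):

int ttl = 86400;                                                        // time to live, in seconds
long expirationTimestampMS = System.currentTimeMillis() + ttl * 1000L;  // insertion time + TTL, in ms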
d) Closing the sstable writer:
columnFamilyWriter.close();
Things to remember when compiling and running the sstable generation program:
To compile this file, the Cassandra jar (>= 0.8.2) needs to be on the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar needs to be present, as well as the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be on the classpath (see here for a typical launch script that sets all of this up). As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in that cassandra.yaml to accessible directories, but not to those of an existing Cassandra node. (This will be fixed in 0.8.3; in the meantime, using /tmp for both is a good idea.) The only useful property you need to set up for SSTableSimpleUnsortedWriter is the partitioner you want to use.
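Putting sections a) through d) together, here is a minimal end-to-end sketch of such a DataImportExample class. The keyspace, column family, input file name and CSV layout are hypothetical placeholders, and it assumes the classpath and cassandra.yaml setup described above:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;

import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

// Reads lines of the form "rowkey,firstname,lastname" from users.csv (hypothetical)
// and writes one sstable row per line.
public class DataImportExample
{
    public static void main(String[] args) throws Exception
    {
        File directory = new File("/tmp/TestKeyspace");    // sstables are written here (create it beforehand)
        SSTableSimpleUnsortedWriter columnFamilyWriter = new SSTableSimpleUnsortedWriter(
                directory,
                "TestKeyspace",        // keyspace name (placeholder)
                "ColumnFamilyName",    // column family name (placeholder)
                UTF8Type.instance,     // column comparator
                null,                  // no super columns, so no sub-columns comparator
                64);                   // buffer size in MB

        long timestamp = System.currentTimeMillis() * 1000; // microseconds, by convention

        BufferedReader reader = new BufferedReader(new FileReader("users.csv"));
        String line;
        while ((line = reader.readLine()) != null)
        {
            String[] fields = line.split(",");
            columnFamilyWriter.newRow(bytes(fields[0]));    // row key
            columnFamilyWriter.addColumn(bytes("firstname"), bytes(fields[1]), timestamp);
            columnFamilyWriter.addColumn(bytes("lastname"), bytes(fields[2]), timestamp);
        }
        reader.close();
        columnFamilyWriter.close();    // flushes any remaining buffered data to disk
    }
}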
Once you have the sstables in place, the next step is to load them into the Cassandra cluster. This is where sstableloader comes into the picture.
2) Loading sstables with sstableloader
Configuration:
- To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It therefore requires a directory containing a cassandra.yaml configuration file on the classpath.
- In this config file, listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest is ignored for the purposes of sstableloader.
- Note that the schema for the column families to be loaded must be defined beforehand, using your preferred method: CLI, Thrift or CQL.
- Remove the initial token from this cassandra.yaml.
- Change listen_address and rpc_address in this cassandra.yaml to the private IP of the loader machine (a hypothetical excerpt is shown below).
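For illustration, a cassandra.yaml excerpt for the loader machine could look like this (the addresses are placeholders and the ports shown are the defaults; adapt them to your cluster):

listen_address: 10.0.0.5     # private IP of the loader machine
rpc_address: 10.0.0.5
storage_port: 7000           # default
rpc_port: 9160               # default
# initial_token is left unset on the loader machine
# also configure at least one live node of the target cluster as a seed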
Running sstableloader:
You will need Cassandra 0.8 or newer to run sstableloader (we are using Cassandra 0.8.2).
- With sstableloader, you first need the sstables; only the -Data and -Index components are required, the others (-Statistics and -Filter) will be ignored.
- These sstables have to be in a directory whose name is the name of the keyspace of the sstables (this is how they are stored in either the main data directory or a snapshot). Say TestKeyspace is the name of our keyspace; then all the sstables must be in a directory named TestKeyspace.
- Then, assuming sstableloader is configured to talk to your multi-node cluster, go to CASSANDRA_HOME/bin and run
$ ./sstableloader TestKeyspace
This will start loading the sstables to the cluster that sstableloader is configured to talk to.
3) Improving performance of generation and loading
- If JVM and GC tuning is not done properly, you will not experience the full power of this utility. For generation, you should provide appropriate -Xmx and -Xms values when running the program. Since the sstable writer keeps writing into the heap until the sstables reach the buffer size, providing sufficient heap is essential; too small a heap may result in an OutOfMemory error.
java -ea -cp $CLASSPATH -Xms3000M -Xmx3000M \
    -Dlog4j.configuration=log4j-tools.properties \
    your.program.for.SstableGenerator
I have experienced better results with a 3000 MB heap for a 300 MB buffer size.
- It is also very important to write each row once, with all of its columns following it, and not to call newRow for the same row again and again for different columns. To do this you can iterate over a certain number of rows, store the columns for each row in some collection such as a Map, and then call newRow() once and addColumn() for all the columns of that row (see the sketch after the example below). I have observed about 50% better performance with this approach.
For example:
for (...each row and its corresponding columns...)
{
    columnFamilyWriter.newRow(bytes(uuid));
    columnFamilyWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
    columnFamilyWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
    columnFamilyWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
    columnFamilyWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
    columnFamilyWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
}
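And here is a sketch of the batching idea described above, for the case where the input is not already grouped by row. It assumes columnFamilyWriter and timestamp are defined as in the earlier sections, and that inputTriples is a hypothetical collection of {rowKey, columnName, columnValue} String arrays:

// uses java.util.Map and java.util.LinkedHashMap in addition to the imports shown earlier.
// Group the columns per row first, then emit each row with a single newRow()
// call followed by all of its addColumn() calls.
Map<String, Map<String, String>> rows = new LinkedHashMap<String, Map<String, String>>();
for (String[] triple : inputTriples)   // hypothetical input: {rowKey, columnName, columnValue}
{
    Map<String, String> columns = rows.get(triple[0]);
    if (columns == null)
    {
        columns = new LinkedHashMap<String, String>();
        rows.put(triple[0], columns);
    }
    columns.put(triple[1], triple[2]);
}

for (Map.Entry<String, Map<String, String>> row : rows.entrySet())
{
    columnFamilyWriter.newRow(bytes(row.getKey()));
    for (Map.Entry<String, String> column : row.getValue().entrySet())
        columnFamilyWriter.addColumn(bytes(column.getKey()), bytes(column.getValue()), timestamp);
}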
- Also, while loading, you should allocate sufficient heap to sstableloader for better performance. I have observed better performance with 4000 MB of heap for 60 GB of sstables. In our case, 60 GB of sstables were loaded in 20-25 minutes.
To set the -Xmx and -Xms for sstableloader:
a. Open the file "CASSANDRA_HOME/bin/sstableloader.sh"
b. Search for:
$JAVA -ea -cp $CLASSPATH -Xmx256M \
-Dlog4j.configuration=log4j-tools.properties \
org.apache.cassandra.tools.BulkLoader "$@"
c. In the above part of the file, change -Xmx and add -Xms. Use the same value for both options.
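For example, to give the loader 4000 MB for both options (the value that worked well in our case):
$JAVA -ea -cp $CLASSPATH -Xms4000M -Xmx4000M \
-Dlog4j.configuration=log4j-tools.properties \
org.apache.cassandra.tools.BulkLoader "$@"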
4) Tuning generation and loading for optimal Cassandra performance
- Deciding on an appropriate buffer size is a very important part of sstable generation.
- A small buffer size results in faster generation and loading, but it puts load on Cassandra, so it should not be set too low.
- With a small buffer size, a large number of small sstables is generated; when loaded into Cassandra, they take a long time to finish compaction and put load on the Cassandra cluster.
- If the size of the generated sstables before loading is X, then the size of sstables you will find on each node after loading into Cassandra is roughly
(X * replication_factor) / (number_of_nodes_in_cluster)
For example, 60 GB of generated sstables loaded into a 6-node cluster with replication factor 3 comes to roughly (60 * 3) / 6 = 30 GB per node. You can see that as the number of nodes increases, the per-node sstable size gets smaller; this helps you decide the buffer size according to your cluster size.
- You can also turn minor compaction off while loading data into Cassandra for faster loading (you can do this with nodetool setcompactionthreshold).
- While loading is in progress, the data becomes available only after the rebuilding of indexes is completed (which does not take much time).
5) Advantages of the sstableloader utility over other bulk loading techniques
- It does not put much load on the loader machine or on the Cassandra cluster.
- Of course, it is faster than any other loading method.
- In all other bulk loading techniques you keep hitting Cassandra all the time. With sstableloader, on the other hand, generating the sstables puts no load on the Cassandra cluster (provided you are not generating them on a machine that is running Cassandra).
- In our case, generating 140 GB of sstables from 70 GB of text data took 3 hours, and loading completed in just an hour. That is, we hit the Cassandra cluster for only an hour to load the data.