A Grails Plugin for Sharding
Background
A while back Rob Woollam and I began a project to build a scalable web application for RezzMap. We evaluated various technologies and decided on the Grails framework because of its ability to let us make progress quickly. Having built several scalable applications in the past, I knew that building a system that scales horizontally would allow for easy growth in the future. Given that, and my use of database sharding in the past, we started looking at how we would approach sharding in Grails. At first we found little on the subject beyond a conversation that discussed using Hibernate Shards within Grails. In this thread there was a reference to a blog post by Lee Butts that showed how to use a “switchable datasource” (an implementation of Spring’s AbstractRoutingDataSource) to change the connection of a datasource on the fly. So we took this blog post and built a plugin that allows us to have shards of data within Grails based on a user account. I have since pulled most of this logic out into a plugin that we are sharing with the Grails community. Over time we will integrate the more advanced features of our internal plugin, but I wanted to get a more basic version out there for feedback before complicating the plugin.
Shard Selection and Resolution
There are a couple of high level notes about the implementation before getting into installation and configuration of the plugin. Typically when talking about database sharding there are a couple of decisions an implementation needs to make:
- How do we assign a shard to a new object? (Shard Selection)
- How do we resolve the shard that a current object lives in? (Shard Resolution)
For our plugin we chose to use what we call an Index database, which holds a minimum of two tables. One, which the plugin creates, contains a record for every shard in the system along with a capacity and a usage field. This table is queried every time a new object (in our case a User) is created within the system. We assign the new object to the shard with the lowest usage-to-capacity ratio. This allows shards to be located on different types of hardware that should take a smaller or larger number of objects. The other table is supplied by the application using the plugin and maps each object to the shard to be used. Whenever a request begins for an object, the application should query this table, retrieve the shard to use, and pass it to the plugin to switch to that database.
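To make the selection rule concrete, here is a minimal sketch in plain Java of picking the shard with the lowest usage-to-capacity ratio. It is not the plugin's code: the names ShardInfo, ShardSelector and selectShard are made up for illustration, and the plugin reads these values from its shard table rather than from an in-memory list.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical names throughout: ShardInfo and ShardSelector are illustrations,
// not classes from the Sharding plugin.
final class ShardSelector {

    /** One row of the shard table: a name plus capacity and usage counters. */
    static final class ShardInfo {
        final String name;
        final int capacity;
        final int usage;

        ShardInfo(String name, int capacity, int usage) {
            this.name = name;
            this.capacity = capacity;
            this.usage = usage;
        }

        /** Fraction of this shard's capacity that is already in use. */
        double fillRatio() {
            return (double) usage / capacity;
        }
    }

    /** Returns the shard with the lowest usage-to-capacity ratio, ignoring shards with no capacity. */
    static Optional<ShardInfo> selectShard(List<ShardInfo> shards) {
        return shards.stream()
                .filter(s -> s.capacity > 0)
                .min(Comparator.comparingDouble(ShardInfo::fillRatio));
    }

    public static void main(String[] args) {
        List<ShardInfo> shards = Arrays.asList(
                new ShardInfo("shard1001", 1000, 400),   // 40% full
                new ShardInfo("shard1002", 2000, 500));  // 25% full
        selectShard(shards).ifPresent(s -> System.out.println("Assign new user to " + s.name));
    }
}
```

Comparing ratios rather than raw usage is what lets a bigger machine (higher capacity) keep taking new users after a smaller one has stopped being the emptiest.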
Using the Sharding plugin
Start by creating a new Grails application with the Sharding plugin (now at version 0.1), assuming Grails 1.2.1 is installed:
grails create-app ShardingExample
cd ShardingExample
grails install-plugin sharding
Next, let’s create a couple of domain classes:
grails create-domain-class UserIndex
grails create-domain-class Comment
Create some simple properties for the domain objects:
grails-app/domain/UserIndex.groovy
class UserIndex {
    String userName
    String shard

    static constraints = {
    }
}
grails-app/domain/Comment.groovy
class Comment {
    Integer userIndexId
    String comment

    static constraints = {
    }
}
As you can see, UserIndex associates a userName with a shard. This allows the application to identify the owning shard for a given user and then switch to that shard. The Comment object is just a sample piece of data we might store with a user. The next thing to do is define the databases that will hold the application’s data. For this example there will be three databases: an Index database and two Shard databases. When using the plugin there must be exactly one Index database and as many Shard databases as necessary. Here is the configuration file used to define the databases (you may need to change the connection settings):
grails-app/conf/Shards.groovy:
index = {
    domainClass('UserIndex')
    shardNameFieldName('shard')
    name('shardINDEX')
    user('root')
    password('PASSWORD')
    driverClass('com.mysql.jdbc.Driver')
    jdbcUrl('jdbc:mysql://localhost:3306/shardINDEX')
    dialect(org.hibernate.dialect.MySQL5InnoDBDialect)
}

shards = {
    shard_01 {
        name('shard1001')
        user('root')
        password('PASSWORD')
        driverClass('com.mysql.jdbc.Driver')
        capacity(1000)
        jdbcUrl('jdbc:mysql://localhost:3306/shard1001')
    }
    shard_02 {
        name('shard1002')
        user('root')
        password('PASSWORD')
        driverClass('com.mysql.jdbc.Driver')
        capacity(1000)
        jdbcUrl('jdbc:mysql://localhost:3306/shard1002')
    }
}
Next create the three schemas in the database (in this example we are using MySQL):
create schema shardINDEX;
create schema shard1001;
create schema shard1002;
Add a dependency for the database driver you are using to the dependencies section of grails-app/conf/BuildConfig.groovy (in this case MySQL):
dependencies {
    runtime 'mysql:mysql-connector-java:5.1.5'
}
Now create the default templates for both UserIndex and Comment:
grails generate-all UserIndex
grails generate-all Comment
Add the following closures to grails-app/controllers/UserIndexController.groovy (poor man’s login but you get the idea):
def login = {
    session.userName = params.userName
    render "User logged in."
}

def logout = {
    session.userName = null
    render "User logged out"
}
And modify grails-app/controllers/CommentController.groovy (this will make use of the “logged in” user’s name and switch to the appropriate shard):
import UserIndex

class CommentController {

    def shardService

    static allowedMethods = [save: "POST", update: "POST", delete: "POST"]

    def index = {
        redirect(action: "list", params: params)
    }

    def list = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        params.max = Math.min(params.max ? params.int('max') : 10, 100)
        [commentInstanceList: Comment.list(params), commentInstanceTotal: Comment.count()]
    }

    def create = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = new Comment()
        commentInstance.properties = params
        return [commentInstance: commentInstance]
    }

    def save = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = new Comment(params)
        if (commentInstance.save(flush: true)) {
            flash.message = "${message(code: 'default.created.message', args: [message(code: 'comment.label', default: 'Comment'), commentInstance.id])}"
            redirect(action: "show", id: commentInstance.id)
        }
        else {
            render(view: "create", model: [commentInstance: commentInstance])
        }
    }

    def show = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = Comment.get(params.id)
        if (!commentInstance) {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
        else {
            [commentInstance: commentInstance]
        }
    }

    def edit = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = Comment.get(params.id)
        if (!commentInstance) {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
        else {
            return [commentInstance: commentInstance]
        }
    }

    def update = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = Comment.get(params.id)
        if (commentInstance) {
            if (params.version) {
                def version = params.version.toLong()
                if (commentInstance.version > version) {
                    commentInstance.errors.rejectValue("version", "default.optimistic.locking.failure", [message(code: 'comment.label', default: 'Comment')] as Object[], "Another user has updated this Comment while you were editing")
                    render(view: "edit", model: [commentInstance: commentInstance])
                    return
                }
            }
            commentInstance.properties = params
            if (!commentInstance.hasErrors() && commentInstance.save(flush: true)) {
                flash.message = "${message(code: 'default.updated.message', args: [message(code: 'comment.label', default: 'Comment'), commentInstance.id])}"
                redirect(action: "show", id: commentInstance.id)
            }
            else {
                render(view: "edit", model: [commentInstance: commentInstance])
            }
        }
        else {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
    }

    def delete = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)
        def commentInstance = Comment.get(params.id)
        if (commentInstance) {
            try {
                commentInstance.delete(flush: true)
                flash.message = "${message(code: 'default.deleted.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
                redirect(action: "list")
            }
            catch (org.springframework.dao.DataIntegrityViolationException e) {
                flash.message = "${message(code: 'default.not.deleted.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
                redirect(action: "show", id: params.id)
            }
        }
        else {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
    }
}
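Every action above starts the same way: look up the user in the Index database, then call shardService.changeByObject(user) so that the rest of the action talks to that user's shard. The plugin's internals are not shown in this post, but the general idea behind a switchable datasource can be sketched in plain Java: remember the current shard per request thread, and resolve connection details from that at the moment they are needed. Everything in the sketch below (ShardRouter, register, switchTo, currentJdbcUrl) is a hypothetical name of my own, it returns JDBC URLs rather than real connections, and it should be read as an illustration of the technique rather than as what the plugin actually does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: ShardRouter is NOT the plugin's ShardService.
// It only illustrates thread-bound routing between an index database and shards.
final class ShardRouter {

    private final String indexJdbcUrl;
    private final Map<String, String> jdbcUrlByShard = new ConcurrentHashMap<>();

    // Each request thread remembers which shard it is currently bound to.
    private final ThreadLocal<String> currentShard = new ThreadLocal<>();

    ShardRouter(String indexJdbcUrl) {
        this.indexJdbcUrl = indexJdbcUrl;
    }

    /** Makes a shard known to the router. */
    void register(String shardName, String jdbcUrl) {
        jdbcUrlByShard.put(shardName, jdbcUrl);
    }

    /** Binds the calling thread to the named shard; unknown names are rejected. */
    void switchTo(String shardName) {
        if (shardName == null || !jdbcUrlByShard.containsKey(shardName)) {
            throw new IllegalArgumentException("Unknown shard: " + shardName);
        }
        currentShard.set(shardName);
    }

    /** Unbinds the calling thread, e.g. at the end of a request. */
    void clear() {
        currentShard.remove();
    }

    /** URL for the bound shard, or the index database if no shard is bound. */
    String currentJdbcUrl() {
        String shard = currentShard.get();
        return shard == null ? indexJdbcUrl : jdbcUrlByShard.get(shard);
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter("jdbc:mysql://localhost:3306/shardINDEX");
        router.register("shard1001", "jdbc:mysql://localhost:3306/shard1001");
        router.register("shard1002", "jdbc:mysql://localhost:3306/shard1002");

        router.switchTo("shard1002");                 // value read from UserIndex.shard
        System.out.println(router.currentJdbcUrl());
        router.clear();
    }
}
```

Because the binding lives in a ThreadLocal, two requests for users on different shards can run at the same time without stepping on each other, which is also why the binding should be cleared when a request finishes if threads are pooled.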
Now run the application:
grails run-app
Navigate to http://localhost:8080/ShardingExample/userIndex in your favorite browser and create a couple of users (but leave the shard field empty). When you are done, go to the list view and you will see the two new users assigned to different shards.
Now navigate to the login action we created for one of the new users (http://localhost:8080/ShardingExample/userIndex/login?userName=jrick). Once you have logged in as that user, navigate to the comment controller (http://localhost:8080/ShardingExample/comment) and create some comments for the user.
You will then end up with a list of those comments (http://localhost:8080/ShardingExample/comment/list).
Now switch to the second user created (http://localhost:8080/ShardingExample/userIndex/login?userName=lrick) and then navigate to the comment list view (http://localhost:8080/ShardingExample/comment/list), which is now empty.
Create a comment for this user.
Then go to the list view (http://localhost:8080/ShardingExample/comment/list) and notice that the ID field is not unique across shards.
Now switch back (http://localhost:8080/ShardingExample/userIndex/login?userName=jrick), go to the list view (http://localhost:8080/ShardingExample/comment/list), and notice that the two shards are separated.
Example Application
The sample application described above can be obtained here.
Timing and Execution
I went out last night with my wife to see the movie Hot Tub Time Machine, which is a fairly lowbrow comedy about a bunch of guys who end up going back in time using a hot tub. Beyond the discomfort of going to a time travel movie in which the characters go “back” to a time when I was alive, 1986, a minor part of the plot struck me as interesting. At the end of the movie, one of the guys ends up staying behind in 1986, getting to relive the past 24 years with all of the knowledge of what is to come. This of course leads to him being famously successful, owning a company named Lougle (which replaces Google) and having a fabulous life.
This is a common theme in a lot of time travel movies and is generally accepted by most viewers as what would happen. I wonder, however, if this is really what would happen. Set aside all of the usual paradoxes in Hollywood’s idea of time travel and the question of whether your knowledge would change the future. What if you knew the products that would be successful in the future? Would you be as successful as the original inventors and founders of these products and companies? My guess is that you would not. While having a good idea is a key piece of success, most successful companies are hardly ever the first to market with the product or the only ones with the product idea.
From Google and their innovative search business model, which AltaVista pioneered before them, to the social networking darlings Facebook, Twitter and MySpace and those that came before them (SixDegrees, Friendster and many others), most truly successful companies were not the first to market. Rather, they had two things going for them: Timing and Execution. Google won because they executed search, and more importantly the paid search business model, better than anyone else. Facebook and Twitter, and to some extent MySpace, have succeeded more because they came to market at a time when the market was ready for them. This was because 30-, 40- and 50-year-olds were ready to embrace the web and talk about and share their lives in a new way. The real challenge in building a business is not coming up with the idea but building a product based on that idea that is well designed and executed and is brought to market when the market is ready to embrace it.
Bulk insert into a MySQL Database
At RezzMap we use Java and MySQL for the RezzMap application and need to bulk insert lots of records into a large table. These inserts have started to become a bottleneck in the processing of the data required to run the service, so I started to look for a better way. Up until this point we were doing the classic batch inserts using a PreparedStatement and executeBatch. But even after adding the parameter rewriteBatchedStatements to our JDBC URL, the inserts were still too slow. After reading all over the place that LOAD DATA INFILE was “up to 20 times faster” than batched inserts, I was considering doing that. In looking at this solution, though, I wasn’t thrilled about writing the data out to a temporary file and then running SQL to pull from that file. So I did a little digging around and found an old announcement by MySQL of a then-new method, setLocalInfileInputStream (included in Connector/J 5.1.3 and later):
* New methods on com.mysql.jdbc.Statement: setLocalInfileInputStream() and
getLocalInfileInputStream():
* setLocalInfileInputStream() sets an InputStream instance that will be used
to send data to the MySQL server for a "LOAD DATA LOCAL INFILE"
statement rather than a FileInputStream or URLInputStream that represents the path given as an argument to the statement.
So after reading this the path seemed easy: build an InputStream that holds a tab-delimited set of data, pass it to this method, and then call LOAD DATA LOCAL INFILE to get all of the benefits of the bulk loader. To get this working I took the path of least resistance: I built up a String using StringBuilder and then used the toInputStream(String input) method of org.apache.commons.io.IOUtils to convert my String to an InputStream. After doing this I compared the two methods of inserting data on my wimpy Dell laptop and got the following results:
Here is what a stripped down version of this method looks like:
import com.mysql.jdbc.Connection;
import com.mysql.jdbc.Statement;
import org.apache.commons.io.IOUtils;

import java.io.InputStream;
import java.sql.SQLException;
import java.util.Map;

public class DemoBulkInsert {

    public void bulkInsert(Connection conn, Long personId, Map<String, String> hashOfNameValues) throws SQLException {
        // First create a statement off the connection and turn off unique checks and key creation
        Statement statement = (com.mysql.jdbc.Statement) conn.createStatement();
        try {
            statement.execute("SET UNIQUE_CHECKS=0; ");
            statement.execute("ALTER TABLE real_big_table DISABLE KEYS");

            // Define the query we are going to execute. The file name is only a placeholder:
            // the data comes from the input stream set on the statement below.
            String statementText = "LOAD DATA LOCAL INFILE 'file.txt' " +
                    "INTO TABLE real_big_table " +
                    "(name, value) " +
                    " SET owner_id = " + personId + ", " +
                    " version = 0; ";

            // Create the StringBuilder for the String that will become the stream
            StringBuilder builder = new StringBuilder();

            // Iterate over the map and create one tab-delimited line per entry
            for (Map.Entry<String, String> entry : hashOfNameValues.entrySet()) {
                builder.append(entry.getKey());
                builder.append('\t');
                builder.append(entry.getValue());
                builder.append('\n');
            }

            // Create the stream from the StringBuilder
            InputStream is = IOUtils.toInputStream(builder.toString());

            // Set up our input stream as the source for the local infile
            statement.setLocalInfileInputStream(is);

            // Execute the load infile
            statement.execute(statementText);
        } finally {
            try {
                // Turn the checks back on for the same table, even if the load failed
                statement.execute("ALTER TABLE real_big_table ENABLE KEYS");
                statement.execute("SET UNIQUE_CHECKS=1; ");
            } finally {
                statement.close();
            }
        }
    }
}
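One thing the stripped-down method glosses over is what happens when a name or value itself contains a tab, a newline or a backslash: with LOAD DATA's default settings (fields terminated by tab, lines terminated by newline, escaped by backslash) such a value would be split across columns or rows. Below is a rough sketch of how the tab-delimited text could be built with escaping, using only the JDK instead of commons-io. The class and method names (TabDelimitedRows, escapeField, toRows, toStream) are my own for illustration, and the UTF-8 encoding is an assumption that has to match the character set the server uses when reading the file.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Connector/J or the original code.
final class TabDelimitedRows {

    /** Escapes one field for LOAD DATA's default format; null becomes \N (SQL NULL). */
    static String escapeField(String value) {
        if (value == null) {
            return "\\N";
        }
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': out.append("\\\\"); break; // escape character itself
                case '\t': out.append("\\t");  break; // field terminator
                case '\n': out.append("\\n");  break; // line terminator
                case '\r': out.append("\\r");  break;
                default:   out.append(c);
            }
        }
        return out.toString();
    }

    /** Builds one "name<TAB>value<NEWLINE>" row per map entry. */
    static String toRows(Map<String, String> nameValues) {
        StringBuilder builder = new StringBuilder();
        for (Map.Entry<String, String> entry : nameValues.entrySet()) {
            builder.append(escapeField(entry.getKey()))
                   .append('\t')
                   .append(escapeField(entry.getValue()))
                   .append('\n');
        }
        return builder.toString();
    }

    /** Wraps the rows in an InputStream. UTF-8 is an assumption; match your server's setting. */
    static InputStream toStream(Map<String, String> nameValues) {
        return new ByteArrayInputStream(toRows(nameValues).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("plain", "value");
        data.put("tricky", "has\ta tab");
        System.out.print(toRows(data));
    }
}
```

The stream returned by toStream could then be handed to setLocalInfileInputStream in place of the IOUtils call. For really large loads, holding the whole String in memory may itself become a problem, at which point a streaming writer would be worth a look.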