A Grails Plugin for Sharding

July 13, 2010 9 comments

Background

A while back Rob Woollam and I began a project to build a scalable web application for RezzMap.  We evaluated various technologies and decided on the Grails Framework because of its ability to let us make progress quickly.  Having built several scalable applications in the past, I knew that building a system that scales horizontally would allow for easy growth in the future.  Given that, and my use of database sharding in the past, we started looking at how we would approach sharding in Grails.  At first we found little on the subject beyond a conversation that discussed using Hibernate Shards within Grails.  In that thread there was a reference to a blog post by Lee Butts that showed how to use a “switchable datasource” (an implementation of Spring’s AbstractRoutingDataSource) to change the connection of a datasource on the fly.  We took this blog post and built a plugin that allows us to have shards of data within Grails based on a user account.  I have since pulled most of this logic out into a plugin that we are sharing with the Grails community.  Over time we will integrate the more advanced features of our internal plugin, but I wanted to get a more basic version out there for feedback before complicating the plugin.

Shard Selection and Resolution

A couple of high-level notes about the implementation are in order before getting into installation and configuration of the plugin.  Any database sharding implementation needs to make two decisions:

  1. How do we assign a shard to a new object?  (Shard Selection)
  2. How do we resolve the shard that a current object lives in? (Shard Resolution)

Our plugin uses what we call an Index database, which holds a minimum of two tables.  The first, which the plugin creates, contains a record for every shard in the system along with a capacity and a usage field.  This table is queried every time a new object (in our case a User) is created within the system, and the new object is assigned to the shard with the lowest usage-to-capacity ratio.  This allows shards to be located on different types of hardware that should take a smaller or larger number of objects.  The second table is supplied by the application using the plugin and maps each object to the shard it lives in.  Whenever a request begins for an object, the application should query this table to retrieve the shard and then pass it to the plugin, which switches to that database.
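The selection rule can be sketched in a few lines of Java. This is only an illustration of "lowest usage-to-capacity ratio", not the plugin's actual code: the ShardInfo and ShardSelector names are hypothetical, the shard rows are held in memory rather than read from the Index database, and every capacity is assumed to be greater than zero.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for one row of the shard table
// (name, capacity, usage). Not the plugin's real domain class.
final class ShardInfo {
    final String name;
    final int capacity;
    final int usage;

    ShardInfo(String name, int capacity, int usage) {
        this.name = name;
        this.capacity = capacity;
        this.usage = usage;
    }

    // Fraction of this shard's capacity already in use (assumes capacity > 0).
    double loadRatio() {
        return (double) usage / capacity;
    }
}

final class ShardSelector {
    // Returns the shard with the lowest usage-to-capacity ratio.
    // On a tie, the shard that appears first in the list wins.
    static ShardInfo select(List<ShardInfo> shards) {
        if (shards.isEmpty()) {
            throw new IllegalArgumentException("no shards configured");
        }
        ShardInfo best = shards.get(0);
        for (ShardInfo candidate : shards) {
            if (candidate.loadRatio() < best.loadRatio()) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<ShardInfo> shards = Arrays.asList(
                new ShardInfo("shard1001", 1000, 300),   // ratio 0.3
                new ShardInfo("shard1002", 2000, 400));  // ratio 0.2
        System.out.println(select(shards).name);
    }
}
```

Because the comparison uses the ratio rather than the raw usage count, a shard on bigger hardware (larger capacity) keeps receiving new objects until it is proportionally as full as the smaller ones.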

Using the Sharding plugin

Start by creating a new Grails application and installing the Sharding plugin (now at version 0.1).  This assumes Grails 1.2.1 is already installed:


grails create-app ShardingExample

cd ShardingExample

grails install-plugin sharding

Next, let’s create a couple of domain classes:

grails create-domain-class UserIndex

grails create-domain-class Comment

Create some simple properties for the domain objects:

grails-app/domain/UserIndex.groovy

class UserIndex {

    String userName

    String shard

    static constraints = {
    }
}

grails-app/domain/Comment.groovy

class Comment {

    Integer userIndexId

    String comment

    static constraints = {
    }
}

As you can see, UserIndex associates a userName with a shard.  This allows the application to identify the owning shard for a given user and then switch to that shard.  The Comment object is just a sample piece of data we might store with a user.  The next thing to do is define the databases that will hold the application’s data.  For this example there will be three databases: an Index database and two Shard databases.  When using the plugin there must be exactly one Index database and as many Shard databases as necessary.  Here is the configuration file used to define the databases (you may need to change the connection settings for your environment):

grails-app/conf/Shards.groovy:

index = {
  domainClass('UserIndex')
  shardNameFieldName('shard')

  name('shardINDEX')
  user('root')
  password('PASSWORD')
  driverClass('com.mysql.jdbc.Driver')
  jdbcUrl('jdbc:mysql://localhost:3306/shardINDEX')
  dialect(org.hibernate.dialect.MySQL5InnoDBDialect)
}
shards = {
  shard_01 {
    name('shard1001')
    user('root')
    password('PASSWORD')
    driverClass('com.mysql.jdbc.Driver')
    capacity(1000)
    jdbcUrl('jdbc:mysql://localhost:3306/shard1001')
  }
  shard_02 {
    name('shard1002')
    user('root')
    password('PASSWORD')
    driverClass('com.mysql.jdbc.Driver')
    capacity(1000)
    jdbcUrl('jdbc:mysql://localhost:3306/shard1002')
  }
}

Next, create the three schemas in the database (in this example we are using MySQL):

create schema shardINDEX;

create schema shard1001;

create schema shard1002;

Add a dependency for the database driver you are using to the dependencies section of grails-app/conf/BuildConfig.groovy (in this case MySQL):

    dependencies {
         runtime 'mysql:mysql-connector-java:5.1.5'
    }

Now create the default templates for both UserIndex and Comment:

grails generate-all UserIndex

grails generate-all Comment

Add the following closures to grails-app/controllers/UserIndexController.groovy (poor man’s login but you get the idea):

    def login = {
    	session.userName = params.userName
    	render "User logged in."
    }

    def logout = {
        session.userName = null
        render "User logged out"
    }

And modify grails-app/controllers/CommentController.groovy (this will make use of the “logged in” user’s name and switch to the appropriate shard):

import UserIndex

class CommentController {
    def shardService

    static allowedMethods = [save: "POST", update: "POST", delete: "POST"]

    def index = {
        redirect(action: "list", params: params)
    }

    def list = {
        def user = UserIndex.findByUserName(session.userName)
        shardService.changeByObject(user)

        params.max = Math.min(params.max ? params.int('max') : 10, 100)
        [commentInstanceList: Comment.list(params), commentInstanceTotal: Comment.count()]
    }

    def create = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = new Comment()
        commentInstance.properties = params
        return [commentInstance: commentInstance]
    }

    def save = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = new Comment(params)
        if (commentInstance.save(flush: true)) {
            flash.message = "${message(code: 'default.created.message', args: [message(code: 'comment.label', default: 'Comment'), commentInstance.id])}"
            redirect(action: "show", id: commentInstance.id)
        }
        else {
            render(view: "create", model: [commentInstance: commentInstance])
        }
    }

    def show = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = Comment.get(params.id)
        if (!commentInstance) {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
        else {
            [commentInstance: commentInstance]
        }
    }

    def edit = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = Comment.get(params.id)
        if (!commentInstance) {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
        else {
            return [commentInstance: commentInstance]
        }
    }

    def update = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = Comment.get(params.id)
        if (commentInstance) {
            if (params.version) {
                def version = params.version.toLong()
                if (commentInstance.version > version) {

                    commentInstance.errors.rejectValue("version", "default.optimistic.locking.failure", [message(code: 'comment.label', default: 'Comment')] as Object[], "Another user has updated this Comment while you were editing")
                    render(view: "edit", model: [commentInstance: commentInstance])
                    return
                }
            }
            commentInstance.properties = params
            if (!commentInstance.hasErrors() && commentInstance.save(flush: true)) {
                flash.message = "${message(code: 'default.updated.message', args: [message(code: 'comment.label', default: 'Comment'), commentInstance.id])}"
                redirect(action: "show", id: commentInstance.id)
            }
            else {
                render(view: "edit", model: [commentInstance: commentInstance])
            }
        }
        else {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
    }

    def delete = {
    	def user = UserIndex.findByUserName(session.userName)
    	shardService.changeByObject(user)

        def commentInstance = Comment.get(params.id)
        if (commentInstance) {
            try {
                commentInstance.delete(flush: true)
                flash.message = "${message(code: 'default.deleted.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
                redirect(action: "list")
            }
            catch (org.springframework.dao.DataIntegrityViolationException e) {
                flash.message = "${message(code: 'default.not.deleted.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
                redirect(action: "show", id: params.id)
            }
        }
        else {
            flash.message = "${message(code: 'default.not.found.message', args: [message(code: 'comment.label', default: 'Comment'), params.id])}"
            redirect(action: "list")
        }
    }
}
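Every action in the controller above performs the same two steps: look up the UserIndex row for the logged-in user, then hand it to shardService.changeByObject. The plugin's internals are not shown in this post, so the following Java sketch is only an assumption about what such resolution could look like, modeled on the “switchable datasource” idea from the Background section. The ShardResolver class, its methods and the in-memory map are all hypothetical; a routing datasource would read the thread-bound shard name to pick a connection.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of shard resolution; not the plugin's API.
final class ShardResolver {
    // Stand-in for the UserIndex table: userName -> shard name.
    private final Map<String, String> userIndex = new HashMap<String, String>();

    // Shard bound to the current request thread. A routing datasource
    // could consult this value whenever it needs a connection.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<String>();

    void register(String userName, String shard) {
        userIndex.put(userName, shard);
    }

    // Looks up the user's shard and binds it to the current thread.
    String switchTo(String userName) {
        String shard = userIndex.get(userName);
        if (shard == null) {
            throw new IllegalStateException("no shard mapped for user " + userName);
        }
        CURRENT.set(shard);
        return shard;
    }

    // Shard currently bound to this thread, or null if none is bound.
    static String currentShard() {
        return CURRENT.get();
    }

    // Unbinds the shard; call at the end of a request so pooled
    // threads do not leak one user's shard into the next request.
    static void clear() {
        CURRENT.remove();
    }

    public static void main(String[] args) {
        ShardResolver resolver = new ShardResolver();
        resolver.register("jrick", "shard1001");
        resolver.register("lrick", "shard1002");
        resolver.switchTo("jrick");
        System.out.println(currentShard());
        clear();
    }
}
```

Failing loudly when no mapping exists is a deliberate choice in this sketch: silently falling back to a default shard would make one user's data appear under another shard.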

Now run the application:

grails run-app

Navigate to http://localhost:8080/ShardingExample/userIndex in your favorite browser and create a couple of users, leaving the shard field empty.  When you are done, go to the list view and you will see the two new users assigned to different shards:

image

Now navigate to the login action we created, using one of the new users: http://localhost:8080/ShardingExample/userIndex/login?userName=jrick.  Once you have logged in as that user, navigate to the comment controller at http://localhost:8080/ShardingExample/comment and create some comments for the user:

image

You will then end up with a list that looks like this (http://localhost:8080/ShardingExample/comment/list):

image

Now switch to the second user created (http://localhost:8080/ShardingExample/userIndex/login?userName=lrick) and then navigate to the comment list view (http://localhost:8080/ShardingExample/comment/list) which is now empty:

image

Create a comment for this user:

image

Then look at the list view (http://localhost:8080/ShardingExample/comment/list) and notice that the ID field is not unique across shards:

image
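Since a comment ID only identifies a row within its own shard, any code that needs to refer to a comment from outside the shard has to carry the shard along with the ID. One way to do that, sketched here in Java, is a small composite key. The ShardedId class and its "shard:localId" text form are hypothetical and are not something the plugin provides.

```java
import java.util.Objects;

// Hypothetical composite key pairing a shard name with a shard-local ID.
final class ShardedId {
    final String shard;
    final long localId;

    ShardedId(String shard, long localId) {
        this.shard = Objects.requireNonNull(shard, "shard");
        this.localId = localId;
    }

    // Parses the "shard:localId" form produced by toString().
    static ShardedId parse(String text) {
        int sep = text.lastIndexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("expected shard:id but got " + text);
        }
        return new ShardedId(text.substring(0, sep),
                Long.parseLong(text.substring(sep + 1)));
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof ShardedId)) return false;
        ShardedId that = (ShardedId) other;
        return localId == that.localId && shard.equals(that.shard);
    }

    @Override
    public int hashCode() {
        return Objects.hash(shard, localId);
    }

    @Override
    public String toString() {
        return shard + ":" + localId;
    }

    public static void main(String[] args) {
        // Same local ID, different shards: two distinct comments.
        ShardedId first = new ShardedId("shard1001", 1L);
        ShardedId second = new ShardedId("shard1002", 1L);
        System.out.println(first + " equals " + second + "? " + first.equals(second));
    }
}
```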

Now switch back to the first user (http://localhost:8080/ShardingExample/userIndex/login?userName=jrick), go to the list view (http://localhost:8080/ShardingExample/comment/list) and notice that the data in the two shards is kept separate:

image

Example Application

The sample application described above can be obtained here.

Timing and Execution

April 3, 2010 1 comment

I went out last night with my wife to the movie Hot Tub Time Machine, which is a fairly lowbrow comedy about a bunch of guys who end up going back in time using a hot tub.  Beyond the discomfort of going to a time travel movie in which the characters go “back” to a time when I was alive, 1986, a minor part of the plot struck me as interesting.  At the end of the movie, one of the guys ends up staying behind in 1986, getting to relive the past 24 years with all of the knowledge of what is to come.  This of course leads to him being famously successful, owning a company named Lougle (which replaces Google) and having a fabulous life.

This is a common theme in a lot of time travel movies and is generally accepted by most viewers as what would happen.  I wonder, however, if this is really what would happen.  Set aside all of the usual paradoxes in Hollywood’s idea of time travel and the question of whether your knowledge would change the future.  What if you knew the products that would be successful in the future?  Would you be as successful as the original inventors and founders of these products and companies?  My guess is that you would not.  While having a good idea is a key piece of success, most successful companies are hardly ever the first to market with the product or the only ones with the product idea.

From Google and its innovative search business model, which AltaVista pioneered before it, to the social networking darlings Facebook, Twitter and MySpace and those that came before them (SixDegrees, Friendster and many others), most truly successful companies were not the first to market.  Rather, they had two things going for them: Timing and Execution.  Google won because it executed search, and more importantly the paid search business model, better than anyone else.  Facebook and Twitter, and to some extent MySpace, have succeeded more because they came to market at a time when the market was ready for them.  This was because the 30, 40 and 50 year olds were ready to embrace the web and talk about and share their lives in a new way.  The real challenge in building a business is not coming up with the idea but building a product based on that idea that is well designed and executed and is brought to market when the market is ready to embrace it.

Bulk insert into a MySql Database

March 23, 2010 10 comments

At RezzMap we use Java and MySQL for the RezzMap application and need to bulk insert lots of records into a large table.  These inserts had started to become a bottleneck in processing the data required to run the service, so I started to look for a better way.  Up until this point we were doing the classic batch inserts using a PreparedStatement and executeBatch.  But even after adding the parameter rewriteBatchedStatements to our JDBC URL, the inserts were still too slow.  After reading all over the place that LOAD DATA INFILE was “up to 20 times faster” than batched inserts, I considered doing that.  In looking at this solution I wasn’t thrilled about writing the data out to a temporary file and then running SQL to pull from that file.  So I did a little digging around and found an old announcement by MySQL of a (at the time) new method, setLocalInfileInputStream (included in Connector/J 5.1.3 and later):

* New methods on com.mysql.jdbc.Statement: setLocalInfileInputStream() and
getLocalInfileInputStream():

    * setLocalInfileInputStream() sets an InputStream instance that will be used
     to send data to the MySQL server for a "LOAD DATA LOCAL INFILE"
     statement rather than a FileInputStream or URLInputStream that represents the path given as an argument to the statement.

After reading this the path seemed easy: build an InputStream that holds a tab-delimited set of data, pass it to this method, and then call LOAD DATA LOCAL INFILE to get all of the benefits of the bulk loader.  To get this working I took the path of least resistance: I built up a String using StringBuilder and then used the method toInputStream(String input) of class org.apache.commons.io.IOUtils to convert my String to an InputStream.  After doing this I compared the two methods of inserting data on my wimpy Dell laptop and got the following results:

image

Here is what a stripped down version of this method looks like:

import com.mysql.jdbc.Connection;
import com.mysql.jdbc.Statement;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class DemoBulkInsert {
    public void bulkInsert(Connection conn, Long personId, HashMap<String, String> hashOfNameValues) throws SQLException {

            // First create a statement off the connection and turn off unique checks and key creation
            Statement statement = (com.mysql.jdbc.Statement)conn.createStatement();
            statement.execute("SET UNIQUE_CHECKS=0; ");
            statement.execute("ALTER TABLE real_big_table DISABLE KEYS");

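            // Define the query we are going to execute; 'file.txt' is only a placeholder
            // name, because the data is read from the InputStream set further down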
            String statementText = "LOAD DATA LOCAL INFILE 'file.txt' " +
                    "INTO TABLE real_big_table " +
                    "(name, value) " +
                    " SET owner_id = " + personId + ", " +
                    " version = 0; ";

            // Create StringBuilder to String that will become stream
            StringBuilder builder = new StringBuilder();

            // Iterate over map and create tab-text string
            for (Map.Entry<String, String> entry : hashOfNameValues.entrySet()) {
                builder.append(entry.getKey());
                builder.append('\t');
                builder.append(entry.getValue());
                builder.append('\n');
            }

            // Create stream from String Builder
            InputStream is = IOUtils.toInputStream(builder.toString());

            // Setup our input stream as the source for the local infile
            statement.setLocalInfileInputStream(is);

            // Execute the load infile
            statement.execute(statementText);

            // Turn the checks back on (for the same table whose keys were disabled above)
            statement.execute("ALTER TABLE real_big_table ENABLE KEYS");
            statement.execute("SET UNIQUE_CHECKS=1; ");
    }
}
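One thing the stripped-down method above does not handle is a key or value that itself contains a tab, a newline or a backslash: with LOAD DATA's default format (fields terminated by tab, lines terminated by newline, backslash as the escape character) such a value would shift columns or split a row. Below is a minimal Java sketch of escaping the data before it becomes a stream. The TabDelimitedRows class is hypothetical, it uses only the JDK instead of Commons IO, and it assumes UTF-8 is compatible with the character set the connection expects.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; assumes LOAD DATA's default FIELDS/LINES settings.
final class TabDelimitedRows {
    // Escapes characters that are special in the default LOAD DATA format.
    // A null value is written as \N, which LOAD DATA reads as SQL NULL.
    static String escape(String value) {
        if (value == null) {
            return "\\N";
        }
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': out.append("\\\\"); break;
                case '\t': out.append("\\t"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                default:   out.append(c);
            }
        }
        return out.toString();
    }

    // Builds one "name<TAB>value<NEWLINE>" row per map entry.
    static String toRows(Map<String, String> values) {
        StringBuilder builder = new StringBuilder();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            builder.append(escape(entry.getKey()))
                   .append('\t')
                   .append(escape(entry.getValue()))
                   .append('\n');
        }
        return builder.toString();
    }

    // Wraps the rows in a stream suitable for setLocalInfileInputStream.
    static InputStream toStream(Map<String, String> values) {
        return new ByteArrayInputStream(toRows(values).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<String, String>();
        values.put("note", "first line\nsecond line");
        values.put("path", "C:\\temp");
        System.out.print(toRows(values));
    }
}
```

For very large loads the whole payload still sits in memory as a String and a byte array, just as in the original method; streaming rows through a pipe would avoid that but is beyond this sketch.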
