Friday, June 22, 2012

High-availability architecure with mysql replication and C3P0

I've recently had a requirement of setting up the application to work in high-availability environment, which among the others involved hardiness for database (mysql) crash. We established with the customer that we want to have a second database server, where the mysql database mirror will be maintained, using master-slave mysql replication. In this model the master mysql server takes care of synchronizing records from itself to the slave server, and the slave server is available in fail-scenario for the application in read-only mode.

This is about database, but what about the code? If you establish a database connection each time the request comes, it sound easy, but we are talking about the Tomcat/Spring/Hibernate application with SessionFactory connected to polled JDBC datasource through c3p0 connection polling libraryHowever, I realized that c3p0 allows to change the JDBC connection params on-the-fly, and when it is done, it resets the existing poll. In such scenario, new connection acquiring requests will be directed to empty poll, and new connections will be established with overwritten parameters.

Therefore, the emerging idea was just to implement the connection supervisor, which can change the main c3p0 ComboPooledDataSource settings, when one of databases is unavailable. The implementation is below.

First class is only the connection configuration class:
import com.mchange.v2.c3p0.ComboPooledDataSource;

public class C3P0SupervisorConnectionConfig {

  protected String user;
  protected String password;
  protected String jdbcUrl;
  
  /** Whether connection is read-only. */
  protected boolean readonly = false;
  
  public C3P0SupervisorConnectionConfig() {
    super();
  }
  
  public C3P0SupervisorConnectionConfig(ComboPooledDataSource pds, 
      boolean readonly) {
    this.user = pds.getUser();
    this.password = pds.getPassword();
    this.jdbcUrl = pds.getJdbcUrl();
    this.readonly = readonly;
  }

  /** Let's omit here the getters and setters, doing nothing */

}
Then the supervisor itself.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.log4j.Logger;
import org.quartz.JobDetail;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;

import com.mchange.v2.c3p0.ComboPooledDataSource;

public class C3P0Supervisor implements InitializingBean {
  
  public static final Logger logger = Logger.getLogger(C3P0Supervisor.class);
  
  /**
   * Connections configuration with priorities.
   */
  protected Map<Integer, C3P0SupervisorConnectionConfig> connections;
  
  /**
   * Main datasource. 
   */
  protected ComboPooledDataSource pooledDataSource;
  
  /**
   * Which one is live currently.
   */
  protected int live = 0;

  @Override
  public void afterPropertiesSet() throws Exception {
    if (pooledDataSource==null)
      throw new BeanInitializationException("Pooled datasource cannot be null for C3P0Supervisor");
    
    if (connections!=null)
      connections = new TreeMap<Integer, C3P0SupervisorConnectionConfig>(connections);
    
    if (connections.get(0)==null) 
      connections.put(0, new C3P0SupervisorConnectionConfig(pooledDataSource, false));
  }
  
  public void poll() {
    if (logger.isDebugEnabled())
      logger.debug("Starts polling JDBC connections...");
    
    for (Integer priority: connections.keySet()) {
      C3P0SupervisorConnectionConfig cfg = connections.get(priority); 
      if (test(cfg)) {
        
        // the connection was tested and it's current connection
        if (live == priority)
          return;
        
        // something happened
        else {
          if (priority<live) {
            if (logger.isInfoEnabled())
              logger.info(String.format("Higher priority connection: %d than " +
                  "current: %d is available again, reconnecting to: %s", 
                priority, live, cfg.getJdbcUrl()));
          } else {
            if (logger.isInfoEnabled())
              logger.info(String.format("Higher priority connection: %d is currently unavailable, " +
                  "switching to: %d and reconnecting to: %s", 
                live, priority, cfg.getJdbcUrl()));
          }
              
          switchTo(priority, cfg);
          return;
        }
        
      }
    }
    
    live = Integer.MAX_VALUE;
  }
  
  protected boolean test(C3P0SupervisorConnectionConfig cfg) {
    Connection c = null;
    
    try {
      c = DriverManager.getConnection(cfg.getJdbcUrl(), cfg.getUser(), cfg.getPassword());
    } catch (SQLException e) {
      return false;
    } finally {
      try {
        if (c!=null)
          c.close();
      } catch (Exception e) {
        logger.error("Cannot close connection", e);
        return false;
      }
    }
    
    return true;
  }
  
  protected void switchTo(int priority, C3P0SupervisorConnectionConfig cfg) {
    if (logger.isDebugEnabled())
      logger.debug(String.format("Switching to priority=%d: %s", priority, cfg.getJdbcUrl()));
    
    synchronized (pooledDataSource) {
      live = priority;
      pooledDataSource.setJdbcUrl(cfg.getJdbcUrl());
      pooledDataSource.setUser(cfg.getUser());
      pooledDataSource.setPassword(cfg.getPassword());
    }

    // TODO Handle readonly connections
  }

  public Map<Integer, C3P0SupervisorConnectionConfig> getConnections() {
    return connections;
  }

  public void setConnections(Map<Integer, C3P0SupervisorConnectionConfig> connections) {
    this.connections = connections;
  }

  public ComboPooledDataSource getPooledDataSource() {
    return pooledDataSource;
  }

  public void setPooledDataSource(ComboPooledDataSource pooledDataSource) {
    this.pooledDataSource = pooledDataSource;
  }

}
The supervisor takes the bunch of prioritized JDBC configurations, and when the 0-priotity connection fails (which is the source ComboPooledDataSource connection), it tries to connect to another connection, accordingly to their priorites. When the higher priority (lower priority number) connection is back, it reconnects main datasource to the higher priority connection.

The readonly parameter is for further usage in application. One should, depending on this param, "do something" in application, like eg. remove "Save" buttons, or generate some message, that application works in read-only mode.

Now, what remains is to configure the supervisor bean and the Quartz Scheduler job detail (or whatever scheduling technology the application uses). Here is the exemplary bean config together with quartz part:
  <!-- the main application datasource -->

  <bean id="mainDataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
    <property name="user" value="${db.main.username}"/>
    <property name="password" value="${db.main.password}"/>
    <property name="jdbcUrl" value="${db.main.url}"/>
    <property name="driverClass" value="${db.driverClassName}"/>
  </bean>

  <!-- the supervisor bean -->

  <bean name="c3p0Supervisor" class="C3P0Supervisor">
    <property name="pooledDataSource" ref="mainDataSource"></property>
    <property name="connections">
    <map>
      <entry key="1">
        <bean class="C3P0SupervisorConnectionConfig">
        <property name="user" value="${db.spare.username}"/>
        <property name="password" value="${db.spare.password}"/>
        <property name="jdbcUrl" value="${db.spare.url}"/>
          <property name="readonly" value="true"></property>
        </bean>
      </entry>
    </map>
    </property>
  </bean>

  <!-- the quartz job and scheduler -->

  <bean id="c3p0SupervisorTask" 
      class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
    <property name="targetObject" ref="c3p0Supervisor" />
    <property name="targetMethod" value="poll" />
    <property name="concurrent" value="false" /> 
  </bean>
  <bean id="c3p0SupervisorTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
    <property name="jobDetail" ref="c3p0SupervisorTask" />
    <property name="repeatInterval" value="60000" />
  </bean>

  <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
    <list>
      <ref bean="c3p0SupervisorTrigger" />
    </list>
    </property>
  </bean>
Using the config above, after the first database failure, the application won't respond during 60secs and then will be switched to the second database. The "unavailibility time" depends on the scheduler repeatInterval settings.