Saturday, June 20, 2015

Distributed Hibernate Search with ActiveMQ and Spring

This catchy title is not really about distributed Hibernate Search, but about something very close :)

The case I recently solved was a bit different. I have a frontend application that uses a database through Hibernate, together with Hibernate Search, and I needed to add another application, let's call it an integration server, which exposes web service APIs for the overall system. The integration server uses the same database as the frontend application and lets clients put data into the database through its web services. The two applications also run on two different physical servers.

Everything looks simple until you start thinking about Hibernate Search updates coming from the integration server, while the index is located solely on the frontend application side, because only that part of the system uses it. When you put data into the database from the integration server, the frontend application's full-text index is, of course, not updated. I was looking for a simple solution to this problem.

First, let's take a look at what Hibernate Search offers. It supports a distributed index with master-slave replication, where the nodes are connected using JMS. This was more than I needed, because only one node uses the index for searching. Moreover, this solution is based on periodic index replication, so the index on each node becomes up to date only after some interval. Finally, I didn't like this solution because it uses JNDI, which is not really the Spring way of solving problems (I don't really like JEE; I prefer to work with lightweight Java application stacks).

So I came up with a solution based on the following prerequisites:
  1. I don't want to use replication, because I only need search on the frontend application side.
  2. I want to keep the index physically on the side that really uses it, i.e. on the frontend application side.
  3. When the integration server updates data in the database, the index needs to be updated.
  4. I don't want to use JNDI.
  5. We are already using ActiveMQ, and I want to use it for this solution as well. The AMQ broker is already located on the integration server side.
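
To make the intended message flow concrete, here is a minimal plain-Java sketch of the idea behind these prerequisites. It is only a model: the class names (IndexWork, FrontendIndex, IntegrationServer) are hypothetical, an in-memory queue stands in for the AMQ queue, and a map stands in for the Lucene index.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** One index update travelling from the integration server to the frontend (hypothetical type). */
final class IndexWork {
    final long entityId;
    final String text;

    IndexWork(long entityId, String text) {
        this.entityId = entityId;
        this.text = text;
    }
}

/** Stand-in for the frontend: the only node that owns and reads the index. */
final class FrontendIndex {
    private final Map<Long, String> documents = new HashMap<>();

    /** Applies every pending message to the local index and returns how many were applied. */
    int drain(Queue<IndexWork> queue) {
        int applied = 0;
        for (IndexWork work = queue.poll(); work != null; work = queue.poll()) {
            documents.put(work.entityId, work.text);
            applied++;
        }
        return applied;
    }

    boolean contains(long entityId) {
        return documents.containsKey(entityId);
    }
}

/** Stand-in for the integration server: it writes data but never touches the index itself. */
final class IntegrationServer {
    private final Queue<IndexWork> queue;

    IntegrationServer(Queue<IndexWork> queue) {
        this.queue = queue;
    }

    void save(long entityId, String text) {
        // The database write would happen here; the index update is only queued.
        queue.add(new IndexWork(entityId, text));
    }
}

class MessageFlowSketch {
    public static void main(String[] args) {
        Queue<IndexWork> searchQueue = new ArrayDeque<>(); // plays the role of the JMS queue
        IntegrationServer server = new IntegrationServer(searchQueue);
        FrontendIndex index = new FrontendIndex();

        server.save(1L, "first document");
        System.out.println("indexed before drain: " + index.contains(1L));
        index.drain(searchQueue);
        System.out.println("indexed after drain: " + index.contains(1L));
    }
}
```

The point of the sketch is that there is exactly one index and exactly one writer to it; the integration server only produces messages.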

OK, let's delve into the solution. Here is the Spring config of the important beans on the frontend side:

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Hibernate section -->
    <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.search.autoregister_listeners">true</prop>
                <prop key="hibernate.search.default.directory_provider">filesystem</prop>
                <prop key="hibernate.search.default.indexBase">/var/data/index</prop>
                <prop key="hibernate.search.default.worker.backend">lucene</prop>
            </props>
        </property>
    </bean>

    <!-- AMQ section -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:8082"/>

    <!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- listener container for local hibernate search queue -->
    <jms:listener-container connection-factory="amqConnectionFactory" destination-type="queue" concurrency="1">
        <jms:listener id="jmsHibernateSearchQueueListener" destination="queue.search"
                      ref="remoteHibernateSearchController"/>
    </jms:listener-container>
</beans>

What do we have here? A standard Hibernate session factory, on which I'm showing the Hibernate Search config that creates and uses a local Lucene index. Then an AMQ connection factory that connects to the broker running somewhere else. Nothing special. The only interesting bean is RemoteHibernateSearchController, which is derived from the standard AbstractJMSHibernateSearchController. That class already is a JMS message listener, so our subclass only needs to provide a Hibernate session from our session factory:

import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.search.backend.impl.jms.AbstractJMSHibernateSearchController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.jms.MessageListener;

@Service
public class RemoteHibernateSearchController extends AbstractJMSHibernateSearchController implements MessageListener {

    @Autowired
    protected SessionFactory sessionFactory;

    @Override
    protected Session getSession() {
        return sessionFactory.openSession();
    }

    @Override
    protected void cleanSessionIfNeeded(Session session) {
        session.close();
    }
}
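
The two overridden methods are hooks in a template-method style: the base class drives the message handling, and the subclass only decides where the session comes from and how it is released. The following plain-Java sketch models that contract under my own assumptions. It is not the Hibernate Search source; FakeSession, SketchSearchController and SketchRemoteController are hypothetical names, and a String stands in for the real JMS message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Hibernate Session. */
final class FakeSession {
    boolean open = true;

    void close() {
        open = false;
    }
}

/** Simplified model of the base controller: subclasses only supply and release the session. */
abstract class SketchSearchController {
    final List<String> applied = new ArrayList<>();

    protected abstract FakeSession getSession();

    protected abstract void cleanSessionIfNeeded(FakeSession session);

    /** Called once per message; the session is released even if applying the work fails. */
    public final void onMessage(String work) {
        FakeSession session = getSession();
        try {
            if (work == null) {
                throw new IllegalArgumentException("empty message");
            }
            applied.add(work); // the real controller would update the Lucene index here
        } finally {
            cleanSessionIfNeeded(session);
        }
    }
}

/** Mirrors the controller above: open a fresh session per message, close it afterwards. */
final class SketchRemoteController extends SketchSearchController {
    FakeSession last;

    @Override
    protected FakeSession getSession() {
        last = new FakeSession();
        return last;
    }

    @Override
    protected void cleanSessionIfNeeded(FakeSession session) {
        session.close();
    }
}
```

Opening a new session per message and closing it in the cleanup hook keeps the listener stateless, which fits a listener container that runs with concurrency set to 1.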

Now let's take a look at the integration server config, which is a little more interesting:

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Hibernate section -->
    <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean"
          depends-on="appContextProvider">
        <property name="dataSource" ref="dataSource"/>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.search.autoregister_listeners">true</prop>
                <prop key="hibernate.search.default.directory_provider">ram</prop>
                <prop key="hibernate.search.default.worker.backend">AMQBackendQueueProcessor</prop>
                <prop key="hibernate.search.default.worker.jms.queue">queue.search</prop>
            </props>
        </property>
    </bean>

    <!-- AMQ section -->
    <amq:broker id="amqBroker" useJmx="false" persistent="true"
                dataDirectory="/var/data/amq" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:8082" />
        </amq:transportConnectors>
        <amq:destinations>
            <amq:queue physicalName="queue.search"/>
        </amq:destinations>
        <amq:destinationPolicy>
            <amq:policyMap>
                <amq:policyEntries>
                    <amq:policyEntry queue="queue.search"/>
                </amq:policyEntries>
            </amq:policyMap>
        </amq:destinationPolicy>
    </amq:broker>

    <!-- Creates an activemq connection factory using the amq namespace -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost?create=false&amp;waitForStart=60000"/>

    <!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Initialize static application context provider -->
    <bean id="appContextProvider" class="ApplicationContextProvider" depends-on="amqBroker"/>
</beans>

The same session factory, but configured differently. As the backend we use AMQBackendQueueProcessor, our own implementation (shown in a moment), instead of the standard "lucene" implementation. Our implementation delegates all Hibernate Search insert/update requests to the listening RemoteHibernateSearchController on the frontend, through the JMS queue named "queue.search".

One thing needs to be mentioned here. The integration server doesn't use Hibernate Search for searching at all (it is insert/update oriented only). But once Hibernate Search is enabled for the integration server, it needs at least some index to work with. This index will never be updated (AMQBackendQueueProcessor delegates all updates to the frontend index through JMS) and will never be read. So I decided to use the "ram" provider, which holds this tiny index entirely in memory. You can use any other implementation you want; this is only a fake index.

Then comes the AMQ configuration, where we define only one queue (this declaration is redundant, but it gives you a place to add parameters for the queue). This is the part that actually starts the broker with a TCP transport (in the test environment both servers run on localhost). Note that the AMQ connection factory connects to the (local) broker using the VM transport and doesn't start a broker of its own.

Now a few words about the initialization order. We will use a little trick, shown in a moment, to bind AMQBackendQueueProcessor to Spring, so the order is important. When the session factory bean creates the session factory, the Hibernate Search worker backend needs to be able to work right away. Our backend works through AMQ, so AMQ needs to be initialized before the session factory bean. In the example this is done with the depends-on attribute: the sessionFactory bean depends on the amqBroker bean indirectly, through the appContextProvider bean, which also has to be initialized before the session factory bean starts.
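
The resulting chain can be checked with a tiny model. The sketch below is plain Java, not Spring: InitOrderSketch is a hypothetical helper that walks a depends-on map depth-first and returns the bean names in an order in which every bean comes after the beans it depends on. The three bean names are the ones from the config above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class InitOrderSketch {

    /** Returns bean names so that every bean appears after all beans it depends on. */
    static List<String> initializationOrder(Map<String, List<String>> dependsOn) {
        List<String> order = new ArrayList<>();
        Set<String> inProgress = new HashSet<>();
        for (String bean : dependsOn.keySet()) {
            visit(bean, dependsOn, order, inProgress);
        }
        return order;
    }

    private static void visit(String bean, Map<String, List<String>> dependsOn,
                              List<String> order, Set<String> inProgress) {
        if (order.contains(bean)) {
            return; // already initialized
        }
        if (!inProgress.add(bean)) {
            throw new IllegalStateException("Circular depends-on involving: " + bean);
        }
        for (String dependency : dependsOn.getOrDefault(bean, Collections.<String>emptyList())) {
            visit(dependency, dependsOn, order, inProgress);
        }
        inProgress.remove(bean);
        order.add(bean);
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new LinkedHashMap<>();
        dependsOn.put("sessionFactory", Arrays.asList("appContextProvider"));
        dependsOn.put("appContextProvider", Arrays.asList("amqBroker"));
        dependsOn.put("amqBroker", Collections.<String>emptyList());

        // prints [amqBroker, appContextProvider, sessionFactory]
        System.out.println(initializationOrder(dependsOn));
    }
}
```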

The ApplicationContextProvider bean just implements the common hack for accessing Spring beans from code that is not Spring-aware:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class ApplicationContextProvider implements ApplicationContextAware {

    protected static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextProvider.applicationContext = applicationContext;
    }

    public static <T> T getBean(String name, Class<T> clazz) {
        return applicationContext.getBean(name, clazz);
    }
}
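
Because the holder is static, calling getBean before Spring has injected the context fails with a bare NullPointerException. If you prefer a clearer failure, a guarded variant might look like the sketch below. It is written against a hypothetical BeanLookup interface that stands in for Spring's ApplicationContext so that it runs without Spring; GuardedContextProvider and MapBeanLookup are likewise names I made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal stand-in for Spring's ApplicationContext. */
interface BeanLookup {
    <T> T getBean(String name, Class<T> type);
}

/** Static holder that fails fast with a readable message when used too early. */
final class GuardedContextProvider {
    private static volatile BeanLookup context;

    static void setContext(BeanLookup lookup) {
        context = lookup;
    }

    static <T> T getBean(String name, Class<T> type) {
        BeanLookup current = context;
        if (current == null) {
            throw new IllegalStateException(
                    "Context not initialized yet; requested bean: " + name);
        }
        return current.getBean(name, type);
    }
}

/** Map-backed lookup used only to exercise the sketch. */
final class MapBeanLookup implements BeanLookup {
    private final Map<String, Object> beans = new HashMap<>();

    MapBeanLookup register(String name, Object bean) {
        beans.put(name, bean);
        return this;
    }

    @Override
    public <T> T getBean(String name, Class<T> type) {
        return type.cast(beans.get(name));
    }
}
```

With the depends-on chain from the config the guard should never trigger; it only makes a misconfigured initialization order easier to diagnose.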

And finally, AMQBackendQueueProcessor overrides the JNDI-based JMS connection factory lookup of the standard JmsBackendQueueProcessor with a Spring-based implementation, using our container config and the appContextProvider hack:

import org.hibernate.search.backend.impl.jms.JmsBackendQueueProcessor;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import java.util.Properties;

/**
 * A bridge between {@link JmsBackendQueueProcessor} and spring-managed AMQ beans.
 */
public class AMQBackendQueueProcessor extends JmsBackendQueueProcessor {

    @Override
    protected QueueConnectionFactory initializeJMSQueueConnectionFactory(Properties props) {
        return ApplicationContextProvider.getBean("amqConnectionFactory", QueueConnectionFactory.class);
    }

    @Override
    protected Queue initializeJMSQueue(QueueConnectionFactory factory, Properties props) {
        return new Queue() {
            @Override
            public String getQueueName() throws JMSException {
                return "queue.search";
            }
        };
    }

    @Override
    protected QueueConnection initializeJMSConnection(QueueConnectionFactory factory, Properties props) {
        try {
            return factory.createQueueConnection();
        } catch (JMSException e) {
            throw new RuntimeException("Error initializing JMS queue connection", e);
        }
    }
}
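
The queue name is hard-coded above, although the session factory config already carries it in hibernate.search.default.worker.jms.queue. A small helper could read it from the Properties passed to the initialize methods instead. The sketch below is an assumption-heavy illustration: QueueNameResolver is a hypothetical class, and the key "worker.jms.queue" is my guess at how the property appears once Hibernate Search has stripped the index prefix, so verify it against your version before relying on it.

```java
import java.util.Properties;

final class QueueNameResolver {

    /** Assumed key; check which keys your Hibernate Search version actually passes in. */
    static final String QUEUE_KEY = "worker.jms.queue";

    /** Returns the configured queue name, or the fallback when none is configured. */
    static String resolve(Properties props, String fallback) {
        String name = props.getProperty(QUEUE_KEY);
        if (name == null || name.trim().isEmpty()) {
            return fallback;
        }
        return name.trim();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(QUEUE_KEY, "queue.search");
        System.out.println(resolve(props, "queue.default"));
    }
}
```

Inside getQueueName() you would then return the resolved value instead of the literal, keeping "queue.search" as the fallback.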