After upgrading from 3.4 to 3.6, my JGroups code stopped working. In a two-node setup, when the second node tried to join, I got the following errors:
2015-04-22 21:38:12,510 INFO [ViewHandler,monalisa,tux-7762|fr.mcc.test.TestJGroups] Detected a change in cluster members: [tux-7762, tux-9846]
2015-04-22 21:38:14,460 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 1
2015-04-22 21:38:16,463 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 2
2015-04-22 21:38:18,466 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 3
2015-04-22 21:38:20,468 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 4
2015-04-22 21:38:22,471 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 5
2015-04-22 21:38:24,473 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 6
2015-04-22 21:38:26,475 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 7
2015-04-22 21:38:28,479 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 8
2015-04-22 21:38:30,482 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 9
2015-04-22 21:38:32,485 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: JOIN(tux-9846) sent to tux-7762 timed out (after 2000 ms), on try 10
2015-04-22 21:38:32,485 WARN [main|org.jgroups.protocols.pbcast.GMS] tux-9846: too many JOIN attempts (10): becoming singleton
2015-04-22 21:38:32,493 WARN [Incoming-2,monalisa,tux-9846|org.jgroups.protocols.pbcast.NAKACK2] JGRP000011: tux-9846: dropped message 1 from non-member tux-7762 (view=[tux-9846|0] (1) [tux-9846])
It took some time, but I finally managed to write a “simple” (quick and ugly) test to reproduce the problem.
<pre lang="java">
package fr.mcc.test;
import java.io.FileInputStream;
import org.apache.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.ResponseMode;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
/**
 * Quick-and-dirty reproduction case: one long-lived cluster member (started from
 * {@link #run()}) plus a short-lived {@link SecondClient} that joins the same cluster
 * five seconds later and multicasts a single request.
 *
 * <p>The JGroups stack configuration is read from the file named by the {@code config}
 * system property. The control flow of the membership callback is intentionally left
 * exactly as in the failing application, so that the problem stays reproducible.
 */
public class TestJGroups implements RequestHandler, MembershipListener, Runnable {

    private static final Logger log = Logger.getLogger(TestJGroups.class);
    private static final String CHANNEL_NAME = "monalisa";

    private JChannel channel;
    private MessageDispatcher dispatcher;

    // Read in viewAccepted() without holding the monitor that checkStatus() takes,
    // hence volatile so a write made by one thread is visible to the others.
    private volatile boolean isCheckingStatus = false;

    /**
     * Builds a channel from the file named by the {@code config} system property and
     * configures it to discard its own multicasts.
     *
     * @return a new, not yet connected channel
     * @throws Exception if the property is unset, the file cannot be read, or the
     *                   channel cannot be created from it
     */
    private static JChannel openChannel() throws Exception {
        String config = System.getProperty("config");
        log.info("Using config: " + config);
        FileInputStream is = new FileInputStream(config);
        try {
            JChannel ch = new JChannel(is);
            ch.setDiscardOwnMessages(true);
            return ch;
        } finally {
            // The stream is only needed while the channel is being constructed.
            is.close();
        }
    }

    /** Creates the channel, registers this object as request handler and membership listener, and joins the cluster. */
    @Override
    public void run() {
        try {
            channel = openChannel();
            dispatcher = new MessageDispatcher(channel, null, this, this);
            channel.connect(CHANNEL_NAME);
        } catch (Exception e) {
            log.error("Could not start TestJGroups", e);
        }
    }

    /** Leaves the cluster and releases the channel and the dispatcher; failures are logged, not rethrown. */
    public void stop() {
        try {
            if (channel != null) {
                channel.disconnect();
                channel.close();
            }
            if (dispatcher != null) {
                dispatcher.stop();
            }
            log.info("TestJGroups stopped.");
        } catch (Exception e) {
            log.warn("TestJGroups stopped with errors", e);
        }
    }

    /**
     * Answers an incoming request.
     *
     * @param msg request whose payload is expected to be a {@link String}
     * @return {@code "ABC"} when the payload starts with {@code "XXX"}, {@code "NOP"} otherwise
     */
    @Override
    public Object handle(Message msg) throws Exception {
        String msgString = (String) msg.getObject();
        if (msgString.startsWith("XXX")) {
            return "ABC";
        }
        return "NOP";
    }

    @Override
    public void block() {
        // NOP
    }

    @Override
    public void suspect(Address address) {
        // NOP
    }

    @Override
    public void unblock() {
        // NOP
    }

    /**
     * Logs the new membership and, unless a check is already in progress, runs
     * {@link #checkStatus()} directly on the thread that delivered the view.
     */
    @Override
    public void viewAccepted(View view) {
        log.info("Detected a change in cluster members: " + view.getMembers());
        if (!isCheckingStatus) {
            checkStatus();
        }
    }

    /**
     * Multicasts {@code "REQ"} to the whole cluster and logs every response.
     * Uses {@link ResponseMode#GET_ALL} with a timeout of 0, i.e. the call does not
     * return until the dispatcher has a result for every member it addressed.
     */
    private synchronized void checkStatus() {
        try {
            isCheckingStatus = true;
            RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 0);
            RspList<String> responses = dispatcher.castMessage(null, new Message(null, null, "REQ"), opts);
            for (Rsp<String> rsp : responses) {
                String response = rsp.getValue();
                log.info("CheckStatus response: " + response);
            }
        } catch (Exception e) {
            log.error("CheckStatus error", e);
        } finally {
            isCheckingStatus = false;
        }
    }

    /** Runs {@link #checkStatus()} unless a check is already in progress; not instantiated by this test. */
    class CheckStatusThread implements Runnable {
        @Override
        public void run() {
            if (!isCheckingStatus) {
                checkStatus();
            }
        }
    }

    /** Second cluster member: joins, multicasts {@code "XXX"}, logs the responses and leaves. */
    class SecondClient implements RequestHandler {

        private JChannel channel;
        private MessageDispatcher dispatcher;

        /** Answers every request with {@code null}. */
        @Override
        public Object handle(Message arg0) throws Exception {
            return null;
        }

        /** Joins the cluster, sends one request to all members, logs the answers, then cleans up. */
        public void test() {
            try {
                channel = openChannel();
                dispatcher = new MessageDispatcher(channel, null, null, this);
                channel.connect(CHANNEL_NAME);
                RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 0);
                RspList<String> responses = dispatcher.castMessage(null, new Message(null, null, "XXX"), opts);
                for (Rsp<String> rsp : responses) {
                    String response = rsp.getValue();
                    log.info("response: " + response);
                }
            } catch (Exception e) {
                log.error("Test error", e);
            } finally {
                // Either field is still null when setup failed early (e.g. unreadable
                // config); guard so the cleanup does not hide the logged error with an NPE.
                if (channel != null) {
                    channel.close();
                }
                if (dispatcher != null) {
                    dispatcher.stop();
                }
            }
        }
    }

    /**
     * Starts the first member on its own thread, waits five seconds, then runs the
     * second member on the main thread.
     */
    public static void main(String[] args) throws Exception {
        TestJGroups t1 = new TestJGroups();
        new Thread(t1).start();
        Util.sleep(5000);
        t1.new SecondClient().test();
        // stop() is not called; terminate with Ctrl+C
    }
}
</pre>
Can you spot the error? Well, it’s not so obvious until you read the following in the JGroups documentation:
Note that anything that could block should not be done in a callback.
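Here, viewAccepted() is exactly such a callback: it calls checkStatus(), whose castMessage() waits for all responses (GET_ALL with a timeout of 0), so the thread that delivers the view is blocked. The blocking work should instead be handed off to another thread or, in order not to create a thread on each view change (threads are expensive), to an executor: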
<pre lang="java">
/**
 * Logs the new membership, then submits the status check to the executor
 * instead of running it on the thread that delivered the view.
 */
@Override
public void viewAccepted(View view) {
    log.info("Detected a change in cluster members: " + view.getMembers());
    final Runnable statusCheck = new ChangeStatusCommand();
    executor.execute(statusCheck);
}
</pre>
where the executor is a single-thread executor obtained with:
<pre lang="java">
// One worker thread: tasks passed to executor.execute(...) are queued and run one at a time.
executor = Executors.newSingleThreadExecutor();