Thursday, 13 December 2012

Exchange data concurrently

As it comes to a point where two Threads that are running with their own logic have to exchange data, multiplies the possibilities of creating something huge rather than implement a simple exchange logic. Java concurrent API provides us the Exchanger class.

Exchanger tries to solve the producer-consumer problem when there is only one producer and only one consumer taking place. Producer-consumer problem describes the possibility of having two Threads the need of taking and giving data.

Imagine the following example; two threads are exchanging messages. Both sides are generating messages that need to be delivered to the other side, and only during this procedure are both available to be notified by their "inbox" of messages otherwise they are busy on generating their messages.

Having this implemented without using the Exchanger class you'd need a channel with a buffer which should be able to block (by Locking) the awaiting threads until a message arrives. In most cases these solutions take extra resources as extra threads and buffers should be implemented, such resources until now are costly enough and limit the scalability of the application. Needless to say, just consider a timeout logic implementation in your channel.

Enough said! Let's dig into the code. We shall first implement a wrapper having the exhangeable data wrapped.

public interface Exchangeable<T>
{
    T get();
}

As we proceed, a Message Exchanger should be implemented.

public interface MessageTrader<T extends Exchangeable<?>>
{
    void onExchange(T message);
}
/**
 * @author Evangelos Pappas - Evalonlabs.com
 */
public abstract class Messenger<T> implements MessageTrader<Exchangeable<T>>,
        Runnable
{
    private final Queue<Exchangeable<T>>        queue;
    private final Exchanger<Exchangeable<T>>    exchanger;
    
    public Messenger(Exchanger<Exchangeable<T>> exchanger)
    {
        queue = new LinkedBlockingQueue<Exchangeable<T>>();
        this.exchanger = exchanger;
    }
    
    public Messenger(Exchanger<Exchangeable<T>> exchanger, T... messages)
    {
        this(exchanger);
        for (int i = messages.length - 1; i > 0; --i)
        {
            this.queue.add(new DirectExchangeable<T>(messages[i]));
        }
    }
    
    public void add(T message)
    {
        this.queue.add(new DirectExchangeable<T>(message));
    }
    
    public void run()
    {
        while (true)
        {
            try
            {
                this.onExchange(this.exchanger.exchange(this.queue.poll()));
            }
            catch (Exception e)
            {
                
            }
        }
    }
    
    public abstract void onExchange(Exchangeable<T> message);
}

 The message exchanger accepts an exchanger as its messages should be submitted and exchanged to the other thread. Until this point, it might has come to your consideration that each thread doesn't know about the other side exchanger. So no matter if the exchanger actually implements a one-to-one relation is pretty much scalable if you extend it into a dynamic exchange for more than one-to-one concrete relation.

As always the mixed-up takes place in the Main.

public class Main
{
    
    /**
     * @param args
     * @author Evangelos Pappas - Evalonlabs.com
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException
    {
        Exchanger<Exchangeable<Long>> exchanger = new Exchanger<Exchangeable<Long>>();
        
        final Messenger<Long> messenger1 = new Messenger<Long>(exchanger)
        {
            @Override
            public void onExchange(Exchangeable<Long> message)
            {
                System.out.println("Messenger[1] @ " + System.nanoTime()
                        + " Received: " + message.get());
            }
        };
        final Messenger<Long> messenger2 = new Messenger<Long>(exchanger)
        {
            @Override
            public void onExchange(Exchangeable<Long> message)
            {
                System.out.println("Messenger[2] @ " + System.nanoTime()
                        + " Received: " + message.get());
            }
        };
        
        new Thread(new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    messenger1.add(System.nanoTime());
                }
            }
        }, "Sink[1]").start();
        new Thread(new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    messenger2.add(System.nanoTime());
                }
            }
        }, "Sink[2]").start();
        new Thread(messenger1, "Messenger[1]").start();
        new Thread(messenger2, "Messenger[2]").start();
    }
}

Have fun &
Cheers!

P.S. You can find the source code of examples on github.

No comments:

Post a Comment