Thursday, 13 December 2012

Exchange data concurrently

When two threads, each running its own logic, have to exchange data, it is tempting to build something huge rather than implement a simple exchange. The Java concurrency API gives us the Exchanger class for exactly this case.

Exchanger addresses the producer-consumer problem for the case of exactly one producer and one consumer. The producer-consumer problem describes two threads that need to give data to, and take data from, one another.
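Before the full example, here is a minimal sketch of what a single exchange looks like. The class name ExchangerDemo and the helper methods trader and swap are made up for illustration; only Exchanger.exchange comes from the JDK. Each thread blocks in exchange() until its partner arrives, and then each walks away with the other's value.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class, not part of the original sources.
class ExchangerDemo
{
    /** Builds a thread that offers one value and stores what it gets back in received[slot]. */
    static Thread trader(final Exchanger<String> exchanger, final String offer,
            final String[] received, final int slot)
    {
        return new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    // Blocks until the partner thread calls exchange() as well.
                    received[slot] = exchanger.exchange(offer);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** Runs two threads that swap one value each and returns what each side received. */
    static String[] swap(String first, String second) throws InterruptedException
    {
        Exchanger<String> exchanger = new Exchanger<String>();
        String[] received = new String[2];
        Thread a = trader(exchanger, first, received, 0);
        Thread b = trader(exchanger, second, received, 1);
        a.start();
        b.start();
        // join() makes the writes to the array visible to the caller.
        a.join();
        b.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException
    {
        String[] received = swap("ping", "pong");
        System.out.println("A received: " + received[0]); // A received: pong
        System.out.println("B received: " + received[1]); // B received: ping
    }
}
```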

Imagine the following example: two threads are exchanging messages. Both sides generate messages that need to be delivered to the other side, and only during the exchange itself is each side available to check its "inbox" of messages; the rest of the time they are busy generating their own.

To implement this without the Exchanger class, you would need a channel with a buffer that can block (by locking) the waiting threads until a message arrives. Such solutions usually cost extra resources, since additional threads and buffers have to be implemented, and those resources are still expensive enough to limit the scalability of the application. On top of that, just consider implementing timeout logic in your channel.
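On the timeout point: Exchanger already ships with a timed variant, exchange(value, timeout, unit), which throws TimeoutException when no partner shows up in time. A minimal sketch, where the class TimedExchangeDemo and the helper offer are hypothetical names and returning null on timeout is just one possible convention:

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original sources.
class TimedExchangeDemo
{
    /**
     * Offers a value and waits at most timeoutMillis for a partner.
     * Returns the partner's value, or null if nobody arrived in time.
     * (Assumption: partners never offer null, so null can signal a timeout.)
     */
    static String offer(Exchanger<String> exchanger, String value, long timeoutMillis)
            throws InterruptedException
    {
        try
        {
            return exchanger.exchange(value, timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e)
        {
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        // No second thread ever calls exchange(), so this offer times out.
        String reply = offer(new Exchanger<String>(), "hello", 100);
        System.out.println(reply == null ? "timed out" : "received " + reply);
    }
}
```

Run as is, there is no partner thread, so the offer gives up after roughly 100 ms instead of blocking forever.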

Enough said! Let's dig into the code. We shall first implement a wrapper for the data to be exchanged.

public interface Exchangeable<T>
{
    T get();
}

Next, the message exchanger itself should be implemented.

public interface MessageTrader<T extends Exchangeable<?>>
{
    void onExchange(T message);
}
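
import java.util.Queue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author Evangelos Pappas - Evalonlabs.com
 */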
public abstract class Messenger<T> implements MessageTrader<Exchangeable<T>>,
        Runnable
{
    private final Queue<Exchangeable<T>>        queue;
    private final Exchanger<Exchangeable<T>>    exchanger;
    
    public Messenger(Exchanger<Exchangeable<T>> exchanger)
    {
        queue = new LinkedBlockingQueue<Exchangeable<T>>();
        this.exchanger = exchanger;
    }
    
    public Messenger(Exchanger<Exchangeable<T>> exchanger, T... messages)
    {
        this(exchanger);
        // Walk the array from last to first; i >= 0 so that messages[0] is not skipped.
        for (int i = messages.length - 1; i >= 0; --i)
        {
            this.queue.add(new DirectExchangeable<T>(messages[i]));
        }
    }
    
    public void add(T message)
    {
        this.queue.add(new DirectExchangeable<T>(message));
    }
    
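    public void run()
    {
        while (!Thread.currentThread().isInterrupted())
        {
            try
            {
                // poll() returns null when the queue is empty, so either
                // side may hand over (and receive) null.
                Exchangeable<T> received = this.exchanger.exchange(this.queue.poll());
                if (received != null)
                {
                    this.onExchange(received);
                }
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag and stop instead of spinning forever.
                Thread.currentThread().interrupt();
            }
            catch (RuntimeException e)
            {
                // A failing handler should not silently kill the exchange loop.
                e.printStackTrace();
            }
        }
    }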
    
    public abstract void onExchange(Exchangeable<T> message);
}
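The Messenger wraps every raw value in a DirectExchangeable, a class that is not listed in this post. What follows is only a guess at its shape, assuming it is a plain immutable holder that hands back the value it was built with; the real class in the repository may differ.

```java
// Repeated from above so that the sketch compiles on its own.
interface Exchangeable<T>
{
    T get();
}

/**
 * Hypothetical sketch of DirectExchangeable: an immutable holder
 * that returns exactly the value it was constructed with.
 */
class DirectExchangeable<T> implements Exchangeable<T>
{
    private final T value;

    DirectExchangeable(T value)
    {
        this.value = value;
    }

    public T get()
    {
        return value;
    }
}
```

Keeping the holder immutable means a message can be handed to another thread without further synchronisation on the holder itself.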

The Messenger takes an Exchanger, through which its messages are submitted and handed over to the other thread. By now you may have noticed that neither thread knows anything about the Messenger on the other side; all they share is the Exchanger. So although an Exchanger pairs exactly two threads, the design is fairly scalable if you extend it into a dynamic exchange that goes beyond a fixed one-to-one relation.

As always, everything is wired together in Main.

import java.util.concurrent.Exchanger;

public class Main
{
    
    /**
     * @param args
     * @author Evangelos Pappas - Evalonlabs.com
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException
    {
        Exchanger<Exchangeable<Long>> exchanger = new Exchanger<Exchangeable<Long>>();
        
        final Messenger<Long> messenger1 = new Messenger<Long>(exchanger)
        {
            @Override
            public void onExchange(Exchangeable<Long> message)
            {
                System.out.println("Messenger[1] @ " + System.nanoTime()
                        + " Received: " + message.get());
            }
        };
        final Messenger<Long> messenger2 = new Messenger<Long>(exchanger)
        {
            @Override
            public void onExchange(Exchangeable<Long> message)
            {
                System.out.println("Messenger[2] @ " + System.nanoTime()
                        + " Received: " + message.get());
            }
        };
        
        new Thread(new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    messenger1.add(System.nanoTime());
                }
            }
        }, "Sink[1]").start();
        new Thread(new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    messenger2.add(System.nanoTime());
                }
            }
        }, "Sink[2]").start();
        new Thread(messenger1, "Messenger[1]").start();
        new Thread(messenger2, "Messenger[2]").start();
    }
}

Have fun &
Cheers!

P.S. You can find the source code of the examples on GitHub.

Friday, 23 November 2012

My First Post

As with every beginning, I should start with a greeting. I am a software engineer with a passion for my scientific field. In this blog you will find my research and implementations in neural networks and various software topics. If my schedule allows, I will also cover academic topics from the various scientific fields that interest me.

As this is my first post, I should state where I stand on a few topics. I am a pro-Java engineer and a lover of hybrid architectures and implementations. I am also positive about the big picture of open source, yet I disagree on various details, which I hope to cover in this blog.

In conclusion, I hope to get feedback on each of my posts, as this makes us all better.

Until my next post, grab your beer and
Cheers!

P.S. Get Social:
GitHub
Twitter