The Observer Pattern Revisited

The following simple class uses generics and proxies to help with the implementation of the observer pattern.

public class ObserverList {
    private static final Logger LOG
            = Logger.getLogger(ObserverList.class.getName());

    private final List<Object> observers = new ArrayList<>();

    public void addObserver(Object observer) {
        observers.add(observer);
    }

    public void removeObserver(Object observer) {
        observers.remove(observer);
    }

    public <T> T getNotifier(Class<T> intf) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Object proxy = Proxy.newProxyInstance(loader, new Class<?>[] {intf},
                (pxy, method, args) -> {
            return dispatch(intf, method, args);
        });
        return intf.cast(proxy);
    }

    private <T> Object dispatch(Class<T> intf, Method method, Object[] args) {
        Object result = null;
        for (Object observer: observers) {
            if (intf.isInstance(observer)) {
                try {
                    result = method.invoke(observer, args);
                } catch (IllegalAccessException
                        | InvocationTargetException ex) {
                    LOG.log(Level.SEVERE, null, ex);
                }
            }
        }
        return result;
    }
}

How it’s used

As an example, let’s say that the subject being observed is a picture storage system. The pictures are stored in folders and there are two kinds of observers:

Picture observers:

public interface PictureObserver {
    public void pictureAdded(Picture pic, Folder folder);
    public void pictureRemoved(Picture pic, Folder folder);
    public void pictureUpdated(Picture oldPic, Picture newPic);
}

Folder observers:

public interface FolderObserver {
    public void folderCreated(Folder folder);
    public void folderDeleted(Folder folder);
    public void folderUpdated(Folder oldFolder, Folder newFolder);
}

Now the Storage class would look like this:

public class Storage {
    // ...
    private final ObserverList observers = new ObserverList();
    private final PictureObserver pictureNotifier
            = observers.getNotifier(PictureObserver.class);
    private final FolderObserver folderNotifier
            = observers.getNotifier(FolderObserver.class);
    // ...

    public void addPictureObserver(PictureObserver obs) {
        observers.addObserver(obs);
    }

    public void removePictureObserver(PictureObserver obs) {
        observers.removeObserver(obs);
    }

    public void addFolderObserver(FolderObserver obs) {
        observers.addObserver(obs);
    }

    public void removeFolderObserver(FolderObserver obs) {
        observers.removeObserver(obs);
    }

    // ...

    public void addPicture(Picture pic, Folder folder) {
        // ...
        pictureNotifier.pictureAdded(pic, folder);
    }

    public void removePicture(Picture pic, Folder folder) {
        // ...
        pictureNotifier.pictureRemoved(pic, folder);
    }

    public void updatePicture(Picture pic) {
        Picture old = getPictureById(pic.getId());
        // ...
        pictureNotifier.pictureUpdated(old, pic);
    }

    // ...
}

You can see that we take advantage of the fact that the method to “fire” a notification has the same signature as the method that handles the notification, so the same interface can be used.

These methods always have a void return type; however, if it wasn’t the case, a notifier would return the value returned by the last observer notified. I believe that .NET’s delegates have the same behavious.

Source code here: https://gitlab.com/_java-examples/observer1

Dealing with transactions

There are times when you want several operations to be either all executed or none at all. The typical example is a transfer operation in an accounting application, where the balance of one account must be increased, while the balance of another is decreased by the same amount.

One way to deal with transactions in the observer pattern is to postpone the dispatching of notifications until the end of the transaction. This can be done manually in simple cases, but it can be difficult to keep track of all modifications in more complex transactions.

Here’s how it’s done:

The transaction interface:

public interface Transaction extends AutoCloseable {
    @Override
    public void close();
    public void commit();
}

It’s used as follows:

    try (Transaction tr = observers.startTransaction()) {
        // ...
        // The actual transaction with notifications
        // ...
        tr.commit();
    }

Here’s the implementation in class ObserverList:

public class ObserverList {
    // ...
    private TransactionImpl currentTrans;
    // ...
    public Transaction startTransaction() {
        if (currentTrans != null) {
            throw new IllegalStateException("Already in a transaction.");
        }
        return currentTrans = new TransactionImpl();
    }
    // ...

    private <T> Object dispatch(Class<T> intf, Method method, Object[] args) {
        Runnable disp = () -> {
            for (Object observer: observers) {
                if (intf.isInstance(observer)) {
                    try {
                        method.invoke(observer, args);
                    } catch (IllegalAccessException
                            | InvocationTargetException ex) {
                        LOG.log(Level.SEVERE, null, ex);
                    }
                }
            }
        };
        if (currentTrans != null) {
            currentTrans.add(disp);
        } else {
            disp.run();
        }
        return null;
    }

    private class TransactionImpl implements Transaction {
        private final List<Runnable> notificationQueue = new ArrayList<>();

        @Override
        public void commit() {
            while (!notificationQueue.isEmpty()) {
                notificationQueue.remove(0).run();
            }
        }

        @Override
        public void close() {
            notificationQueue.clear();
            currentTrans = null;
        }

        private void add(Runnable op) {
            notificationQueue.add(op);
        }
    }
}

Example: accounting

Let’s define the interface AccountingObserver as follows:

public interface AccountObserver {
    public void accountCreated(String name, double balance);
    public void accountDeleted(String name);
    public void balanceChanged(String name, double balance);
}

And the accounting class:

public class Accounting {
    private final SessionFactory sessionFactory;
    private final ObserverList observers = new ObserverList();
    private final AccountObserver notifier
            = observers.getNotifier(AccountObserver.class);

    public Accounting(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    public void addAccountObserver(AccountObserver obs) {
        observers.addObserver(obs);
    }

    public void removeAccountObserver(AccountObserver obs) {
        observers.removeObserver(obs);
    }

    public void createAccount(String name, double initialBalance) {
        try (Transaction tr = observers.startTransaction();
                Session session = sessionFactory.openSession()) {
            Account account = new Account();
            account.setName(name);
            account.setBalance(initialBalance);
            session.insert(account);
            notifier.accountCreated(name, initialBalance);
            session.commit();
            tr.commit();
        }
    }

    public void deleteAccount(String name) {
        try (Transaction tr = observers.startTransaction();
                Session session = sessionFactory.openSession()) {
            Account account = session.load(Account.class, name);
            if (account == null) {
                throw new IllegalArgumentException("Account not found: " + name);
            }
            session.delete(account);
            notifier.accountDeleted(name);
            session.commit();
        }
    }

    public double getBalance(String name) {
        try (Session session = sessionFactory.openSession()) {
            Account account = session.load(Account.class, name);
            if (account == null) {
                throw new IllegalArgumentException("Account not found: " + name);
            }
            return account.getBalance();
        }
    }

    public void transfer(double amount, String from, String to) {
        try (Transaction tr = observers.startTransaction();
                Session session = sessionFactory.openSession()) {
            Account fromAccount = session.load(Account.class, from);
            if (fromAccount == null) {
                throw new IllegalArgumentException("Account not found: " + from);
            }
            fromAccount.debit(amount);
            session.update(fromAccount);
            notifier.balanceChanged(
                    fromAccount.getName(), fromAccount.getBalance());
            Account toAccount = session.load(Account.class, to);
            if (fromAccount == null) {
                throw new IllegalArgumentException("Account not found: " + to);
            }
            toAccount.credit(amount);
            session.update(toAccount);
            notifier.balanceChanged(
                    toAccount.getName(), toAccount.getBalance());
            session.commit();
            tr.commit();
        }
    }
}

Source code here: https://gitlab.com/_java-examples/observer2