Общие / дополнительные события с базой данных Firebase в реальном времени

#firebase #firebase-realtime-database #publish-subscribe

#firebase #firebase-realtime-database #опубликовать-подписаться

Вопрос:

Я пытаюсь реализовать pub / sub с базой данных Firebase в реальном времени для распределенной системы. Я хочу что-то вроде Amazon SQS, но я не знаю, как я могу гарантировать, что только один подписчик обработает сообщение.

Будут ли транзакции выполнять эту работу? Есть ли способ заблокировать строку или что-то в этом роде?

Обновить

Вот мой текущий код для подписчика, это безопасно?

 public class TaskToRun {
    private String id;
    private String data;
    private boolean running;
    //getters and setters
}

public class TaskSubscriber implements ChildEventListener{
    private Query reference;
    private LinkedList<TaskToRun> queue = new LinkedList<>();
    private Object lockQueue = new Object();
    private final Semaphore available = new Semaphore(1, true);
    private boolean running;
    public TaskSubscriber(){
        reference = FirebaseDatabase.getInstance().getReference().child("tasks").orderByChild("running").equalTo(false);
    }
    public void start(){
        running = true;
        queue.clear();
        reference.addChildEventListener(this);
        AsyncTask t = new AsyncTask() {
            @Override
            protected Object doInBackground(Object[] objects) {
                while (running){
                    try {
                        available.acquire();
                        if(!running)
                            break;
                        TaskToRun task = null;
                        synchronized(lockQueue){
                            if(queue.size() > 0){
                                task = queue.pop();
                            }
                        }
                        if(task != null){

                            //Try to update the object to running
                            Transaction.Handler handler = new Transaction.Handler() {
                                private boolean canRun = false;
                                @Override
                                public Transaction.Result doTransaction(MutableData mutableData) {
                                    canRun = false;

                                    TaskToRun t = mutableData.getValue(TaskToRun.class);
                                    if(t == null)
                                        return  Transaction.success(mutableData);
                                    if(t.isRunning())
                                        return Transaction.success(mutableData);

                                    t.setRunning(true);
                                    mutableData.setValue(t);
                                    canRun = true;
                                    return  Transaction.success(mutableData);

                                }

                                @Override
                                public void onComplete(DatabaseError databaseError, boolean b, DataSnapshot dataSnapshot) {
                                    if(canRun amp;amp; databaseError == null) {
                                        runTask(dataSnapshot.getValue(TaskToRun.class));

                                    }
                                }
                            };
                            FirebaseDatabase.getInstance().getReference().child("tasks").child(task.getId()).runTransaction(handler);

                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return null;
            }
        };
        t.execute();
    }
    private void runTask(TaskToRun task) {
        Log.e("TASKS","Running task " task.getId());
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void stop(){
        running = false;
        reference.removeEventListener(this);
        synchronized (lockQueue) {
            queue.clear();
        }
        //Release all threads
        int usedPermits = 1 - available.availablePermits();
        available.release(usedPermits);
    }

    @Override
    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
        synchronized (lockQueue){
            TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
            task.setId(dataSnapshot.getKey());
            if(!task.isRunning()){
                queue.addLast(task);
                available.release();
            }
        }
    }

    @Override
    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
        synchronized (lockQueue){
            TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
            task.setId(dataSnapshot.getKey());

            if(task.isRunning()){
                for(int i = 0; i < queue.size();i  ){
                    TaskToRun t  = queue.get(i);
                    if(t.getId().equals(task.getId())){
                        queue.remove(i);
                        break;
                    }
                }
            }
        }
    }

    @Override
    public void onChildRemoved(DataSnapshot dataSnapshot) {
        synchronized (lockQueue){
            TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
            task.setId(dataSnapshot.getKey());

            if(task.isRunning()){
                for(int i = 0; i < queue.size();i  ){
                    TaskToRun t  = queue.get(i);
                    if(t.getId().equals(task.getId())){
                        queue.remove(i);
                        break;
                    }
                }
            }
        }
    }

    @Override
    public void onChildMoved(DataSnapshot dataSnapshot, String s) {

    }

    @Override
    public void onCancelled(DatabaseError databaseError) {

    }
}
  

Основной код

 new TaskSubscriber().start();
  

Код для публикации

 TaskToRun task = new TaskToRun();
task.setData("send_sms");
FirebaseDatabase.getInstance().getReference().child("tasks").push().setValue(task);
  

Комментарии:

1. Существует множество методов, которые позволят легко выполнить это с помощью firebase, таких как идентификаторы push (), правила безопасности и многое другое. Пожалуйста, поделитесь некоторым кодом и тем, что вы, возможно, уже сделали, тогда вам будет намного легче направлять вас в правильном направлении.

2. Для стандартной реализации очереди см.: github.com/firebase/firebase-queue .

3. Я отредактировал свой ответ с помощью кода, который, как я думал, безопасен?