#firebase #firebase-realtime-database #publish-subscribe
#firebase #firebase-realtime-database #опубликовать-подписаться
Вопрос:
Я пытаюсь реализовать pub / sub с базой данных Firebase в реальном времени для распределенной системы. Я хочу что-то вроде Amazon SQS, но я не знаю, как я могу гарантировать, что только один подписчик обработает сообщение.
Будут ли транзакции выполнять эту работу? Есть ли способ заблокировать строку или что-то в этом роде?
Обновить
Вот мой текущий код для подписчика, это безопасно?
public class TaskToRun {
private String id;
private String data;
private boolean running;
//getters and setters
}
public class TaskSubscriber implements ChildEventListener{
private Query reference;
private LinkedList<TaskToRun> queue = new LinkedList<>();
private Object lockQueue = new Object();
private final Semaphore available = new Semaphore(1, true);
private boolean running;
public TaskSubscriber(){
reference = FirebaseDatabase.getInstance().getReference().child("tasks").orderByChild("running").equalTo(false);
}
public void start(){
running = true;
queue.clear();
reference.addChildEventListener(this);
AsyncTask t = new AsyncTask() {
@Override
protected Object doInBackground(Object[] objects) {
while (running){
try {
available.acquire();
if(!running)
break;
TaskToRun task = null;
synchronized(lockQueue){
if(queue.size() > 0){
task = queue.pop();
}
}
if(task != null){
//Try to update the object to running
Transaction.Handler handler = new Transaction.Handler() {
private boolean canRun = false;
@Override
public Transaction.Result doTransaction(MutableData mutableData) {
canRun = false;
TaskToRun t = mutableData.getValue(TaskToRun.class);
if(t == null)
return Transaction.success(mutableData);
if(t.isRunning())
return Transaction.success(mutableData);
t.setRunning(true);
mutableData.setValue(t);
canRun = true;
return Transaction.success(mutableData);
}
@Override
public void onComplete(DatabaseError databaseError, boolean b, DataSnapshot dataSnapshot) {
if(canRun amp;amp; databaseError == null) {
runTask(dataSnapshot.getValue(TaskToRun.class));
}
}
};
FirebaseDatabase.getInstance().getReference().child("tasks").child(task.getId()).runTransaction(handler);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return null;
}
};
t.execute();
}
private void runTask(TaskToRun task) {
Log.e("TASKS","Running task " task.getId());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void stop(){
running = false;
reference.removeEventListener(this);
synchronized (lockQueue) {
queue.clear();
}
//Release all threads
int usedPermits = 1 - available.availablePermits();
available.release(usedPermits);
}
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
synchronized (lockQueue){
TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
task.setId(dataSnapshot.getKey());
if(!task.isRunning()){
queue.addLast(task);
available.release();
}
}
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
synchronized (lockQueue){
TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
task.setId(dataSnapshot.getKey());
if(task.isRunning()){
for(int i = 0; i < queue.size();i ){
TaskToRun t = queue.get(i);
if(t.getId().equals(task.getId())){
queue.remove(i);
break;
}
}
}
}
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
synchronized (lockQueue){
TaskToRun task = dataSnapshot.getValue(TaskToRun.class);
task.setId(dataSnapshot.getKey());
if(task.isRunning()){
for(int i = 0; i < queue.size();i ){
TaskToRun t = queue.get(i);
if(t.getId().equals(task.getId())){
queue.remove(i);
break;
}
}
}
}
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
}
@Override
public void onCancelled(DatabaseError databaseError) {
}
}
Основной код
new TaskSubscriber().start();
Код для публикации
TaskToRun task = new TaskToRun();
task.setData("send_sms");
FirebaseDatabase.getInstance().getReference().child("tasks").push().setValue(task);
Комментарии:
1. Существует множество методов, которые позволят легко выполнить это с помощью firebase, таких как идентификаторы push (), правила безопасности и многое другое. Пожалуйста, поделитесь некоторым кодом и тем, что вы, возможно, уже сделали, тогда вам будет намного легче направлять вас в правильном направлении.
2. Для стандартной реализации очереди см.: github.com/firebase/firebase-queue .
3. Я отредактировал свой ответ с помощью кода, который, как я думал, безопасен?