package com.logentries.mq; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.logentries.api.Batch; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class ChannelAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(ChannelAdapter.class); private final String waitQueue; private final String workQueue; private final JedisPool jedisPool; /** * instantiates ChannelAdapter, in charge of communication with messaging * service. * * @param jedisPool * - provider of Jedis connections, if null means let * ChannelAdapter create one by itself using data from AppConfig */ public ChannelAdapter(JedisPool jedisPool) { this.jedisPool = jedisPool; this.waitQueue = "TestSample:waitQueue"; this.workQueue = "TestSample:workQueue"; } /* implementation of conversion Batch object into json presentation */ private String batchToJson(Batch job) { return job.toString(); } /* implementation of conversion json into Batch presentation */ private Batch jsonToBatch(String batchJson) { return null;// new Batch(batchJson); } /** * sends batch job to a queue for further processing. * * @param job * task that will be serialized and sent to queue * @return true if job has been successfully queued */ public boolean sendJobToWaitQueue(Batch job) { LOGGER.debug("Trying to push job to queue: " + job.toString()); String jobJson = batchToJson(job); Jedis instance = null; try { instance = this.jedisPool.getResource(); // left push to a wait queue instance.lpush(waitQueue, jobJson); LOGGER.debug("Job successfully published to channel {} {}", waitQueue, jobJson); return true; } catch (Exception e) { LOGGER.error("Problem while publishing message to a channel", e); return false; } finally { instance.close(); } } /** * checks if there is job available waiting in a 'wait' queue. If there is * job waiting in a queue, it will be transfered into 'work' queue and * returned back. * * @return Batch if available for work, otherwise null */ public Batch checkIfJobAvailable() { String jobJson = null; Jedis instance = null; Batch job = null; try { instance = this.jedisPool.getResource(); // trying to pick up new job from 'wait' queue and transfer it to // 'work' queue in single transaction String message = instance.rpoplpush(waitQueue, workQueue); if (message == null) { return null; } job = jsonToBatch(message); if (job == null) { return job; } LOGGER.debug("Job successfully transferred to 'work' queue:{} json:{}", workQueue, jobJson); return job; } catch (Exception e) { LOGGER.error("Problem while checking new job message", e); return null; } finally { instance.close(); } } /** * makes sure ChannelAdapter will stop its activities in a secure manner, * closing all connections. */ public void stopActivities() { this.jedisPool.close(); } /** * assures all needed for a job to be returned successfully to a 'wait' * queue. removes job from 'work' queue, * * @return information if transaction succeeded or not. */ public boolean returnJobBackToWaitQueue(Batch job) { boolean res = false; String jobId = job.getId(); LOGGER.info("Returning job {} back to 'wait' queue", jobId); // 1. remove it from working queue res = removeJobFromWorkQueue(job); if (!res) { LOGGER.error("Failed to take job off 'work' queue; id: {}", jobId); return res; } // 2. back to wait queue res = sendJobToWaitQueue(job); return res; } /* * searches for dedicated job on queue and removes it off queue, send back * true if removal successful */ private boolean removeJobFromWorkQueue(Batch job) { if (job.getId() == null) { LOGGER.warn("Got null ID of batch to remove off queue?!? Buckets: {}", job.getBuckets()); return false; } String batchJson = batchToJson(job); Jedis instance = null; Long res = -1L; try { instance = this.jedisPool.getResource(); res = instance.lrem(workQueue, 1, batchJson); } catch (Exception e) { LOGGER.warn("Problem while removing job {} off queue {} Ex:{}", job.getId(), workQueue, e); } finally { instance.close(); } return res == 1; } /** * lists all jobs that are currently in progress. */ public List<Batch> getJobsInProgress() { Jedis instance = null; List<String> res; try { instance = this.jedisPool.getResource(); LOGGER.debug("Trying to read all elements in {} queue", workQueue); res = instance.lrange(workQueue, 0, -1); List<Batch> jobs = new ArrayList<>(res.size()); for (String json : res) { Batch job = jsonToBatch(json); jobs.add(job); } return jobs; } catch (Exception e) { LOGGER.warn("Problem while listing job list of all elements, queue:{}", workQueue, e); return null; } finally { if (instance != null) { instance.close(); } } } }