com/github/kyleburton/sandbox/rabbitmq.clj

Line Coverage Rate: 0.21238938053097345
     Lines Covered: 48
Branch Coverage Rate: 0.21212121212121213
    Branches Covered: 28
1(ns com.github.kyleburton.sandbox.rabbitmq
2  (:import (com.rabbitmq.client AMQP)
3           (com.rabbitmq.client Channel Connection ConnectionFactory
4                                ConnectionParameters QueueingConsumer
5                                RpcServer RpcClient))
6  (:use [com.github.kyleburton.sandbox.utils :as kutils])
7  (:use [clojure.contrib.str-utils :as str]))
8
;; Connection parameters carrying the guest/guest credentials; used below to
;; build *amqp-factory*.
(def *connection-params*
     (let [params (ConnectionParameters.)]
       (.setUsername params "guest")
       (.setPassword params "guest")
       params))
13
;; AMQP basic-properties delivery-mode values.
(def DELIVERY_MODE_NON_PERSISTENT 1)
(def DELIVERY_MODE_PERSISTENT 2)

;; Fallback values used whenever the corresponding key is absent from *env*.
(def *default-port* com.rabbitmq.client.AMQP$PROTOCOL/PORT)
(def *default-host*         "localhost")
(def *default-exchange*     "com.github.kyleburton.sandbox.rabbitmq.default.exchange.name")
(def *default-queue*        "com.github.kyleburton.sandbox.rabbitmq.default.queue.name")
;; NOTE(review): the RPC exchange name below is identical to *default-rpc-queue*
;; and ends in "queue.name"; it looks like a copy/paste of the queue name --
;; confirm whether "...default.rpc.exchange.name" was intended before changing it.
(def *default-rpc-exchange* "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name")
(def *default-rpc-queue*    "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name")
;; Passed to QueueingConsumer.nextDelivery by try-get.
(def *default-timeout*      250)
;; By default the queue is bound using its own name as the binding key.
(def *default-binding-key*  *default-queue*)
;; BasicProperties instance whose deliveryMode field is set to non-persistent.
(def *default-message-properties*
     (let [props (com.rabbitmq.client.AMQP$BasicProperties.)]
       (set! (.deliveryMode props) DELIVERY_MODE_NON_PERSISTENT)
       props))
29
30;; (map #(.get % *default-message-properties*) (kutils/fields-seq *default-message-properties*))
31;; (kutils/fields-and-values-map *default-message-properties*)
32
33;; (def *rabbit-host-name* "localhost")
34;; (def *default-routing-key* "my-routing-key")
35;; (def *default-exchange-name* "my-exchange")
36;; <field name="delivery mode" type="octet"> Non-persistent (1) or persistent (2) </field>
37
(def #^{:doc "Default ConnectionFactory, built from *connection-params* when this namespace is loaded."}
     *amqp-factory*
     (ConnectionFactory. *connection-params*))

(def #^{:doc "Var used to hold the ConnectionFactory in use; bound by with-connection and do-connection."}
     *factory* nil)

(def #^{:doc "Var used to hold the open connection on the broker."}
     *connection* nil)

(def #^{:doc "Var used to hold the open channel on the connection."}
     *channel* nil)

;; TODO: document more of the options and parameters that have meaning
;; in the *env*
;; Keys read elsewhere in this file include :host, :port, :exchange,
;; :exchange-type, :exchange-durable, :queue, :queue-durable, :binding-key,
;; :routing-key, :mandatory, :immediate, :acknowledge and :timeout, plus the
;; credential/tuning keys consumed by make-connection-params.
(def #^{:doc "Var used to hold the current 'environment' (option map) for connections."}
     *env* {:acknowledge true
            :host *default-host*
            :port *default-port*
            :user nil
            :pass nil
            })
60
(defn make-connection-params
  "Simplified constructor for ConnectionParameters.

  props is a map; only keys with a truthy value are applied:
    :user / :username / :user-name  -> setUsername
    :pass / :password               -> setPassword
    :vhost                          -> setVirtualHost
    :channel-max                    -> setRequestedChannelMax
    :frame-max                      -> setRequestedFrameMax
    :heartbeat / :heart-beat        -> setRequestedHeartbeat
  When several aliases of the same setting are present, the one applied last
  wins (:user-name over :username over :user, :password over :pass,
  :heart-beat over :heartbeat).  Returns the populated ConnectionParameters."
  [props]
  (let [params (ConnectionParameters.)]
    (when-let [val (:user props)]        (.setUsername params val))
    (when-let [val (:username props)]    (.setUsername params val))
    (when-let [val (:user-name props)]   (.setUsername params val))
    (when-let [val (:pass props)]        (.setPassword params val))
    (when-let [val (:password props)]    (.setPassword params val))
    (when-let [val (:vhost props)]       (.setVirtualHost params val))
    (when-let [val (:channel-max props)] (.setRequestedChannelMax params val))
    ;; The setter is setRequestedFrameMax; the previous .setRequestedFrame call
    ;; named a method that does not exist and failed whenever :frame-max was given.
    (when-let [val (:frame-max props)]   (.setRequestedFrameMax params val))
    (when-let [val (:heartbeat props)]   (.setRequestedHeartbeat params val))
    (when-let [val (:heart-beat props)]  (.setRequestedHeartbeat params val))
    params))
78
;; REPL scratch (kept for reference, no longer evaluated on every load):
;; (bean (make-connection-params {:user "guest" :pass "guest"}))
80
;; Atom of connection defaults that with-connection and do-connection merge
;; over *env*; set these if you want to override...
(def *default-param-map*
     (atom {:user "guest", :pass "guest"}))
85
86
87;; TODO: refactor with-connection and do-connection, too much shared
88;; behavior for them to be so cut&paste
(defmacro with-connection
  "Runs body with *env*, *factory*, *connection* and *channel* bound to a freshly
  opened broker connection and channel, after declaring the exchange and queue
  and binding them together.  The channel and connection are closed (with-open)
  when body finishes or throws.

  params is a literal vector of option pairs, e.g.
    [:exchange \"test.exchange.name\" :queue \"test.queue.name\"]
  It is split by kutils/parse-paired-arglist (presumably into positional and
  named parts -- only the named part is used here).

  The effective environment is (merge *env* @*default-param-map* named) and is
  computed when the expansion RUNS, matching do-connection.  Previously the
  merge ran at macro-expansion time, so later changes to *env* or
  *default-param-map* were silently ignored."
  [params & body]
  (let [[_ named] (kutils/parse-paired-arglist params)
        ;; Normalise to a map at expansion time (as the old merge did) so that
        ;; what gets spliced into the expansion is always a map literal.
        named-map (merge {} named)]
    `(binding [*env* (merge *env* @*default-param-map* ~named-map)]
       (binding [*factory* (ConnectionFactory. (make-connection-params *env*))]
         (with-open [conn# (.newConnection *factory*
                                           (:host *env* *default-host*)
                                           (:port *env* *default-port*))]
           (binding [*connection* conn#]
             (with-open [channel# (.createChannel *connection*)]
               (binding [*channel* channel#]
                 (prn (format "with-connection: declaring exchange: %s" (:exchange *env* *default-exchange*)))
                 (.exchangeDeclare *channel*
                                   (:exchange *env* *default-exchange*)
                                   (:exchange-type *env* "direct")
                                   (:exchange-durable *env* false))
                 (prn (format "with-connection: declaring queue: %s" (:queue *env* *default-queue*)))
                 (.queueDeclare *channel*
                                (:queue *env* *default-queue*)
                                (:queue-durable *env* false))
                 (.queueBind *channel*
                             (:queue *env* *default-queue*)
                             (:exchange *env* *default-exchange*)
                             (:binding-key *env* *default-binding-key*))
                 ~@body))))))))
113
114;; params must be a hash...
(defn do-connection
  "Functional equivalent of the with-connection macro.

  Merges *env*, @*default-param-map* and params (a map, or nil) into a new
  *env*, opens a connection and a channel, declares the exchange and the queue,
  binds the queue to the exchange, then calls the zero-argument function fn with
  *factory*, *connection* and *channel* bound.  Returns whatever fn returns; the
  channel and connection are closed by with-open on the way out."
  [params fn]
  (binding [*env* (merge *env* @*default-param-map* params)]
    (let [exchange (:exchange *env* *default-exchange*)
          queue    (:queue *env* *default-queue*)]
      (binding [*factory* (ConnectionFactory. (make-connection-params *env*))]
        (with-open [conn (.newConnection *factory*
                                         (:host *env* *default-host*)
                                         (:port *env* *default-port*))]
          (binding [*connection* conn]
            (with-open [ch (.createChannel conn)]
              (binding [*channel* ch]
                (prn (format "do-connection: declaring exchange: %s" exchange))
                (.exchangeDeclare ch
                                  exchange
                                  (:exchange-type *env* "direct")
                                  (:exchange-durable *env* false))
                (prn (format "do-connection: declaring queue: %s" queue))
                (.queueDeclare ch queue (:queue-durable *env* false))
                (.queueBind ch
                            queue
                            exchange
                            (:binding-key *env* *default-binding-key*))
                ;; `fn` here is the caller-supplied thunk (the parameter shadows
                ;; clojure.core/fn inside this function).
                (fn)))))))))
138
139;; (with-connection
140;;     [:exchange "test.exchange.name" 
141;;      :exchange-durable false
142;;      :queue "test.queue.name" :queue-durable false]
143;;   (prn "broker=" *factory*)
144;;   (prn "connection=" *connection*)
145;;   (prn "env=" *env*))
146
147;; (do-connection
148;;  {:exchange "test.exchange.name" 
149;;   :exchange-durable false
150;;   :queue "test.queue.name" :queue-durable false}
151;;  (fn []
152;;    (prn "broker=" *factory*)
153;;    (prn "connection=" *connection*)
154;;    (prn "env=" *env*)))
155
(defn basic-publish
  "Publishes message on a connection opened via do-connection with params.

  The exchange comes from :exchange (default *default-exchange*), the routing
  key from :routing-key (default *default-queue*); :mandatory and :immediate
  default to false.  The message body is (.getBytes message) and the message
  properties argument is passed as nil."
  [#^java.util.Map params #^String message]
  (do-connection
   params
   #(.basicPublish *channel*
                   (:exchange    *env* *default-exchange*)
                   (:routing-key *env* *default-queue*)
                   (:mandatory   *env* false)
                   (:immediate   *env* false)
                   nil   ;; (:message-properties *env* *default-message-properties*)
                   (.getBytes message))))
168
169;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.)))
170;; (String. (.getBody (basic-get)))
171;; (String. (.getBody (basic-get {:queue "SimpleQueue"})))
172
(defn basic-get
  "Immediate pull, even if there are no messages waiting.

  Takes an optional params map, opens a connection through do-connection and
  returns the result of .basicGet on the queue named by :queue (default
  *default-queue*); the :acknowledge value (default true) is passed as
  basicGet's second argument."
  [& [#^java.util.Map params]]
  (do-connection
   params
   #(do
      (prn (format "calling .basicGet on *channel*=%s" *channel*))
      (.basicGet *channel*
                 (:queue *env* *default-queue*)
                 (:acknowledge *env* true)))))
183
(defn do-consume
  "Opens a connection via do-connection, registers a QueueingConsumer on the
  queue and calls (f consumer), returning its result.

  The queue consumed is the one named by :queue -- the same queue that
  do-connection declares and basic-get reads.  Previously the queue name was
  taken from :routing-key, so a :queue override was ignored here.  :routing-key
  is still honoured as a fallback when :queue is absent, so existing callers
  that relied on it keep working."
  [params f]
  (do-connection
   params
   (fn []
     (let [queue    (or (:queue *env*)
                        (:routing-key *env*)
                        *default-queue*)
           consumer (QueueingConsumer. *channel*)]
       (.basicConsume *channel* queue consumer)
       (f consumer)))))
192
(defn ack-delivery
  "Acknowledges delivery on the channel owned by consumer.  The trailing `true`
  is basicAck's third argument (the `multiple` flag in the RabbitMQ Channel API)."
  [consumer delivery]
  (let [channel      (.getChannel consumer)
        delivery-tag (.getDeliveryTag (.getEnvelope delivery))]
    (.basicAck channel delivery-tag true)))
197
198
199;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.)))
200
201
202
(defn try-get
  "Using QueueingConsumer, try to pull off a message, returning nil if none were ready.

  Waits up to :timeout (default *default-timeout*) for a delivery.  When one
  arrives it is acknowledged unless :acknowledge is false/nil, and its body is
  returned as a String."
  [& [#^java.util.Map params]]
  (do-consume
   params
   (fn [consumer]
     (when-let [delivery (.nextDelivery consumer (:timeout *env* *default-timeout*))]
       (when (:acknowledge *env* true)
         (ack-delivery consumer delivery))
       (String. (.getBody delivery))))))
214
215;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.)))
216;; (try-get {:acknowledge false})
217;; (try-get)
218
219
220
221;; (with-connection [] (.queueDelete *channel* "SimpleQueue"))
222;; (with-connection [] (.queueDelete *channel* "test.queue.name"))
223;; (with-connection [] (.exchangeDelete *channel* "test.exchange.name"))
224
225;; (with-connection [] (.queueDelete *channel* *default-queue*))
226;; (with-connection [] (.exchangeDelete *channel* *default-exchange*))
227
228;; (def x (make-rpc-state))
229;; (:factory x)
230;; (:channel x)
231;; (:connection x)
232;; (shutdown-rpc-state x)
233
(defn make-rpc-server
  "Opens a connection and channel, declares and binds the RPC exchange and
  queue, and builds a MapRpcServer proxy on that queue.

  params    - map merged over {:queue *default-rpc-queue*} and *env*.
  callbacks - map with
                :map-call (fn [rpc-state server request props]) for handleMapCall
                :map-cast (fn [rpc-state server request])       for handleMapCast

  Returns a state map with :factory, :connection, :channel, :env and :server.
  The rpc-state handed to the callbacks is that map without :server.  The
  caller owns the returned resources (see shutdown-rpc-server).

  If anything fails after the connection is opened, the connection is closed
  before the exception is rethrown, so a failed setup does not leak it."
  [params callbacks]
  (binding [*env* (merge {:queue *default-rpc-queue*} *env* params)]
    (let [factory    (ConnectionFactory. (make-connection-params *env*))
          connection (.newConnection factory
                                     (:host *env* *default-host*)
                                     (:port *env* *default-port*))]
      (try
        (let [channel   (.createChannel connection)
              rpc-state {:factory factory
                         :connection connection
                         :channel channel
                         :env *env*}
              queue     (:queue (:env rpc-state))
              exchange  (:exchange *env* *default-rpc-exchange*)
              map-call  (:map-call callbacks)
              map-cast  (:map-cast callbacks)]
          (.exchangeDeclare channel exchange "direct" true)
          (.queueDeclare channel queue true)
          ;; The queue name doubles as the binding key.
          (.queueBind channel
                      queue
                      exchange
                      queue ;(:binding-key *env* *default-binding-key*)
                      )
          (prn (format "make-rpc-server: channel:%s queue:%s map-call:%s map-cast:%s"
                       channel
                       queue
                       map-call
                       map-cast))
          (let [server (proxy [com.rabbitmq.client.MapRpcServer]
                           [channel queue]
                         (handleMapCall [request props]
                                        (map-call rpc-state this request props))
                         (handleMapCast [request]
                                        (map-cast rpc-state this request)))]
            (assoc rpc-state :server server)))
        (catch Exception ex
          ;; Best-effort cleanup: a failure while closing must not mask the
          ;; original setup error.
          (try
            (.close connection)
            (catch Exception _ nil))
          (throw ex))))))
270
(defn shutdown-rpc-server
  "Closes the resources held in a state map returned by make-rpc-server.

  Order matters: the RPC server is closed first, while its channel is still
  open, then the channel, then the connection.  (The previous order closed the
  channel and connection first, leaving the server to be closed on an
  already-closed channel.)  Every close is attempted even if an earlier one
  fails; each failure is printed and rethrown, and an exception from a later
  close supersedes one from an earlier close."
  [state]
  (try
    (.close (:server state))
    (catch Exception ex
      (prn "server close failed: " ex)
      (throw ex))
    (finally
      (try
        (.close (:channel state))
        (catch Exception ex
          (prn "channel close failed: " ex)
          (throw ex))
        (finally
          (try
            (.close (:connection state))
            (catch Exception ex
              (prn "connection close failed: " ex)
              (throw ex))))))))
289
(defn map-call-handler
  "Sample :map-call callback for make-rpc-server.

  Logs the request's \"command\" entry, calls .terminateMainloop on `this` when
  that entry's string form is \"exit\", and returns a reply map holding
  \"resp\" (an echo of the \"ping\" entry, or \"*nothing*\" when it is absent)
  and \"time\" (a new java.util.Date).  rpc-state and props are not used."
  [rpc-state this request props]
  (let [command      (.get request "command")
        command-text (if command (.toString command) "")
        ping         (.get request "ping")
        ping-text    (if ping (.toString ping) "*nothing*")]
    (prn (format "map-call-handler: this=%s command=%s %s"
                 this
                 command
                 command-text))
    (when (and command (= "exit" command-text))
      (.terminateMainloop this))
    {"resp" (str "you pinged: " ping-text)
     "time" (java.util.Date.)}))
304
305
306
;; REPL scratch for exercising the RPC server and client by hand.  Wrapped in
;; (comment ...) rather than a quoted list so that loading the namespace does
;; not build and discard a list literal; evaluate the forms one at a time.
(comment

(def rpc-server
     (make-rpc-server
      {};;{:queue "rpc.test"}
      {:map-call map-call-handler
       :map-cast (fn rpc-map-cast [rpc-state this request]
                   (prn (format "in the map-cast handler! rpc-state:%s this:%s request:%s"
                                rpc-state this request)))}))

(:exchange (:env rpc-server) *default-rpc-exchange*)

(with-connection [] (.queueDelete *channel* "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name"))

;; .mainloop is called on the current thread; run the client forms below from
;; another thread/REPL.
(do (.mainloop (:server rpc-server))
    (prn "mainloop returned")
    (try (shutdown-rpc-server rpc-server)
         (catch Exception ex
           (prn "error shutting down: " ex)
           (throw ex))))

(.getQueueName (:server rpc-server))

(defn close-rpc-client [client]
  (.close (.getChannel client))
  (.close client))

(def rpc-client
     (RpcClient.
      (.createChannel
       (.newConnection (:factory rpc-server)
                       (:host *env* *default-host*)
                       (:port *env* *default-port*)))
      (:exchange (:env rpc-server) *default-rpc-exchange*)
      (:queue (:env rpc-server))))

(.getExchange rpc-client)

(let [res (.mapCall rpc-client {"ping" "value2"})]
  (prn (format "returned: %s" res)))

(let [res (.mapCall rpc-client {"command" "exit"})]
  (prn (format "returned: %s" res)))

(close-rpc-client rpc-client)

)
354
355;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
356;; (def *rpc-server-channel-key* "rpc.channel.key")
357;; (def *rpc-server-connection* (.newConnection *amqp-factory* *default-host* *default-port*))
358;; (def *rpc-server-channel*
359;;      (let [channel (.createChannel *rpc-server-connection*)]
360;;        (.exchangeDeclare channel "rpc.exchange" "direct" true)
361;;        (.queueDeclare channel *rpc-server-channel-key* true)
362;;        channel))
363
364;; (def *rpc-server*
365;;      (let [server (proxy [com.rabbitmq.client.MapRpcServer]
366;;                       [*rpc-server-channel* *rpc-server-channel-key*]
367;;                     (handleMapCall [request props]
368;;                                    (prn "handle-map-call! req=" request)
369;;                                    (if (.get request "exit")
370;;                                      (do
371;;                                        (prn "exiting...")
372;;                                        (.terminateMainloop this)
373;;                                        (.close this)))
374;;                                    {"resp" "I hunger!"}))]
375;;        (.queueDeclare (.getChannel server) *rpc-server-channel-key*)
376;;        server))
377
378
379
380;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
381(comment
382
383;; (consume-message)
384
385;; play w/the rpc client/server
386
387(def *rpc-server-channel-key* "rpc-test")
388(def *rpc-server-connection* (.newConnection *factory* *rabbit-host-name* *rabbit-port*))
389(def *rpc-server-channel*    (open-channel *rpc-server-connection* *default-exchange-name* *rpc-server-channel-key*))
390(def *rpc-server*
391     (let [server (proxy [com.rabbitmq.client.MapRpcServer]
392                      [*rpc-server-channel* *rpc-server-channel-key*]
393                    (handleMapCall [request props]
394                                   (prn "handle-map-call! req=" request)
395                                   (if (.get request "exit")
396                                     (do
397                                       (prn "exiting...")
398                                       (.terminateMainloop this)
399                                       (.close this)))
400                                   {"resp" "I hunger!"}))]
401       (.queueDeclare (.getChannel server) *rpc-server-channel-key*)
402       server))
403
404
405(def *rpc-client-connection* (.newConnection *factory* *rabbit-host-name* *rabbit-port*))
406
407(def *rpc-client-channel*    (open-channel *rpc-client-connection* *default-exchange-name* *rpc-server-channel-key*))
408
409(def *rpc-client*
410     (RpcClient. *rpc-client-channel* "" *rpc-server-channel-key*))
411
412'(
413
414  (do (.mainloop *rpc-server*) 
415      (prn "mainloop returned"))
416
417  ;; (.getQueueName *rpc-server*)
418  ;; (.close *rpc-server*)
419
420  (let [res (.mapCall *rpc-client* {"ping" "value2"})]
421    (prn (format "returned: %s" res)))
422
423  (let [res (.mapCall *rpc-client* {"exit" "value2"})]
424    (prn (format "returned: %s" res)))
425
426  ;; (.getReplyQueue *rpc-client*)
427  ;; (.close *rpc-client*)
428  
429
430)
431
432)
433
434