1 | (ns com.github.kyleburton.sandbox.rabbitmq |
2 | (:import (com.rabbitmq.client AMQP) |
3 | (com.rabbitmq.client Channel Connection ConnectionFactory |
4 | ConnectionParameters QueueingConsumer |
5 | RpcServer RpcClient)) |
6 | (:use [com.github.kyleburton.sandbox.utils :as kutils]) |
7 | (:use [clojure.contrib.str-utils :as str])) |
8 |
|
;; Default ConnectionParameters carrying the "guest"/"guest" credentials.
(def *connection-params*
  (let [params (ConnectionParameters.)]
    (.setUsername params "guest")
    (.setPassword params "guest")
    params))
13 |
|
;; AMQP "delivery mode" octet values: non-persistent (1) or persistent (2).
(def #^{:doc "AMQP delivery mode value for non-persistent messages."}
  DELIVERY_MODE_NON_PERSISTENT 1)
(def #^{:doc "AMQP delivery mode value for persistent messages."}
  DELIVERY_MODE_PERSISTENT 2)
16 |
|
;; Fallback values used when the current *env* does not supply the matching key.
(def #^{:doc "Broker port used when *env* has no :port."}
  *default-port* com.rabbitmq.client.AMQP$PROTOCOL/PORT)
(def #^{:doc "Broker host used when *env* has no :host."}
  *default-host* "localhost")
(def #^{:doc "Exchange name used when *env* has no :exchange."}
  *default-exchange* "com.github.kyleburton.sandbox.rabbitmq.default.exchange.name")
(def #^{:doc "Queue name used when *env* has no :queue; also the fallback routing key."}
  *default-queue* "com.github.kyleburton.sandbox.rabbitmq.default.queue.name")
;; NOTE(review): this value is identical to *default-rpc-queue* (it ends in
;; "rpc.queue.name", not an exchange name) -- looks like a copy/paste; confirm
;; before renaming, since a broker may already hold an exchange with this name.
(def #^{:doc "Exchange name make-rpc-server uses when *env* has no :exchange."}
  *default-rpc-exchange* "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name")
(def #^{:doc "Queue name make-rpc-server uses unless *env* or its params supply :queue."}
  *default-rpc-queue* "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name")
(def #^{:doc "Timeout try-get hands to the consumer when *env* has no :timeout."}
  *default-timeout* 250)
(def #^{:doc "Binding key used when *env* has no :binding-key; same value as *default-queue*."}
  *default-binding-key* *default-queue*)
(def #^{:doc "BasicProperties instance whose deliveryMode is set to non-persistent."}
  *default-message-properties*
  (let [properties (com.rabbitmq.client.AMQP$BasicProperties.)]
    ;; deliveryMode is assigned directly as a field on the properties object.
    (set! (.deliveryMode properties) DELIVERY_MODE_NON_PERSISTENT)
    properties))
29 |
|
30 | ;; (map #(.get % *default-message-properties*) (kutils/fields-seq *default-message-properties*)) |
31 | ;; (kutils/fields-and-values-map *default-message-properties*) |
32 |
|
33 | ;; (def *rabbit-host-name* "localhost") |
34 | ;; (def *default-routing-key* "my-routing-key") |
35 | ;; (def *default-exchange-name* "my-exchange") |
36 | ;; <field name="delivery mode" type="octet"> Non-persistent (1) or persistent (2) </field> |
37 |
|
;; ConnectionFactory built once, at load time, from *connection-params*.
(def #^{:doc "Var for holding a default factory"}
  *amqp-factory*
  (ConnectionFactory. *connection-params*))

;; The three vars below have a nil root value; with-connection and
;; do-connection rebind them (via `binding`) for the duration of their body.

;; Rebound to the ConnectionFactory created for the current connection.
(def #^{:doc "Var used to hold the connection to the factory."}
  *factory* nil)

;; Rebound to the connection opened from *factory*.
(def #^{:doc "Var used to hold the open connection on the broker."}
  *connection* nil)

;; Rebound to the channel created on *connection*.
(def #^{:doc "Var used to hold the open channel on the connection."}
  *channel* nil)
50 |
|
51 | ;; TODO: document more of the options and parameters that have meaning |
52 | ;; in the *env* |
;; Root environment map.  with-connection, do-connection and make-rpc-server
;; rebind *env* to a merged copy of it.
(def #^{:doc "Var used to hold the current 'environment' for connections."}
  *env* {:acknowledge true       ;; read by basic-get and try-get
         :host *default-host*    ;; passed to .newConnection
         :port *default-port*    ;; passed to .newConnection
         :user nil               ;; read by make-connection-params; nil is skipped
         :pass nil               ;; read by make-connection-params; nil is skipped
         })
60 |
|
(defn make-connection-params
  "Simplified constructor for ConnectionParameters.

  props is a map.  Each recognised key that has a truthy value is copied onto
  a fresh ConnectionParameters; keys that are absent (or nil/false) leave the
  corresponding setting untouched:

    :user / :username / :user-name  -> setUsername (later aliases win)
    :pass / :password               -> setPassword (later alias wins)
    :vhost                          -> setVirtualHost
    :channel-max                    -> setRequestedChannelMax
    :frame-max                      -> setRequestedFrameMax
    :heartbeat / :heart-beat        -> setRequestedHeartbeat (later alias wins)

  Returns the populated ConnectionParameters."
  [props]
  (let [params (ConnectionParameters.)]
    (when-let [val (:user props)]        (.setUsername params val))
    (when-let [val (:username props)]    (.setUsername params val))
    (when-let [val (:user-name props)]   (.setUsername params val))
    (when-let [val (:pass props)]        (.setPassword params val))
    (when-let [val (:password props)]    (.setPassword params val))
    (when-let [val (:vhost props)]       (.setVirtualHost params val))
    (when-let [val (:channel-max props)] (.setRequestedChannelMax params val))
    ;; The setter is setRequestedFrameMax.  The earlier .setRequestedFrame call
    ;; named a method ConnectionParameters does not have, so passing :frame-max
    ;; failed at call time.
    (when-let [val (:frame-max props)]   (.setRequestedFrameMax params val))
    (when-let [val (:heartbeat props)]   (.setRequestedHeartbeat params val))
    (when-let [val (:heart-beat props)]  (.setRequestedHeartbeat params val))
    params))
78 |
|
;; REPL scratch: inspect the bean view of a ConnectionParameters built from a
;; props map.  Wrapped in `comment` so it is no longer evaluated (and its
;; result discarded) every time the namespace is loaded.
(comment
  (bean (make-connection-params {:user "guest" :pass "guest"})))
80 |
|
81 | ;; set these if you want to override... |
;; Atom holding connection defaults.  with-connection and do-connection merge
;; its current value over *env*, and caller-supplied params are merged on top
;; of that, so caller params take precedence.
(def *default-param-map* (atom
                          {:user "guest"
                           :pass "guest"}))
85 |
|
86 |
|
87 | ;; TODO: refactor with-connection and do-connection, too much shared |
88 | ;; behavior for them to be so cut&paste |
(defmacro with-connection
  "Runs body with an open broker connection and channel.

  params is a vector of keyword/value pairs (e.g. [:exchange \"x\" :queue \"q\"]),
  split by kutils/parse-paired-arglist; the named pairs are merged over *env*
  and the current value of *default-param-map*.  While body runs, *env*,
  *factory*, *connection* and *channel* are bound, and the exchange and queue
  named by the environment have been declared and bound to each other.  The
  channel and connection are closed (with-open) when body exits.

  Returns the value of the last form in body."
  [params & body]
  (let [[_ named] (kutils/parse-paired-arglist params)]
    ;; The merge is emitted into the expansion so it runs when the code
    ;; executes, matching do-connection.  Previously it ran at macro-expansion
    ;; time, which froze *env* and *default-param-map* at their compile-time
    ;; values and ignored any runtime rebinding or later swap!.
    `(binding [*env* (merge *env* @*default-param-map* ~named)]
       (binding [*factory* (ConnectionFactory. (make-connection-params *env*))]
         (with-open [conn# (.newConnection *factory*
                                           (:host *env* *default-host*)
                                           (:port *env* *default-port*))]
           (binding [*connection* conn#]
             (with-open [channel# (.createChannel *connection*)]
               (binding [*channel* channel#]
                 (prn (format "with-connection: declaring exchange: %s" (:exchange *env* *default-exchange*)))
                 (.exchangeDeclare *channel*
                                   (:exchange *env* *default-exchange*)
                                   (:exchange-type *env* "direct")
                                   (:exchange-durable *env* false))
                 (prn (format "with-connection: declaring queue: %s" (:queue *env* *default-queue*)))
                 (.queueDeclare *channel*
                                (:queue *env* *default-queue*)
                                (:queue-durable *env* false))
                 (.queueBind *channel*
                             (:queue *env* *default-queue*)
                             (:exchange *env* *default-exchange*)
                             (:binding-key *env* *default-binding-key*))
                 ~@body))))))))
113 |
|
;; params must be a map (or nil): it is merged over *env* and *default-param-map*.
(defn do-connection
  "Functional counterpart of the with-connection macro.

  Merges params over *env* and the current *default-param-map*, opens a
  connection and a channel, declares the exchange and queue named by the
  environment, binds the queue to the exchange, then calls fn with no
  arguments.  *env*, *factory*, *connection* and *channel* are bound while fn
  runs; the channel and connection are closed (with-open) afterwards.

  Returns whatever fn returns."
  [params fn]
  (binding [*env* (merge *env* @*default-param-map* params)]
    (binding [*factory* (ConnectionFactory. (make-connection-params *env*))]
      (let [host (:host *env* *default-host*)
            port (:port *env* *default-port*)]
        (with-open [connection (.newConnection *factory* host port)]
          (binding [*connection* connection]
            (with-open [channel (.createChannel *connection*)]
              (binding [*channel* channel]
                ;; *env* cannot change between these steps, so each setting
                ;; is looked up once and reused.
                (let [exchange-name (:exchange *env* *default-exchange*)
                      queue-name    (:queue *env* *default-queue*)]
                  (prn (format "do-connection: declaring exchange: %s" exchange-name))
                  (.exchangeDeclare *channel*
                                    exchange-name
                                    (:exchange-type *env* "direct")
                                    (:exchange-durable *env* false))
                  (prn (format "do-connection: declaring queue: %s" queue-name))
                  (.queueDeclare *channel* queue-name (:queue-durable *env* false))
                  (.queueBind *channel*
                              queue-name
                              exchange-name
                              (:binding-key *env* *default-binding-key*))
                  (fn))))))))))
138 |
|
139 | ;; (with-connection |
140 | ;; [:exchange "test.exchange.name" |
141 | ;; :exchange-durable false |
142 | ;; :queue "test.queue.name" :queue-durable false] |
143 | ;; (prn "broker=" *factory*) |
144 | ;; (prn "connection=" *connection*) |
145 | ;; (prn "env=" *env*)) |
146 |
|
147 | ;; (do-connection |
148 | ;; {:exchange "test.exchange.name" |
149 | ;; :exchange-durable false |
150 | ;; :queue "test.queue.name" :queue-durable false} |
151 | ;; (fn [] |
152 | ;; (prn "broker=" *factory*) |
153 | ;; (prn "connection=" *connection*) |
154 | ;; (prn "env=" *env*))) |
155 |
|
(defn basic-publish
  "Publishes message, as the bytes returned by .getBytes, over a connection
  opened by do-connection with params.

  The exchange, routing key, mandatory and immediate flags come from *env*
  (:exchange, :routing-key, :mandatory, :immediate), falling back to
  *default-exchange*, *default-queue*, false and false.  The message
  properties argument is always nil."
  [#^java.util.Map params #^String message]
  (let [publish (fn []
                  (let [exchange    (:exchange *env* *default-exchange*)
                        routing-key (:routing-key *env* *default-queue*)
                        mandatory   (:mandatory *env* false)
                        immediate   (:immediate *env* false)
                        payload     (.getBytes message)]
                    (.basicPublish *channel*
                                   exchange
                                   routing-key
                                   mandatory
                                   immediate
                                   nil ;; (:message-properties *env* *default-message-properties*)
                                   payload)))]
    (do-connection params publish)))
168 |
|
169 | ;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.))) |
170 | ;; (String. (.getBody (basic-get))) |
171 | ;; (String. (.getBody (basic-get {:queue "SimpleQueue"}))) |
172 |
|
(defn basic-get
  "Immediate pull, even if there are no messages waiting.

  Opens a connection through do-connection with the optional params map and
  calls .basicGet on the channel, using :queue and :acknowledge from *env*
  (defaults: *default-queue* and true).  Returns what .basicGet returns."
  [& [#^java.util.Map params]]
  (let [fetch (fn []
                (prn (format "calling .basicGet on *channel*=%s" *channel*))
                (let [queue-name  (:queue *env* *default-queue*)
                      acknowledge (:acknowledge *env* true)]
                  (.basicGet *channel* queue-name acknowledge)))]
    (do-connection params fetch)))
183 |
|
(defn do-consume
  "Opens a connection through do-connection with params, registers a
  QueueingConsumer on the channel, and calls f with that consumer.

  The consumer is attached to the queue named by :queue in *env* (default
  *default-queue*), which is the queue do-connection has just declared and
  bound.  Returns whatever f returns."
  [params f]
  (do-connection
   params
   (fn []
     (let [consumer (QueueingConsumer. *channel*)]
       ;; .basicConsume expects a queue name.  This used to pass :routing-key,
       ;; so a caller supplying only :queue consumed from *default-queue*
       ;; instead of the queue that was declared; basic-get already reads
       ;; :queue.
       (.basicConsume *channel* (:queue *env* *default-queue*) consumer)
       (f consumer)))))
192 |
|
(defn ack-delivery
  "Acknowledges delivery on the channel that consumer is attached to.

  Calls .basicAck with the delivery tag taken from the delivery's envelope
  and true as the final argument."
  [consumer delivery]
  (let [channel      (.getChannel consumer)
        delivery-tag (.. delivery getEnvelope getDeliveryTag)]
    (.basicAck channel delivery-tag true)))
197 |
|
198 |
|
199 | ;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.))) |
200 |
|
201 |
|
202 |
|
(defn try-get
  "Using QueueingConsumer, try to pull off a message, returning nil if none were ready.

  Asks the consumer for its next delivery, passing :timeout from *env*
  (default *default-timeout*).  When a delivery arrives it is acknowledged
  unless :acknowledge in *env* is false/nil, and its body is returned as a
  String."
  [& [#^java.util.Map params]]
  (let [pull-one (fn [consumer]
                   (let [timeout (:timeout *env* *default-timeout*)]
                     (when-let [delivery (.nextDelivery consumer timeout)]
                       (when (:acknowledge *env* true)
                         (ack-delivery consumer delivery))
                       (String. (.getBody delivery)))))]
    (do-consume params pull-one)))
214 |
|
215 | ;; (basic-publish {} (format "[%s] this is my message..." (java.util.Date.))) |
216 | ;; (try-get {:acknowledge false}) |
217 | ;; (try-get) |
218 |
|
219 |
|
220 |
|
221 | ;; (with-connection [] (.queueDelete *channel* "SimpleQueue")) |
222 | ;; (with-connection [] (.queueDelete *channel* "test.queue.name")) |
223 | ;; (with-connection [] (.exchangeDelete *channel* "test.exchange.name")) |
224 |
|
225 | ;; (with-connection [] (.queueDelete *channel* *default-queue*)) |
226 | ;; (with-connection [] (.exchangeDelete *channel* *default-exchange*)) |
227 |
|
228 | ;; (def x (make-rpc-state)) |
229 | ;; (:factory x) |
230 | ;; (:channel x) |
231 | ;; (:connection x) |
232 | ;; (shutdown-rpc-state x) |
233 |
|
(defn make-rpc-server
  "Builds a MapRpcServer proxy on its own connection and channel.

  params is merged over *env* (with :queue defaulting to *default-rpc-queue*).
  callbacks is a map with:
    :map-call  (fn [rpc-state server request props]) -- invoked from handleMapCall
    :map-cast  (fn [rpc-state server request])       -- invoked from handleMapCast

  The exchange (:exchange, default *default-rpc-exchange*) and the queue are
  declared with a final argument of true, and the queue is bound to the
  exchange using the queue name as the binding key.

  Returns the rpc-state map {:factory :connection :channel :env :server}.
  The caller owns the connection and channel (see shutdown-rpc-server).  If
  setup fails after the connection has been opened, the connection is closed
  and the original exception is rethrown."
  [params callbacks]
  (binding [*env* (merge {:queue *default-rpc-queue*} *env* params)]
    (let [factory    (ConnectionFactory. (make-connection-params *env*))
          connection (.newConnection factory
                                     (:host *env* *default-host*)
                                     (:port *env* *default-port*))]
      (try
        (let [channel   (.createChannel connection)
              rpc-state {:factory    factory
                         :connection connection
                         :channel    channel
                         :env        *env*}
              queue     (:queue (:env rpc-state))
              exchange  (:exchange *env* *default-rpc-exchange*)
              map-call  (:map-call callbacks)
              map-cast  (:map-cast callbacks)]
          (.exchangeDeclare channel exchange "direct" true)
          (.queueDeclare channel queue true)
          (.queueBind channel
                      queue
                      exchange
                      queue ;(:binding-key *env* *default-binding-key*)
                      )
          (prn (format "make-rpc-server: channel:%s queue:%s map-call:%s map-cast:%s"
                       channel
                       queue
                       (:map-call callbacks)
                       (:map-cast callbacks)))
          (let [server (proxy [com.rabbitmq.client.MapRpcServer]
                           [channel queue]
                         (handleMapCall [request props]
                           ;;(prn (format "handleMapCall: this=%s request=%s props=%s" this request props))
                           (map-call rpc-state this request props))
                         (handleMapCast [request]
                           ;;(prn (format "handleMapCast: this=%s request=%s=%s" this request))
                           (map-cast rpc-state this request)))]
            (assoc rpc-state :server server)))
        (catch Exception ex
          ;; Nothing has been handed to the caller yet, so release the
          ;; connection here instead of leaking it.  A failure while closing
          ;; is deliberately ignored so the original error is the one that
          ;; propagates.
          (try
            (.close connection)
            (catch Exception _ nil))
          (throw ex))))))
270 |
|
(defn shutdown-rpc-server
  "Closes the resources held in an rpc-state map (as returned by
  make-rpc-server): the :server, then the :channel, then the :connection.

  Resources are closed in reverse order of acquisition, so the server is
  closed while its channel is still open.  Every close is attempted even if an
  earlier one fails; each failure is printed and rethrown, so the exception
  that finally propagates is the one from the last close that failed."
  [state]
  (try
    (.close (:server state))
    (catch Exception ex
      (prn "server close failed: " ex)
      (throw ex))
    (finally
      (try
        (.close (:channel state))
        (catch Exception ex
          (prn "channel close failed: " ex)
          (throw ex))
        (finally
          (try
            (.close (:connection state))
            (catch Exception ex
              (prn "connection close failed: " ex)
              (throw ex))))))))
289 |
|
(defn map-call-handler
  "Example :map-call callback for make-rpc-server.

  Prints the request's \"command\" entry, calls .terminateMainloop on this when
  that entry's string form is \"exit\", and returns a reply map with
    \"resp\" -- \"you pinged: \" followed by the \"ping\" entry, or \"*nothing*\"
    \"time\" -- a new java.util.Date.
  rpc-state and props are not used."
  [rpc-state this request props]
  (let [command (.get request "command")
        ping    (.get request "ping")]
    (prn (format "map-call-handler: this=%s command=%s %s"
                 this
                 command
                 (if command (.toString command) "")))
    (if (and command (= "exit" (.toString command)))
      (.terminateMainloop this))
    {"resp" (str "you pinged: " (if ping (.toString ping) "*nothing*"))
     "time" (java.util.Date.)}))
304 |
|
305 |
|
306 |
|
;; Quoted scratch list: the reader parses these forms but, because the list is
;; quoted, none of them is evaluated when the namespace loads.  They show an
;; RPC round trip meant to be run one form at a time: create the server, run
;; its mainloop, create a client, call it, then shut both down.
'(

(def rpc-server
  (make-rpc-server
   {};;{:queue "rpc.test"}
   {:map-call map-call-handler
    :map-cast (fn rpc-map-cast [rpc-state this request]
                (prn (format "in the map-cast handler! rpc-state:%s this:%s request:%s"
                             rpc-state this request)))}))

(:exchange (:env rpc-server) *default-rpc-exchange*)

(with-connection [] (.queueDelete *channel* "com.github.kyleburton.sandbox.rabbitmq.default.rpc.queue.name"))

(do (.mainloop (:server rpc-server))
    (prn "mainloop returned")
    (try (shutdown-rpc-server rpc-server)
         (catch Exception ex
           (prn "error shutting down: " ex)
           (throw ex))))

(.getQueueName (:server rpc-server))

(defn close-rpc-client [client]
  (.close (.getChannel client))
  (.close client))

(def rpc-client
  (RpcClient.
   (.createChannel
    (.newConnection (:factory rpc-server)
                    (:host *env* *default-host*)
                    (:port *env* *default-port*)))
   (:exchange (:env rpc-server) *default-rpc-exchange*)
   (:queue (:env rpc-server))))

(.getExchange rpc-client)

(let [res (.mapCall rpc-client {"ping" "value2"})]
  (prn (format "returned: %s" res)))

(let [res (.mapCall rpc-client {"command" "exit"})]
  (prn (format "returned: %s" res)))

(close-rpc-client rpc-client)

)
354 |
|
355 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
356 | ;; (def *rpc-server-channel-key* "rpc.channel.key") |
357 | ;; (def *rpc-server-connection* (.newConnection *amqp-factory* *default-host* *default-port*)) |
358 | ;; (def *rpc-server-channel* |
359 | ;; (let [channel (.createChannel *rpc-server-connection*)] |
360 | ;; (.exchangeDeclare channel "rpc.exchange" "direct" true) |
361 | ;; (.queueDeclare channel *rpc-server-channel-key* true) |
362 | ;; channel)) |
363 |
|
364 | ;; (def *rpc-server* |
365 | ;; (let [server (proxy [com.rabbitmq.client.MapRpcServer] |
366 | ;; [*rpc-server-channel* *rpc-server-channel-key*] |
367 | ;; (handleMapCall [request props] |
368 | ;; (prn "handle-map-call! req=" request) |
369 | ;; (if (.get request "exit") |
370 | ;; (do |
371 | ;; (prn "exiting...") |
372 | ;; (.terminateMainloop this) |
373 | ;; (.close this))) |
374 | ;; {"resp" "I hunger!"}))] |
375 | ;; (.queueDeclare (.getChannel server) *rpc-server-channel-key*) |
376 | ;; server)) |
377 |
|
378 |
|
379 |
|
380 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
;; Older RPC experiment, kept inside `comment` so it is never evaluated.
;; NOTE(review): it refers to *rabbit-host-name*, *rabbit-port*,
;; *default-exchange-name* and open-channel, none of which is defined in the
;; visible part of this file (two of them appear only in commented-out defs
;; near the top) -- presumably stale; confirm before trying to run it.
(comment

;; (consume-message)

;; play w/the rpc client/server

(def *rpc-server-channel-key* "rpc-test")
(def *rpc-server-connection* (.newConnection *factory* *rabbit-host-name* *rabbit-port*))
(def *rpc-server-channel* (open-channel *rpc-server-connection* *default-exchange-name* *rpc-server-channel-key*))
(def *rpc-server*
  (let [server (proxy [com.rabbitmq.client.MapRpcServer]
                   [*rpc-server-channel* *rpc-server-channel-key*]
                 (handleMapCall [request props]
                   (prn "handle-map-call! req=" request)
                   (if (.get request "exit")
                     (do
                       (prn "exiting...")
                       (.terminateMainloop this)
                       (.close this)))
                   {"resp" "I hunger!"}))]
    (.queueDeclare (.getChannel server) *rpc-server-channel-key*)
    server))

(def *rpc-client-connection* (.newConnection *factory* *rabbit-host-name* *rabbit-port*))

(def *rpc-client-channel* (open-channel *rpc-client-connection* *default-exchange-name* *rpc-server-channel-key*))

(def *rpc-client*
  (RpcClient. *rpc-client-channel* "" *rpc-server-channel-key*))

;; Quoted so these forms can be picked out and run one at a time.
'(

(do (.mainloop *rpc-server*)
    (prn "mainloop returned"))

;; (.getQueueName *rpc-server*)
;; (.close *rpc-server*)

(let [res (.mapCall *rpc-client* {"ping" "value2"})]
  (prn (format "returned: %s" res)))

(let [res (.mapCall *rpc-client* {"exit" "value2"})]
  (prn (format "returned: %s" res)))

;; (.getReplyQueue *rpc-client*)
;; (.close *rpc-client*)

)

)
433 |
|
434 |
|