Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java Interop -- Netty + Clojure

Tags:

clojure

netty

I'm trying to use netty via clojure. I'm able to startup the server, however, it fails to initialize an accepted socket. Below are the error message and code respectively. Does anyone know what is/or could be wrong? I believe the issue is with (Channels/pipeline (server-handler)) Thanks.

Error Message

#<NioServerSocketChannel [id: 0x01c888d9, /0.0.0.0:843]>
Jun 6, 2012 12:15:35 PM org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink
WARNING: Failed to initialize an accepted socket.
java.lang.IllegalArgumentException: No matching method found: pipeline

project.clj

(defproject protocol "1.0.0-SNAPSHOT"
  :description "Upload Protocol Server"
  :dependencies [
    [org.clojure/clojure "1.2.1"]
    [io.netty/netty "3.4.5.Final"]])

core.clj

(ns protocol.core
    (:import (java.net InetSocketAddress)
             (java.util.concurrent Executors)
             (org.jboss.netty.bootstrap ServerBootstrap)
             (org.jboss.netty.channel Channels ChannelPipelineFactory SimpleChannelHandler)
             (org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory)
             (org.jboss.netty.buffer ChannelBuffers)))

(def policy
    "<content>Test</content>")


(defn server-handler
    "Returns netty handler."
    []
    (proxy [SimpleChannelHandler] []
        (messageReceived [ctx e]
            (let [ch (.getChannel e)]
                (.write ch policy)
                (.close ch)))

        (channelConnected [ctx e]
            (let [ch (.getChannel e)]
                (.write ch policy)
                (.close ch)))

        (exceptionCaught [ctx e]
            (let [ex (.getCause e)]
                (println "Exception" ex)
                (-> e .getChannel .close)))))

(defn setup-pipeline
    "Returns channel pipeline."
    []
    (proxy [ChannelPipelineFactory] []
        (getPipeline []
            (Channels/pipeline (server-handler)))))

(defn startup
    "Starts netty server."
    [port]
    (let [channel-factory (NioServerSocketChannelFactory. (Executors/newCachedThreadPool) (Executors/newCachedThreadPool))
          bootstrap (ServerBootstrap. channel-factory)]
        (.setPipelineFactory bootstrap (setup-pipeline))
        (.setOption bootstrap "child.tcpNoDelay" true)
        (.setOption bootstrap "child.keepAlive" true)
        (.bind bootstrap (InetSocketAddress. port))))
like image 883
Ari Avatar asked Jun 06 '12 16:06

Ari


1 Answers

There are three problems with your code

  1. Java interop with vararg Channels.channel() method. you can make a vector of channel handlers and wrap it with (into-array ChannelHandler ..)

  2. You can not write String objects directly to a Netty Channel. you have to write the string to a ChannelBuffer first and write that buffer or use a StringCodecHandler.

  3. Writing to Netty channel is asynchronus, so you can not close it immediately. you have to register a future listener and close the channel when its done.

Here is the working code.

  (ns clj-netty.core
   (:import (java.net InetSocketAddress)
            (java.util.concurrent Executors)
            (org.jboss.netty.bootstrap ServerBootstrap)
            (org.jboss.netty.buffer ChannelBuffers)
            (org.jboss.netty.channel Channels ChannelFutureListener ChannelHandler ChannelPipelineFactory SimpleChannelHandler)
           (org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory)
           (org.jboss.netty.buffer ChannelBuffers)))


(def policy
  (ChannelBuffers/copiedBuffer
    (.getBytes "<content>Test</content>")))


(defn server-handler
  "Returns netty handler."
  []
  (proxy [SimpleChannelHandler] []
    (messageReceived [ctx e]
      (let [ch (.getChannel e)]
        (.addListener
          (.write ch policy)
          (ChannelFutureListener/CLOSE))))

    (channelConnected [ctx e]
      (let [ch (.getChannel e)]
        (.addListener
          (.write ch policy)
          (ChannelFutureListener/CLOSE))))

    (exceptionCaught [ctx e]
      (let [ex (.getCause e)]
        (println "Exception" ex)
        (-> e .getChannel .close)))))

(defn setup-pipeline
  "Returns channel pipeline."
  []
  (proxy [ChannelPipelineFactory] []
    (getPipeline []
      (let [handler (server-handler)]
        (Channels/pipeline (into-array ChannelHandler [handler]))))))



 (defn startup
      "Starts netty server."
      [port]
      (let [channel-factory (NioServerSocketChannelFactory. (Executors/newCachedThreadPool) (Executors/newCachedThreadPool))
            bootstrap (ServerBootstrap. channel-factory)]
        (.setPipelineFactory bootstrap (setup-pipeline))
        (.setOption bootstrap "child.tcpNoDelay" true)
        (.setOption bootstrap "child.keepAlive" true)
        (.bind bootstrap (InetSocketAddress. port))))

Have a look at Aleph (also uses Netty) which can used to build clients and servers in many different protocols with nice Clojure API.

like image 140
Jestan Nirojan Avatar answered Nov 06 '22 22:11

Jestan Nirojan