Processing Data from MQ with Spark Streaming: Part 4 - Implementation details for Spark MQ Connector
This is a multi-part series that provides information on messaging, including fault-tolerance techniques, and provides instructions and code to implement a connection between IBM MQ (formerly WebSphere MQ) and Spark Streaming.
Part 1 — Introduction to Messaging, JMS & MQ
Part 2 — Brief Discussion on Apache Spark Streaming and Use-cases
Part 3 — Reliable Delivery & Recovery Techniques with Spark Streaming
Part 4 — Implementation details for Spark MQ Connector
Part 4 — Implementation details for Spark MQ Connector
To enable WebSphere MQ users to process messages with Apache Spark, a Spark MQ Receiver was developed. The code is available on GitHub as spark-mq-receiver, with notes on dependencies and instructions for building. The code is an extension of the JMS Spark Receiver by tbfenet, which provides both an asynchronous and a synchronous receiver. As reliability was required for this work, only an example using the SynchronousJmsReceiver is provided.
There are two files that I will discuss in this blog. The first is an example program, MQReceiverExample.scala, that creates the Spark Streaming Context and is where a user defines the operation to perform on each incoming stream of data. The second,
MQJmsStreamUtils.scala, is the utilities file containing the MQConsumerFactory class, which defines how to create an MQ consumer and connection.
Example Code: MQReceiverExample.scala
Below is a diagram showing the workflow of the example code.
The simple example code expects that the user passes the host name, port number, queue manager name and queue name as parameters when starting the session. There is a placeholder variable that needs to be set for the location of the checkpoint directory.
The method “getOrCreate” checks the checkpoint directory for metadata to restart a Spark Streaming Context.
When the program is being started for the first time, it will find the checkpoint directory empty. It will then call the method “functionToCreateContext”, also defined in the example code, forwarding the MQ parameters.
When the program is being restarted after a failure, it will re-create the StreamingContext from the metadata in the checkpoint directory and complete operations on any unfinished batches. Then it will continue to process any new messages it receives.
The Streaming Context is started by calling “start()”, and “awaitTermination()” keeps the session active until the user sends a termination signal (e.g. CTRL+C).
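As a rough sketch of this flow (the variable names, checkpoint path and parameter handling below are illustrative rather than the exact code in the repository, and functionToCreateContext is shown in the next sketch), the driver looks something like this:

import org.apache.spark.streaming.StreamingContext

object MQReceiverExample {
  def main(args: Array[String]): Unit = {
    // Host name, port, queue manager name and queue name are passed as parameters
    val Array(host, port, queueManager, queueName) = args

    // Placeholder: set this to a reliable location (e.g. on HDFS) for recovery to work
    val checkpointDir = "/path/to/checkpoint/dir"

    // Recover the context from checkpoint metadata if present,
    // otherwise build a fresh one via functionToCreateContext
    val ssc = StreamingContext.getOrCreate(
      checkpointDir,
      () => functionToCreateContext(checkpointDir, host, port.toInt, queueManager, queueName))

    ssc.start()            // start receiving and processing messages
    ssc.awaitTermination() // block until a termination signal (e.g. CTRL+C)
  }
}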
As the name suggests, “functionToCreateContext” provides all the information on how to start the Streaming Context and what operations are to be performed on the incoming data. All of this information is saved in the checkpoint directory in case of a failure. To enable this checkpointing, the “writeAheadLog” setting must be enabled and the checkpoint directory passed to the Context’s “checkpoint” method.
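A minimal sketch of such a function, assuming a 10-second batch interval (an illustrative choice, not necessarily what the repository uses) and the standard Spark write-ahead-log property:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def functionToCreateContext(checkpointDir: String,
                            host: String,
                            port: Int,
                            queueManager: String,
                            queueName: String): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("MQReceiverExample")
    // The write-ahead log is what allows received messages to survive a failure
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // save metadata so getOrCreate can recover the context

  // ... create the MQ stream here and define the foreachRDD processing (see below) ...

  ssc
}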
There are many ways that credentials can be passed, but for simplicity they have been hard coded in this example.
When a message is returned from the service, it is a Message object. We convert this to type String to allow for easier processing.
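A converter along these lines would do the job (a sketch only; the name messageToString and the decision to drop non-text messages are assumptions, not necessarily what the repository does):

import javax.jms.{Message, TextMessage}

// Turn a JMS Message into a plain String; anything that is not a TextMessage is dropped
def messageToString(message: Message): Option[String] = message match {
  case text: TextMessage => Some(text.getText)
  case _                 => None
}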
We create a synchronous JMS queue stream using the utilities file from tbfenet, with client acknowledgement to ensure each message is received once and only once. The method to create this stream requires the Spark Streaming Context and a consumer factory with its associated parameters, in our case the MQConsumerFactory.
Finally, this returns a ReceiverInputDStream, on which we can call “foreachRDD” to process the messages as required. Each RDD is a collection of messages pulled off the queue (if there was more than one), not an individual message. However, if converting to a DataFrame, each row will correspond to an individual message as expected. Within this block, all parsing (e.g. from CSV or XML), transforming (e.g. aggregation, filtering) and saving (e.g. to HBase, Hive, HDFS) operations are performed.
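Purely for illustration (the column names, CSV layout and output path are made up), a foreachRDD block that parses CSV messages into a DataFrame and appends them to Parquet might look like this, where messages is the ReceiverInputDStream[String] returned by the stream-creation call:

import org.apache.spark.sql.SparkSession

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // Each element of the RDD is one message from the queue,
    // so splitting the CSV gives one DataFrame row per message
    val df = rdd.map(_.split(","))
      .map(fields => (fields(0), fields(1))) // hypothetical two-field payload
      .toDF("id", "value")

    // Aggregation or filtering would go here before saving (e.g. to Hive, HDFS or HBase)
    df.write.mode("append").parquet("/path/to/output")
  }
}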
MQ JMS Utilities Code: MQJmsStreamUtils.scala
This code defines the class MQConsumerFactory, which is an extension of MessageConsumerFactory and provides the information required to create an MQ consumer and connect to an MQ queue.
There are a few required libraries to note. As MQ is an implementation of JMS, we require the JMS library. The appropriate Spark libraries for logging and streaming, including the JMS receiver classes, are also imported. In addition, IBM MQ libraries are required; these are available from IBM Fix Central, and the GitHub README file provides instructions on acquiring them.
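The imports fall into roughly these three groups (the package of the receiver base class is an assumption here; check the repository for the exact path):

// Standard JMS interfaces
import javax.jms.{Connection, MessageConsumer, Session}

// IBM MQ JMS classes, shipped with the MQ client libraries from IBM Fix Central
import com.ibm.mq.jms.{MQQueue, MQQueueConnectionFactory}
import com.ibm.msg.client.wmq.WMQConstants

// JMS receiver base class from tbfenet's project (package name assumed)
import org.apache.spark.streaming.jms.MessageConsumerFactory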
The properties needed to connect to MQ are expected as parameters; there are two classes on GitHub, depending on whether a channel name is passed. No example is provided for the latter, but it is simply a matter of adding the channel name as an additional parameter.
There are two methods that need to be overridden when defining a new consumer factory class. The first is “makeConsumer”, where we create a new MQQueue and pass it to the session’s “createConsumer” function. The second is “makeConnection”, which is where we create a new MQQueueConnection, setting the required attributes such as the host name, port number and queue manager name.
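Put together, the factory looks roughly like this. It is a hedged sketch: the exact method signatures on MessageConsumerFactory, the transport-type constant and the hard-coded credentials are assumptions on my part rather than a copy of the file on GitHub.

import javax.jms.{Connection, MessageConsumer, Session}
import com.ibm.mq.jms.{MQQueue, MQQueueConnectionFactory}
import com.ibm.msg.client.wmq.WMQConstants
import org.apache.spark.streaming.jms.MessageConsumerFactory // package name assumed

case class MQConsumerFactory(host: String,
                             port: Int,
                             queueManager: String,
                             queueName: String) extends MessageConsumerFactory {

  // makeConsumer: build the destination queue and ask the session for a consumer on it
  override def makeConsumer(session: Session): MessageConsumer = {
    val queue = new MQQueue(queueName)
    session.createConsumer(queue)
  }

  // makeConnection: configure the MQ connection factory and open a client-mode connection
  override def makeConnection: Connection = {
    val connFactory = new MQQueueConnectionFactory()
    connFactory.setHostName(host)
    connFactory.setPort(port)
    connFactory.setQueueManager(queueManager)
    connFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT)
    // Credentials hard coded purely for illustration, as noted above
    connFactory.createQueueConnection("user", "password")
  }
}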
Final Remarks
I hope you found this series on Processing Data from MQ with Spark Streaming to be valuable. I’m consolidating and sharing the knowledge I gained implementing this solution in the hope that it is useful to someone else!