Spark-Solace Connector: Writing to Solace in a Distributed Way Using Spark

Solace running in Docker on the local system

Solace is a messaging system that follows the pub-sub (publisher-subscriber) pattern. A producer publishes events/messages to a Solace queue or topic, and a consumer subscribes to a topic or consumes from a queue as the requirement dictates.
The problem was to send events to Solace from HDFS (Hadoop Distributed File System) in a distributed fashion while adhering to some other business constraints. There are multiple ways to publish to and consume from Solace. One way is via REST services, which is pretty straightforward if you follow Solace's documentation. But if you have millions of rows distributed as blocks across an HDFS cluster (or some other underlying storage, since the idea is generic and applies just as well to file systems such as Amazon S3) and you want to send that data to Solace in a distributed way, you can leverage the Solace JMS API to publish to a Solace queue or topic.

Design:

The architecture was to open one connection to the Solace broker per Spark partition on each executor, and to send the rows of each partition to a Solace queue or topic either as individual events or as a single event containing a JSON array built by clubbing together all the rows of that partition.

Approach:

Before executing the following code, you can repartition your Spark Dataset based on custom logic or an integer partition count to create evenly sized partitions, avoiding the skew that comes from one kind of data piling up in a single partition. Spark's Dataset API gives you a way to write custom code at the partition level:

// dataset.repartition(numberOfPartitions)
//        .foreachPartition(rows -> { /* custom code runs once per partition, over an iterator of rows */ });

So I decided to connect through the Solace JMS API. You can make this connection either using JNDI information, if you have it available, or dynamically. The Solace JMS API needs the VPN name, host, username, and password to create a connection to the Solace broker.
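A minimal sketch of the dynamic (non-JNDI) connection, following the pattern in Solace's JMS samples; the helper class name and the placeholder values are illustrative, not part of the Solace API:

import javax.jms.Connection;

import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;

public final class SolaceConnections {

    // Dynamic (non-JNDI) connection: the Solace JMS API only needs the host,
    // message VPN, username and password, all supplied by the caller.
    public static Connection create(String host, String vpn,
                                    String username, String password) throws Exception {
        SolConnectionFactory factory = SolJmsUtility.createConnectionFactory();
        factory.setHost(host);        // e.g. "tcp://localhost:55555"
        factory.setVPN(vpn);          // message VPN name
        factory.setUsername(username);
        factory.setPassword(password);
        return factory.createConnection();
    }

    private SolaceConnections() { }
}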

Depending on your requirement, you can create a topic or a queue, and using the standard JMS APIs you can publish the records from the partition's record iterator (either individually or in bulk) through a message producer.

The code looks something like this.
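(A minimal sketch, assuming the Solace JMS API, one text message per JSON row, and the SolaceConnections helper from the snippet above; the class, method and queue names are illustrative.)

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SolacePartitionWriter {

    // Publishes every row of every partition as an individual JSON event.
    static void publish(Dataset<Row> dataset, String host, String vpn,
                        String username, String password, String queueName) {
        dataset.toJSON().foreachPartition((ForeachPartitionFunction<String>) rows -> {
            // One connection per partition; the Performance section below covers
            // reusing a single connection per executor instead.
            Connection connection = SolaceConnections.create(host, vpn, username, password);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Address a queue; use session.createTopic(...) to publish to a topic instead.
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            while (rows.hasNext()) {
                TextMessage message = session.createTextMessage(rows.next()); // one JSON row per event
                producer.send(message);
            }

            producer.close();
            session.close();
            // connection.close();  // deliberately left open; see the Performance section
        });
    }
}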

Usage:

You can use Spark's DataFrame API to write your own connector to Solace. All you need to do is create a class named DefaultSource that implements Spark's relation provider interface for the write path (CreatableRelationProvider, optionally together with DataSourceRegister) and override its createRelation method; the code above goes into that implementation. To understand more about how to use the Data Source API: http://sparkdatasourceapi.blogspot.com/2016/10/spark-data-source-api-write-custom.html
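A skeleton of that class might look like the sketch below. It assumes the DataSource V1 write path (CreatableRelationProvider); the exact interfaces vary by Spark version, the option keys are illustrative, and it reuses the SolacePartitionWriter sketch from above, assumed to live in the same package:

package com.connector.harjeet.solace;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;

// Spark instantiates the class literally named DefaultSource inside the package
// that is passed to .format(...).
public class DefaultSource implements CreatableRelationProvider {

    @Override
    public BaseRelation createRelation(SQLContext sqlContext,
                                       SaveMode mode,
                                       scala.collection.immutable.Map<String, String> parameters,
                                       Dataset<Row> data) {
        // Values supplied via .option(...) arrive in `parameters`.
        String host = parameters.get("host").get();
        String vpn = parameters.get("vpn").get();
        String username = parameters.get("username").get();
        String password = parameters.get("password").get();
        String queueName = parameters.get("queue").get();

        // The distributed publishing happens here, on the executors.
        SolacePartitionWriter.publish(data, host, vpn, username, password, queueName);

        // A write-only source only needs a bare relation to hand back to Spark.
        final SQLContext ctx = sqlContext;
        final StructType schema = data.schema();
        return new BaseRelation() {
            @Override
            public SQLContext sqlContext() { return ctx; }

            @Override
            public StructType schema() { return schema; }
        };
    }
}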

So if our Java package name is com.connector.harjeet.solace, Spark will look for the DefaultSource class in this package, and you can use it to call your custom connector, which will send data to Solace in a distributed way.

Tip: You can make the host, VPN name, etc. configurable and pass them as options in the API call to save, as shown below.
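The save call might then look something like this (spark is an existing SparkSession, and the option keys are whatever your DefaultSource reads):

// Source data sits on HDFS; the path and option values are placeholders.
Dataset<Row> df = spark.read().parquet("hdfs:///data/events");

df.write()
  .format("com.connector.harjeet.solace")     // Spark resolves DefaultSource in this package
  .option("host", "tcp://solace-host:55555")  // broker details made configurable via options
  .option("vpn", "default")
  .option("username", "user")
  .option("password", "password")
  .option("queue", "my/sample/queue")
  .mode(SaveMode.Append)
  .save();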

Performance:

Let's talk about performance. Each partition creates its own connection to Solace, so if your Spark data frame has thousands of partitions, that would create thousands of connection objects. One thing we can do is make the connection a singleton per executor JVM, which caps the number of connection objects at the number of executors you give your Spark job, typically 10-20. But that creates another problem: we have to leave the connection open so that one partition does not close the link for the other partitions running in the same executor (which is why connection.close() is commented out in the per-partition code above). The suggestion from Solace's developers and community is to keep the connection open and let Solace close it at its end, because opening too many connections is a costly operation on the broker side. There are other performance-boosting tricks, such as committing all messages in a single transaction and combining all rows of a partition into one event sent in one go. For more details, please check out the links below.
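A minimal sketch of that per-executor singleton, reusing the SolaceConnections helper from the first snippet (both class names are mine, not Solace's):

import javax.jms.Connection;

// One lazily created connection per executor JVM: every partition processed by
// that executor reuses it instead of opening its own.
public final class SharedSolaceConnection {

    private static volatile Connection connection;

    public static Connection get(String host, String vpn,
                                 String username, String password) throws Exception {
        if (connection == null) {
            synchronized (SharedSolaceConnection.class) {
                if (connection == null) {
                    // Deliberately never closed by the job; the broker cleans it up
                    // when the executor JVM goes away.
                    connection = SolaceConnections.create(host, vpn, username, password);
                }
            }
        }
        return connection;
    }

    private SharedSolaceConnection() { }
}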

https://solace.community/discussion/248/writing-to-solace-with-spark#latest
https://solace.community/discussion/257/clarity-needed-on-some-core-concepts#latest
https://solace.com/products/event-broker/software/getting-started/
http://sparkdatasourceapi.blogspot.com/2016/10/spark-data-source-api-write-custom.html
