SPARK-SOLACE Connector, Writing to Solace in a Distributed way using Spark.

Solace is a queueing mechanism following the pub-sub(publisher-subscriber ) pattern. A producer can publish events/messages to a solace Queue/Topic and the consumer can subscribe to a topic or consume a queue as per the requirement.
The problem was to send events to Solace from HDFS (Hadoop Distributed File System) in a distributed fashion along with adhering to some other business alignments. There are multiple ways to publish and consume data from Solace. One way is via Rest Services which is pretty straight forward if we follow Solace’s Documentation. If you have millions of rows which are distributed as blocks in an HDFS cluster (or some other underlying structure as the idea is generic and can be applied to other file systems such as Amazon S3) and if you want to send data in a distributed way to Solace you can leverage Solace JMS-API to publish data to Solace’s Queue or Topic.


The Architecture was to open a connection object to Solace’s Broker per partition of Spark Executor and send rows of each spark partition either as individual events or as an Array of Json’s by clubbing all rows of a single Spark Partition to Solace’s Queue or Topic as one event.


Before Executing the following code, you can repartition your Spark Dataset based on your custom logic or integral value to create even size partitions, avoiding skewed barriers of one kind of data residing in one partition. Spark’s dataset API gives a way to write custom code at the partition level.

//Dataset.forEachPartition( row-> {//custom code});

So I decided to make a connection to Solace JMS API, you can make this connection either using JNDI information if you have it available, or you can make a dynamic connection. The Solace JMS API needs VPN-NAME, HOST, USERNAME, and PASSWORD to create a connection to the Solace broker.

Depending on your requirement you can create a topic or a queue and using the standard JMS Api’s you can publish the records from record iterator (either individually or bulk) to the message producer.

The code looks something like this.

Dataset.foreachPartition( rowIterator->{

SolConnectionFactory factory= SolJmsUtitlity.createConnectionFactory();





Connection connection= factory.createConnection();

Session session= connection.createSession();

Topic topicName = session.createTopic(“myTopic”);

MessageProducer msgP=session.createProducer(T);

TextMessage msg=session.createTextMessage( //you can also custom handle how you want to create text message from your dataframe’s row object

msgP.send(topicName,msg,DeliveryMode ….);

//conn.close //explained later why it’s commented



You can use the spark’s data frame API to write your own connector to Solace. All you need to do is make a class named “Default Source” and implement DataSource Register and CreatableRelationProvider and implement its method createRelation. So, the above code goes into the implementation of createRelation method. To understand more on how to use Data Source API:

So if our java package name is com.connector.harjeet.solace Spark will look for the class DefaultSource in this package, and you can use it to call you custom connector which will send data to Solace in a distributed way.

//some spark code. creating spark session , data frame etc


Tip: You can make the host, VPN-name, etc. configurable and pass it into options in the API call to save.


Let’s talk about performance. Each partition is creating a connection object to Solace, so if your spark’s data frame has thousands of partitions that would create that many connection objects. One thing we can do is to make connection singleton so that limits the no. of connection objects to an upper limit of the number of executors you give to your spark job which can be 10–20. But it creates another problem we’ll have to let connection remain open (so that one partition does not close the link for other partitions in that executor, I commented the code earlier of -> //conn.close). Solace’s developers/community suggestion is to keep the connection open. Let Solace itself close it at its end because creating too many relationships is a costly operation at the broker’s end. There are other performance-boosting tricks like committing all transactions in a single commit and combine all rows of a separate partition into one event and send it in one go. For more details, please check out the below links.



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store