Apache NiFi

Sunday May 21, 2017

Record-Oriented Data with NiFi

Mark Payne -  @dataflowmark

Intro - The What

Apache NiFi is being used by many companies and organizations to power their data distribution needs. One of NiFi's strengths is that the framework is data agnostic. It doesn't care what type of data you are processing. There are processors for handling JSON, XML, CSV, Avro, images and video, and several other formats. There are also several general-purpose processors, such as RouteText and CompressContent. Data can be hundreds of bytes or many gigabytes. This makes NiFi a powerful tool for pulling data from external sources; routing, transforming, and aggregating it; and finally delivering it to its final destinations.

While this ability to handle any arbitrary data is incredibly powerful, we often see users working with record-oriented data. That is, large volumes of small "records," or "messages," or "events." Everyone has their own way to talk about this data, but we all mean the same thing. Lots of really small, often structured, pieces of information. This data comes in many formats. Most commonly, we see CSV, JSON, Avro, and log data.

While NiFi makes many tasks easy, there are some common tasks that we can do better with. So in version 1.2.0 of NiFi, we released a new set of Processors and Controller Services for working with record-oriented data. The new Processors are configured with a Record Reader and a Record Writer Controller Service. There are readers for JSON, CSV, Avro, and log data. There are writers for JSON, CSV, and Avro, as well as a writer that allows users to enter free-form text. This can be used to write data out in a log format, just as it was read in, or in any other custom textual format.

The Why

These new processors make building flows to handle this data simpler. They also mean that we can build processors that accept any data format without having to worry about the parsing and serialization logic. Another big advantage of this approach is that we can keep FlowFiles larger, each consisting of multiple records, which results in far better performance.

The How - Explanation

In order to make sense of the data, Record Readers and Writers need to know the schema that is associated with the data. Some Readers (for example, the Avro Reader) allow the schema to be read from the data itself. The schema can also be included as a FlowFile attribute. Most of the time, though, it will be looked up by name from a Schema Registry. In this version of NiFi, two Schema Registry implementations exist: an Avro-based Schema Registry service and a client for an external Hortonworks Schema Registry.

Configuring all of this can feel a bit daunting at first. But once you've done it once or twice, it becomes quick and easy. Here, we will walk through how to set up a local Schema Registry service, configure a Record Reader and a Record Writer, and then start putting some very powerful processors to use. For this post, we will keep the flow simple. We'll ingest some CSV data from a file and then use the Record Readers and Writers to transform the data into JSON, which we will then write to a new directory.

The How - Tutorial

As mentioned above, in order to start reading and writing the data, we are going to need a Record Reader service. We want to parse CSV data and turn it into JSON data, so we will need a CSV Reader and a JSON Writer. We will start by clicking the "Settings" icon on our Operate palette, which allows us to start configuring our Controller Services. When we click the 'Add' button in the top-right corner, we see a lot of options for Controller Services:

Since we know that we want to read CSV data, we can type "CSV" into our Filter box in the top-right corner of the dialog, and this will narrow down our choices pretty well:

We will choose to add the CSVReader service and then configure the Controller Service. In the Properties tab, we have a lot of different properties that we can set:

Fortunately, most of these properties have default values that are good for most cases, but you can choose which delimiter character to use, if it's not a comma, and whether or not to skip the first line, treating it as a header, etc. For my case, I will set the "Skip Header Line" property to "true" because my data contains a header line that I don't want to process as a record.

The first few properties are very important, though. The "Schema Access Strategy" instructs the reader on how to obtain the schema. By default, it is set to "Use String Fields From Header." Since we are also going to be writing the data, though, we will have to configure a schema anyway. So for this demo, we will change this strategy to "Use 'Schema Name' Property." This means that we are going to look up the schema by name from a Schema Registry. As a result, we now have to create our Schema Registry. If we click on the "Schema Registry" property, we can choose to "Create new service...":
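As a concrete (purely hypothetical) example, the CSV data ingested in this walkthrough might look like the following, with a header line naming the fields that we will define in the schema later on:

```
id,firstName,lastName,email,gender
1,John,Doe,jdoe@example.com,M
2,Jane,Smith,jsmith@example.com,F
```

With "Skip Header Line" set to "true," the first line is ignored and only the two data lines become records.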

We will choose to create an AvroSchemaRegistry. It is important to note here that we are reading CSV data and writing JSON data - so why are we using an Avro Schema Registry? Because this Schema Registry allows us to convey the schema using the Apache Avro Schema format, but it does not imply anything about the format of the data being read. The Avro format is used because it is already a well-known way of storing data schemas.

Once we've added our Avro Schema Registry, we can configure it and see in the Properties tab that it has no properties at all. We can add a schema by adding a new user-defined property (by clicking the 'Add' / 'Plus' button in the top-right corner). We will give our schema the name "demo-schema" by using this as the name of the property. We can then type or paste in our schema. For those unfamiliar with Avro schemas, an Avro schema is a JSON-formatted representation with syntax like the following:

  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" }

Here, we have a simple schema that is of type "record." This is typically the case, as we want multiple fields. We then specify all of the fields that we have: a field named "id" of type "int," while all other fields are of type "string." See the Avro Schema documentation for more information. We've now configured our schema, and we can enable our Controller Services.
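Since an Avro schema is just JSON, you can inspect it with any JSON tooling. As a quick sanity check outside of NiFi (a sketch, not part of the flow itself), the demo schema can be parsed and its field names and types listed with a few lines of Python:

```python
import json

# The demo schema from above, written as a plain JSON string.
schema_text = """
{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" }
  ]
}
"""

schema = json.loads(schema_text)

# Map each field name to its declared type.
field_types = {field["name"]: field["type"] for field in schema["fields"]}
print(field_types)
# {'id': 'int', 'firstName': 'string', 'lastName': 'string', 'email': 'string', 'gender': 'string'}
```

If `json.loads` raises an error, the schema isn't valid JSON and the AvroSchemaRegistry would reject it too.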

We can now add our JsonRecordSetWriter controller service as well. When we configure this service, we see some familiar options for indicating how to determine the schema. For the "Schema Access Strategy," we will again use the "Use 'Schema Name' Property," which is the default. Also note that the default value for the "Schema Name" property uses the Expression Language to reference an attribute named "schema.name". This provides very nice flexibility, because now we can re-use our Record Readers and Writers and simply convey the schema by using an UpdateAttribute processor to specify the schema name. There is no need to keep creating Record Readers and Writers. We will set the "Schema Registry" property to the AvroSchemaRegistry that we just created and configured.

Because this is a Record Writer instead of a Record Reader, we also have another interesting property: "Schema Write Strategy." Now that we have configured how to determine the data's schema, we need to tell the writer how to convey that schema to the next consumer of the data. The default option is to add the name of the schema as an attribute. We will accept the default. But we could also write the entire schema as a FlowFile attribute or use some strategies that are useful for interacting with the Hortonworks Schema Registry.

Now that we've configured everything, we can apply the settings and start our JsonRecordSetWriter as well. We've now got all of our Controller Services set up and enabled:

Now for the fun part of building our flow! The above will probably take about 5 minutes, but it makes laying out the flow super easy! For our demo, we will have a GetFile processor bring data into our flow. We will use UpdateAttribute to add a "schema.name" attribute of "demo-schema" because that is the name of the schema that we configured in our Schema Registry:

We will then use the ConvertRecord processor to convert the data into JSON. Finally, we want to write the data out using PutFile:
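To make the transformation concrete, here is a rough sketch, in plain Python rather than in NiFi, of what ConvertRecord does for us in this flow: parse each CSV record, apply the schema's field types, and write the same record set out as JSON. The function name and the coercion logic here are illustrative assumptions, not NiFi internals.

```python
import csv
import io
import json

# Field types per the demo schema: "id" is an int, everything else a string.
FIELD_TYPES = {"id": "int", "firstName": "string", "lastName": "string",
               "email": "string", "gender": "string"}

def convert_csv_to_json(csv_text):
    """Roughly what ConvertRecord (CSVReader -> JsonRecordSetWriter) does:
    read each CSV record, coerce values to the schema's types, and emit
    the whole record set as a JSON array."""
    reader = csv.DictReader(io.StringIO(csv_text))  # first line is the header
    records = []
    for row in reader:
        record = {name: (int(value) if FIELD_TYPES[name] == "int" else value)
                  for name, value in row.items()}
        records.append(record)
    return json.dumps(records)

csv_text = ("id,firstName,lastName,email,gender\n"
            "1,John,Doe,jdoe@example.com,M\n")
print(convert_csv_to_json(csv_text))
# [{"id": 1, "firstName": "John", "lastName": "Doe", "email": "jdoe@example.com", "gender": "M"}]
```

In the real flow, of course, NiFi does all of this for us; the point is that the reader, the schema, and the writer each play a distinct role in that one processor.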

We still need to configure our ConvertRecord processor, though. To do so, all that we need to configure in the Properties tab is the Record Reader and Writer that we have already configured:

Now, starting the processors, we can see the data flowing through our system!

Also, now that we have defined these readers and writers and the schema, we can easily create a JSON Reader and an Avro Writer, for example. And adding additional processors to split the data up, or to query and route the data, becomes very simple because we've already done the "hard" part.


In version 1.2.0 of Apache NiFi, we introduced a handful of new Controller Services and Processors that will make managing dataflows that process record-oriented data much easier. I fully expect that the next release of Apache NiFi will have several additional processors that build on this. Here, we have only scratched the surface of the power that this provides to us. We can delve more into how we are able to transform and route the data, split the data up, and migrate schemas. In the meantime, each of the Controller Services for reading and writing records and most of the new processors that take advantage of these services have fairly extensive documentation and examples. To see that information, you can right-click on a processor and click "Usage" or click the "Usage" icon on the left-hand side of your controller service in the configuration dialog. From there, clicking the "Additional Details..." link in the component usage will provide pretty significant documentation. For example, the JsonTreeReader provides a wealth of knowledge in its Additional Details.



is there an example for converting an avro file to csv ?

Posted by muiler on May 24, 2017 at 08:31 AM GMT #

Hi @muiler. I don't have a specific example of that handy. It should very much follow the same pattern above, though. The only downside currently is that if you have your schema in the Avro data, the Avro Reader is able to make use of it, but the CSV Writer won't be able to, so you'll still need the schema defined in the Schema Registry. There's a JIRA that will address this so that we can just pass the schema through to the writer. So you'd still need the Schema Registry service, and you'll need an Avro Reader service and a CSV Writer service. Then you can use ConvertRecord the same as above.

Posted by Mark Payne on May 24, 2017 at 12:14 PM GMT #

how to convert xml to json using nifi?

Posted by leena on January 30, 2018 at 09:22 PM GMT #


Great article! Does ConvertRecord support flowfiles with multiple records (currently using NiFi 1.5)? Example: Incoming flowfile has 5 AVROs split up by newlines. Can I use ConvertRecord to convert these AVROs to JSONs? Or do I need to first split the flowfile up using SplitText (split by newline) and then past each single AVRO flowfile to ConvertRecord? If not, this seems to be a major bottleneck (compared to spark-streaming) when parsing data. Appreciate any feedback, thanks, Joey

Posted by Joey Wilkinson on August 01, 2018 at 12:05 AM GMT #

@Joey - thanks for the feedback! One of the principal goals of the Record-oriented processors is to make it easier to deal with FlowFiles that contain many records instead of needing to split the data up. So ConvertRecord and all other Record-oriented processors do support a single FlowFile with many AVRO records, many JSON records, etc. The AVRO Reader, though, will expect properly formed AVRO. So I don't know that you can have newline delimiters in between AVRO Records. If memory serves, I think it expects the records to be concatenated together. But it uses the Avro Data Reader, so as long as the Avro Reader is able to read your data, then you should be good!

Posted by Mark Payne on August 01, 2018 at 02:32 PM GMT #

@Mark - Thanks for pointing that out, figured I was doing something wrong. I removed the newlines between my records and data is flowing now! Thanks appreciate your time.

Posted by Joey Wilkinson on August 06, 2018 at 03:28 PM GMT #

Is there a way to register multiple different CSV and AVRO schemas? We have a parser that generates output for 300 tables, so it outputs the JSON for that table along with the table and we split the output into separate flow files, then convert from CSV to AVRO. Was wondering if there is a way to provide the JSON / AVRO schema to the Controller Service from flow file attributes. My guess is no, but surprise me.

Posted by Jeff Watson on August 08, 2018 at 03:33 PM GMT #

@Jeff - sure! The Record Readers have a "Schema Access Strategy." If you set that to "Use Schema Text Property" then the schema comes from whatever value you set in the "Schema Text" property. The default value is "${avro.schema}" which means "use the avro.schema attribute as the schema." You can reference any attribute you'd like there. For the Record Writer, the "Schema Access Strategy" will most often just be "Inherit From Record." This will cause the Writer to use whatever schema the reader uses. Hopefully this all makes sense?

Posted by Mark Payne on August 08, 2018 at 03:38 PM GMT #

Is there a way to read a CSV file, process it record by record, and convert records into JSON, then wait until several records have been processed and write to a database? In short, batching records in NiFi with generated batch ids and correlation ids, e.g. batches of 1000 records.

Posted by Ankita on August 16, 2018 at 08:17 PM GMT #

@Ankita - yes, you can batch together using the MergeRecord processor. It will allow you to aggregate up to some configurable amount of data and then merge the 'like data' together, into a single FlowFile. So you could, for example, set min records = 800 and max records = 1000. Then, once you have your batch of Records, you can use PutDatabaseRecord to push them into a database.

Posted by Mark on September 25, 2018 at 05:28 PM GMT #

ConvertRecord with JsonRecordSetWriter generates array of JSON objects. Any clues on how to generate a single JSON object instead?

Posted by Himanshu on September 27, 2018 at 11:19 AM GMT #

@Himanshu - the JsonRecordSetWriter has a property named "Output Grouping" that can be set to "One Line Per Object." That should give you what you need. I believe this was added in version 1.7.0.

Posted by Mark on September 27, 2018 at 12:38 PM GMT #

Hello, and thanks again for the insight, it helps me with my project

Posted by Jean-Denis TSATI on October 03, 2018 at 01:06 PM GMT #

Thanks, it's nicely explained!

Posted by Abhijeet Kumar on January 15, 2019 at 08:23 AM GMT #

how to convert xml to json using nifi?

Posted by film izle on June 29, 2019 at 08:10 PM GMT #

@film_izle you can use the ConvertRecord processor with an XML Reader and a JSON Writer.

Posted by Mark on July 01, 2019 at 03:32 PM GMT #

Hey, I cannot understand the meaning of an Avro schema. Can anyone help me out?

Posted by Shrey Mittal on October 07, 2019 at 09:16 AM GMT #

@Shrey - Apache Avro is a data serialization format, similar to JSON, XML, etc. It is a binary format, though, which embeds the schema in the data itself. The schema itself is specified in JSON, though. You can learn more about the schema syntax from https://avro.apache.org/docs/current/spec.html

Posted by Mark on October 07, 2019 at 01:50 PM GMT #
