Apache NiFi

Sunday May 21, 2017

Record-Oriented Data with NiFi

Mark Payne - @dataflowmark


Intro - The What

Apache NiFi is being used by many companies and organizations to power their data distribution needs. One of NiFi's strengths is that the framework is data agnostic. It doesn't care what type of data you are processing. There are processors for handling JSON, XML, CSV, Avro, images and video, and several other formats. There are also several general-purpose processors, such as RouteText and CompressContent. Data can be hundreds of bytes or many gigabytes. This makes NiFi a powerful tool for pulling data from external sources; routing, transforming, and aggregating it; and finally delivering it to its final destinations.

While this ability to handle any arbitrary data is incredibly powerful, we often see users working with record-oriented data. That is, large volumes of small "records," or "messages," or "events." Everyone has their own way to talk about this data, but we all mean the same thing. Lots of really small, often structured, pieces of information. This data comes in many formats. Most commonly, we see CSV, JSON, Avro, and log data.

While there are many tasks that NiFi makes easy, there are some common tasks that we could handle better. So in version 1.2.0 of NiFi, we released a new set of Processors and Controller Services for working with record-oriented data. The new Processors are configured with a Record Reader and a Record Writer Controller Service. There are readers for JSON, CSV, Avro, and log data. There are writers for JSON, CSV, and Avro, as well as a writer that allows users to enter free-form text. This can be used to write data out in a log format, like it was read in, or any other custom textual format.

The Why

These new processors make building flows to handle this data simpler. They also mean that we can build processors that accept any data format without having to worry about the parsing and serialization logic. Another big advantage of this approach is that we are able to keep FlowFiles larger, each consisting of multiple records, which results in far better performance.

The How - Explanation

In order to make sense of the data, Record Readers and Writers need to know the schema that is associated with the data. Some Readers (for example, the Avro Reader) allow the schema to be read from the data itself. The schema can also be included as a FlowFile attribute. Most of the time, though, it will be looked up by name from a Schema Registry. In this version of NiFi, two Schema Registry implementations exist: an Avro-based Schema Registry service and a client for an external Hortonworks Schema Registry.

Configuring all of this can feel a bit daunting at first. But once you've done it once or twice, it becomes rather quick and easy to configure. Here, we will walk through how to set up a local Schema Registry service, configure a Record Reader and a Record Writer, and then start putting some very powerful processors to use. For this post, we will keep the flow simple. We'll ingest some CSV data from a file and then use the Record Readers and Writers to transform the data into JSON, which we will then write to a new directory.

The How - Tutorial

In order to start reading and writing the data, as mentioned above, we are going to need a Record Reader service and a Record Writer service. We want to parse CSV data and turn that into JSON data, so we will need a CSV Reader and a JSON Writer. We will start by clicking the "Settings" icon on our Operate palette. This will allow us to start configuring our Controller Services. When we click the "Add" button in the top-right corner, we see a lot of options for Controller Services:

Since we know that we want to read CSV data, we can type "CSV" into our Filter box in the top-right corner of the dialog, and this will narrow down our choices pretty well:

We will choose to add the CSVReader service and then configure the Controller Service. In the Properties tab, we have a lot of different properties that we can set:

Fortunately, most of these properties have default values that are good for most cases, but you can choose which delimiter character you want to use, if it's not a comma. You can choose whether or not to skip the first line, treating it as a header, etc. In my case, I will set the "Skip Header Line" property to "true" because my data contains a header line that I don't want to process as a record. The first few properties are very important, though. The "Schema Access Strategy" is used to instruct the reader on how to obtain the schema. By default, it is set to "Use String Fields From Header." Since we are also going to be writing the data, though, we will have to configure a schema anyway. So for this demo, we will change this strategy to "Use 'Schema Name' Property." This means that we are going to look up the schema from a Schema Registry. As a result, we are now going to have to create our Schema Registry. If we click on the "Schema Registry" property, we can choose to "Create new service...":

We will choose to create an AvroSchemaRegistry. It is important to note here that we are reading CSV data and writing JSON data - so why are we using an Avro Schema Registry? Because this Schema Registry allows us to convey the schema using the Apache Avro Schema format, but it does not imply anything about the format of the data being read. The Avro format is used because it is already a well-known way of storing data schemas.

Once we've added our Avro Schema Registry, we can configure it and see in the Properties tab that it has no properties at all. We can add a schema by adding a new user-defined property (by clicking the "Add" / "Plus" button in the top-right corner). We will give our schema the name "demo-schema" by using this as the name of the property. We can then type or paste in our schema. For those unfamiliar with Avro schemas, an Avro schema is a JSON-formatted representation with a syntax like the following:

{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" }
  ]
}

Here, we have a simple schema that is of type "record." This is typically the case, as we want multiple fields. We then specify all of the fields that we have. There is a field named "id" of type "int" and all other fields are of type "string." See the Avro Schema documentation for more information. We've now configured our schema! We can enable our Controller Services now.
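Before pasting a schema into the registry, it can help to sanity-check it. The following is a minimal Python sketch that does so with only the standard `json` module. It is not an Avro library and it checks the shape of the schema rather than full Avro validity. The helper name `field_types` is hypothetical and exists only for this illustration.

```python
import json

# The demo schema from this post, as it would be pasted into the
# "demo-schema" property of the AvroSchemaRegistry.
DEMO_SCHEMA = """
{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" }
  ]
}
"""


def field_types(schema_text):
    """Return {field name: declared type} for a record schema.

    Hypothetical helper: it only inspects the JSON structure and does not
    perform real Avro validation.
    """
    schema = json.loads(schema_text)  # raises ValueError on malformed JSON
    if schema.get("type") != "record":
        raise ValueError("expected a schema of type 'record'")
    return {field["name"]: field["type"] for field in schema["fields"]}


if __name__ == "__main__":
    for name, declared_type in field_types(DEMO_SCHEMA).items():
        print(name, declared_type)
```

A typo such as a trailing comma or a missing brace shows up here as a `ValueError` before the schema ever reaches NiFi.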

We can now add our JsonRecordSetWriter controller service as well. When we configure this service, we see some familiar options for indicating how to determine the schema. For the "Schema Access Strategy," we will again use "Use 'Schema Name' Property," which is the default. Also note that the default value for the "Schema Name" property uses the Expression Language to reference an attribute named "schema.name". This provides very nice flexibility, because now we can re-use our Record Readers and Writers and simply convey the schema by using an UpdateAttribute processor to specify the schema name. No need to keep creating Record Readers and Writers. We will set the "Schema Registry" property to the AvroSchemaRegistry that we just created and configured.
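To make the idea of looking up a schema by name more concrete, here is a rough Python sketch of the concept. It is not how NiFi implements it: the `REGISTRY` dictionary stands in for the AvroSchemaRegistry, the functions `resolve` and `lookup_schema` are hypothetical, and the tiny resolver handles only plain `${attribute}` references, not the full Expression Language. Treating a missing attribute as an empty string is an assumption of this sketch.

```python
import re

# Hypothetical stand-in for the AvroSchemaRegistry: each user-defined
# property name maps to the schema text stored under it.
REGISTRY = {
    "demo-schema": '{"type": "record", "name": "recordFormatName", "fields": []}',
}


def resolve(expression, attributes):
    """Replace each ${name} reference with the matching attribute value.

    Only plain attribute references are handled. A missing attribute
    becomes an empty string (an assumption of this sketch).
    """
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda match: attributes.get(match.group(1), ""),
        expression,
    )


def lookup_schema(schema_name_property, attributes, registry):
    """Resolve the schema name for one FlowFile and fetch its schema text."""
    name = resolve(schema_name_property, attributes)
    if name not in registry:
        raise KeyError(f"no schema named {name!r} in the registry")
    return registry[name]


if __name__ == "__main__":
    # Attributes as UpdateAttribute might set them on a FlowFile.
    flowfile_attributes = {"schema.name": "demo-schema"}
    print(lookup_schema("${schema.name}", flowfile_attributes, REGISTRY))
```

The point of the indirection is that the same reader or writer can serve many schemas: only the attribute on the FlowFile changes, not the service configuration.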

Because this is a Record Writer instead of a Record Reader, we also have another interesting property: "Schema Write Strategy." Now that we have configured how to determine the data's schema, we need to tell the writer how to convey that schema to the next consumer of the data. The default option is to add the name of the schema as an attribute. We will accept the default. But we could also write the entire schema as a FlowFile attribute or use some strategies that are useful for interacting with the Hortonworks Schema Registry.

Now that we've configured everything, we can apply the settings and start our JsonRecordSetWriter as well. We've now got all of our Controller Services set up and enabled:

Now for the fun part of building our flow! The above will probably take about 5 minutes, but it makes laying out the flow super easy! For our demo, we will have a GetFile processor bring data into our flow. We will use UpdateAttribute to add a "schema.name" attribute of "demo-schema" because that is the name of the schema that we configured in our Schema Registry:

We will then use the ConvertRecord processor to convert the data into JSON. Finally, we want to write the data out using PutFile:
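For readers who want a feel for what this conversion does to the data, here is a small Python sketch using only the standard library. It is not NiFi code and makes no claim about how ConvertRecord works internally. The function name `csv_to_json` is hypothetical, the field list mirrors the demo schema, and the sample row is made up for illustration. Emitting the records as a single JSON array is a choice made for this sketch.

```python
import csv
import io
import json

# Field names and types mirror the demo schema from this post.
SCHEMA_FIELDS = [
    ("id", "int"),
    ("firstName", "string"),
    ("lastName", "string"),
    ("email", "string"),
    ("gender", "string"),
]

# How each schema type is turned into a Python value.
CASTS = {"int": int, "string": str}


def csv_to_json(csv_text, fields=SCHEMA_FIELDS, skip_header=True):
    """Read CSV rows, apply the schema's types, and return a JSON array."""
    reader = csv.reader(io.StringIO(csv_text))
    if skip_header:
        next(reader, None)  # same idea as "Skip Header Line" = true
    records = []
    for row in reader:
        if not row:  # ignore blank lines
            continue
        records.append(
            {name: CASTS[kind](value) for (name, kind), value in zip(fields, row)}
        )
    return json.dumps(records)


if __name__ == "__main__":
    # Made-up sample data, not taken from the original post.
    sample = (
        "id,firstName,lastName,email,gender\n"
        "1,Ada,Lovelace,ada@example.com,F\n"
    )
    print(csv_to_json(sample))
```

Note that all records from one input stay together in one output, which is the same reason the record-oriented processors can keep FlowFiles large.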

We still need to configure our ConvertRecord processor, though. To do so, all that we need to configure in the Properties tab is the Record Reader and Writer that we have already configured:

Now, starting the processors, we can see the data flowing through our system!

Also, now that we have defined these readers and writers and the schema, we can easily create a JSON Reader and an Avro Writer, for example. Adding additional processors to split the data up, query it, and route it becomes very simple because we've already done the "hard" part.

Conclusion

In version 1.2.0 of Apache NiFi, we introduced a handful of new Controller Services and Processors that will make managing dataflows that process record-oriented data much easier. I fully expect that the next release of Apache NiFi will have several additional processors that build on this. Here, we have only scratched the surface of the power that this provides to us. We can delve more into how we are able to transform and route the data, split the data up, and migrate schemas. In the meantime, each of the Controller Services for reading and writing records and most of the new processors that take advantage of these services have fairly extensive documentation and examples. To see that information, you can right-click on a processor and click "Usage" or click the "Usage" icon on the left-hand side of your controller service in the configuration dialog. From there, clicking the "Additional Details..." link in the component usage will provide pretty significant documentation. For example, the JsonTreeReader provides a wealth of knowledge in its Additional Details.

 

Comments:

is there an example for converting an avro file to csv ?

Posted by muiler on May 24, 2017 at 08:31 AM GMT #

Hi @muiler. I don't have a specific example of that handy. It should very much follow the same pattern above, though. The only downside currently is that if you have your schema in the Avro data, the Avro Reader is able to make use of it, but the CSV Writer won't be able to, so you'll still need the schema defined in the Schema Registry. There's a JIRA that will address this so that we can just pass the schema through to the writer. So you'd still need the Schema Registry service, and you'll need an Avro Reader service and a CSV Writer service. Then you can use ConvertRecord the same as above.

Posted by Mark Payne on May 24, 2017 at 12:14 PM GMT #
