Python Tutorial : Running an ingestion pipeline with Singer

---
Cool! Now that you know how to create schema messages with Singer, let us continue with the record and state messages, so that we can create a full ingestion pipeline with the Singer library.
Here are the user records we saw earlier.
To convert one such user into a Singer RECORD message, we’d call the “write_record” function, like this:
The “stream_name” would need to match the stream you specified earlier in a schema message. Otherwise, these records are ignored.
This would be almost equivalent to nesting the actual record dictionary in another dictionary that has two more keys, “type” and “stream”. That can be done elegantly with the unpacking operator, the two asterisks preceding “fixed_dict”. It unpacks one dictionary into another, and it can be used in function calls as well. Really useful.
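The hand-rolled version could look roughly like this, using only the standard library. The name “fixed_dict” comes from the talk; the sample user, the stream name and the helper "describe" are assumptions made for illustration.

```python
import json

# Hypothetical sample data (not from the original records).
columns = ("id", "name", "age", "has_children")
user = (1, "Adrian", 32, False)

# The two extra keys that wrap every record.
fixed_dict = {"type": "RECORD", "stream": "DC_employees"}

# ** unpacks fixed_dict into the new dictionary literal,
# and the actual record is nested under the "record" key.
record_msg = {**fixed_dict, "record": dict(zip(columns, user))}
print(json.dumps(record_msg))


def describe(type, stream, **rest):
    """Hypothetical helper: ** in a call turns dictionary keys into keyword arguments."""
    return f"{type} message for stream {stream} with keys {sorted(rest)}"


print(describe(**record_msg))
```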
Now, I did say “almost equivalent”. The truth is that Singer does a few more transformations to make better JSON, but the details are not that important. Simply use the functionality that is offered to you to benefit most.
When you combine the “write_schema” and “write_record” functions, you have a Python module that prints JSON objects to stdout. If you also have a Singer target that can parse these messages, then you have a full ingestion pipeline. In this example, we used “write_records” instead of “write_record”. It simply takes a whole collection of records in one call, whereas “write_record” takes a single one.
We’re introducing the “target-csv” module, which is available on the Python Package Index. Its goal is to create CSV files from the JSON lines. The CSV file will be made in the same directory where you run this command, unless you configure it otherwise by providing a configuration file.
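To make the idea of a target concrete, here is a toy stand-in written with the standard library only. It is not how “target-csv” is implemented, just a sketch of the general pattern under simple assumptions: a single stream, column order taken from the schema, and records for unknown streams skipped.

```python
import csv
import io
import json


def toy_csv_target(lines, out):
    """Write the RECORD messages found in `lines` as CSV rows to `out`.

    Toy stand-in: records whose stream had no earlier SCHEMA message are skipped.
    """
    headers = {}  # stream name -> column names taken from its schema
    writer = csv.writer(out)
    for line in lines:
        if not line.strip():
            continue
        msg = json.loads(line)
        if msg["type"] == "SCHEMA":
            headers[msg["stream"]] = list(msg["schema"]["properties"])
            writer.writerow(headers[msg["stream"]])
        elif msg["type"] == "RECORD" and msg["stream"] in headers:
            cols = headers[msg["stream"]]
            writer.writerow([msg["record"].get(c) for c in cols])
        # STATE and any other message types are ignored in this sketch.


# Hypothetical JSON lines, as a tap would print them.
sample_lines = [
    '{"type": "SCHEMA", "stream": "DC_employees", '
    '"schema": {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}, '
    '"key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "DC_employees", "record": {"id": 1, "name": "Adrian"}}',
    '{"type": "RECORD", "stream": "unknown", "record": {"id": 9}}',
]

out = io.StringIO()
toy_csv_target(sample_lines, out)
print(out.getvalue())
```

A real target reads from standard input, so passing sys.stdin and an open file instead of the sample list and the StringIO buffer would follow the same shape.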
By the way, nothing prevents you from running a tap by parsing the code with the Python interpreter like this, but you’ll typically find taps and targets properly packaged, so you could call them directly, like this.
You might wonder why we’d go through all this trouble just to produce a CSV file from a few simple records. After all, Python’s csv module provides this functionality out of the box.
The answer is modularity. Each tap or target is designed to do one thing very well. They are easily configured through config files. By working with a standardized intermediate format, you could easily swap out the “target-csv” for “target-google-sheets” or “target-postgresql” for example, which write their output to entirely different systems. This means you don’t need to write a lot of code: just pick the taps and targets that match your intended source and destination and voilà, you’re ready.
We haven’t discussed Singer’s STATE messages yet. They are typically used to keep track of state, which is the way something is at some moment in time. That something is typically some form of memory of the process.
Imagine, for example, that you must extract only new records from this database daily at noon, local time. The easiest way to do so is to keep track of the highest encountered “last_updated_on” value and emit that as a state message at the end of a successful run of your tap. Then, you can reuse the same message at a later time to extract only those records that were updated after this old state.
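The bookkeeping behind that idea can be sketched with the standard library alone. The rows, the key name "max-last-updated-on" and the function "extract_new" are hypothetical; only the “last_updated_on” column comes from the talk.

```python
from datetime import datetime

# Hypothetical rows standing in for the database table.
rows = [
    {"id": 1, "last_updated_on": "2019-06-14T10:05:12"},
    {"id": 2, "last_updated_on": "2019-06-15T08:30:00"},
    {"id": 3, "last_updated_on": "2019-06-16T11:59:59"},
]


def extract_new(rows, old_state=None):
    """Return (new_rows, new_state); new_state holds the highest timestamp seen."""

    def ts(row):
        return datetime.fromisoformat(row["last_updated_on"])

    bookmark = (
        datetime.fromisoformat(old_state["max-last-updated-on"]) if old_state else None
    )
    # Without an old state, every row counts as new.
    new_rows = [r for r in rows if bookmark is None or ts(r) > bookmark]
    if not new_rows:
        return [], old_state  # nothing changed, keep the previous bookmark
    latest = max(new_rows, key=ts)["last_updated_on"]
    return new_rows, {"max-last-updated-on": latest}


# First run: no state yet, so all rows are extracted.
first_rows, state = extract_new(rows)

# Next day: one row was added; only that row is newer than the saved state.
rows_next_day = rows + [{"id": 4, "last_updated_on": "2019-06-17T09:00:00"}]
later_rows, next_state = extract_new(rows_next_day, state)
print([r["id"] for r in later_rows], next_state)
```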
You emit these state messages using the “write_state” function. The only required attribute is the value, which can be any JSON serializable object. The value field is free form and only for use by the same tap.
Let’s write our own custom tap for ingesting data from a REST API.
#DataCamp #PythonTutorial #BuildingDataEngineeringPipelinesinPython