Rationale

I have been doing a little work with AWS Data Pipeline recently for undertaking ETL tasks at work. AWS Data Pipeline handles data-driven workflows called pipelines. The pipelines take care of scheduling, data dependencies, data sources and destinations in a nicely managed workflow. My tasks take batch datasets from SQL databases, process and load them into S3 buckets, then import them into a Redshift reporting database.

Production database structures are frequently updated, and those changes need to be reflected in the reporting backend. For a couple of years now I have struggled on with Amazon's web-based Data Pipeline Architect to manage those changes to the pipelines. This has been an onerous task, as the architect does not lend itself well to managing a large set of pipelines. Here begins a little tale of delving into the AWS Data Pipeline API to find another way.

The original problem I was presented with when I started this project was that many parallel processes had been written into several large pipelines which carried out an entire set of nightly ETL tasks. My job was thus to split these monsters into many smaller, more manageable pieces. I started by exporting the whole pipeline definition as a set of JSON definition files and trying to make some sense of it, but with no luck. The whole thing was so intertwined, cluttered and hard to follow that I decided to abandon ship and try another way.

New Beginning

I started by creating a brand new base pipeline definition, using the architect, that would be common to all processes and exported that as a new baseline definition JSON for all the other pipelines.

I then wrote a Python tool to bring together a set of config parameters in JSON format, containing details such as database passwords, S3 bucket locations and so on, and a series of JSON pipeline definition files; a minimal sketch of this merge step follows the list below. This was so that my pipeline template files could be stored in a Git repo without the accompanying sensitive data.

The steps for creating a new pipeline then become quite simple:

  • Create a new set of parameters for the pipeline in the configuration file.
  • Make a copy of the template file and amend as required for the process.
  • Run the tool to output the JSON definition file as required by the architect.
  • Build a new pipeline within the architect tool by importing the definition file created above.
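
As promised, here is a minimal sketch of the merge step. It is only an illustration of the idea, not the actual tool: the file names, the config layout (a JSON object keyed by pipeline name) and the build_definition function are assumptions made for the example.

import json

def build_definition(template_path, config_path, pipeline_name, out_path):
    # Load the shared template and the (non-versioned) config file.
    with open(template_path) as f:
        definition = json.load(f)
    with open(config_path) as f:
        config = json.load(f)

    # Overlay this pipeline's parameters onto the template's "values" section.
    definition.setdefault('values', {}).update(config[pipeline_name])

    # Write out a definition file ready to import into the architect.
    with open(out_path, 'w') as f:
        json.dump(definition, f, indent=2)

build_definition('template.json', 'config.json', 'rds-to-s3', 'definition.json')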

Moving on though, I really wanted to dump the clunky web-based architect tool altogether by making use of the data pipeline API.

Using the Data Pipeline API

So the next step for me was to move from using the architect tool in the browser to using the data pipeline API. I initially thought that the pipeline definitions from the architect would be usable in the API, but no, the API needs definitions to be in a different format. More on this later.

Before moving on, let us look at a typical JSON formatted definition as used by the architect.

JSON Sample

Just for this post, I have created a simple data pipeline sample. This is how it looks in the web-based architect tool:

AWS Data Pipeline Sample

Below is the JSON export of the pipeline sample above:

{
  "objects": [
    {
      "*password": "#{*myRDSPassword}",
      "name": "rds_mysql",
      "jdbcProperties": "allowMultiQueries=true",
      "id": "rds_mysql",
      "type": "RdsDatabase",
      "rdsInstanceId": "#{myRDSInstanceId}",
      "username": "#{myRDSUsername}"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "SourceRDSTable",
      "id": "SourceRDSTable",
      "type": "SqlDataNode",
      "table": "#{myRDSTableName}",
      "selectQuery": "select * from #{table}"
    },
    {
      "instanceType": "#{myEC2InstanceType}",
      "name": "Ec2Instance",
      "actionOnTaskFailure": "terminate",
      "securityGroups": "#{myEc2RdsSecurityGrps}",
      "id": "Ec2Instance",
      "type": "Ec2Resource",
      "terminateAfter": "2 Hours"
    },
    {
      "output": {
        "ref": "S3OutputLocation"
      },
      "input": {
        "ref": "SourceRDSTable"
      },
      "name": "RDStoS3CopyActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "id": "RDStoS3CopyActivity",
      "type": "CopyActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3OutputLocation",
      "id": "S3OutputLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "RDS MySQL password",
      "id": "*myRDSPassword",
      "type": "String"
    },
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "RDS MySQL username",
      "id": "myRDSUsername",
      "type": "String"
    },
    {
      "description": "RDS MySQL table name",
      "id": "myRDSTableName",
      "type": "String"
    },
    {
      "description": "Ec2 Security group(s)",
      "isArray": "true",
      "optional": "true",
      "id": "myEc2RdsSecurityGrps",
      "type": "String"
    },
    {
      "default": "r3.xlarge",
      "description": "EC2 instance type",
      "id": "myEC2InstanceType",
      "type": "String"
    },
    {
      "description": "RDS Instance ID",
      "id": "myRDSInstanceId",
      "type": "String"
    }
  ],
  "values": {
    "myRDSInstanceId": "database.abcde123456.eu-west-1.rds.amazonaws.com",
    "myRDSUsername": "rdsuser",
    "myEC2InstanceType": "r3.xlarge",
    "myOutputS3Loc": "s3://my-output-logs/",
    "*myRDSPassword": "rdspassword",
    "myRDSTableName": "rdstable"
  }
}

For even a slightly larger and more complex pipeline, it does not take much imagination to see that the definition can quickly become unwieldy and unmanageable, both in the architect and in the JSON file itself. This complexity was the driving force behind my decision to create many smaller pipelines.

If you were to load the above file (say it was saved as definition.json) into a Python console using the json library, the root-level dict keys would show as objects, values and parameters:

import json
out = json.load(open('definition.json'))
out.keys()  # [u'objects', u'values', u'parameters']

The lists and dict these keys refer to will be required later, so please read on...

Getting to Work with the API

I'll step through what I did to get a pipeline into the AWS infrastructure:

  1. Creating a Connection
  2. Creating a new Pipeline
  3. Check for Success
  4. Insert JSON Here...
  5. Putting the Definition

Creating a Connection

Firstly, create a client (the Docker container I use here keeps Amazon credentials as environment variables).

import boto3, os, pprint, uuid
client = boto3.client(
    'datapipeline',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    region_name=os.environ.get('AWS_DEFAULT_REGION'),
)

Just a quick test; can I list the pipelines I already have?

pprint.pprint(client.list_pipelines())

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '297',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 16 Nov 2016 18:39:00 GMT',
                                      'x-amzn-requestid': 'f0fad3de-ac2b-11e6-8797-6578b9a77e01'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'f0fad3de-ac2b-11e6-8797-6578b9a77e01',
                      'RetryAttempts': 0},
 u'hasMoreResults': False,
 u'pipelineIdList': [{u'id': u'df-10264073JXS47Y8NS12O',
                      u'name': u'architect-01-s3-to-redshift'},
                     {u'id': u'df-07270122JR3GK3NC7O8Z',
                      u'name': u'architect-02-s3-to-redshift'},
                     {u'id': u'df-06847673DGG6UGYJSEG7',
                      u'name': u'architect-03-s3-to-rds'},
                     {u'id': u'df-0315779437OOQ2TYL69',
                      u'name': u'architect-04-s3-to-rds'},
                     {...}]}

Creating a new Pipeline

Next, create the new pipeline. The uniqueId (here a random UUID) is used by AWS to ensure that repeated CreatePipeline calls are idempotent.

create = client.create_pipeline(name='api-test-1', uniqueId=str(uuid.uuid1()))
pprint.pprint(create)

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '40',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 16 Nov 2016 18:40:51 GMT',
                                      'x-amzn-requestid': '33581d9a-ac2c-11e6-8797-6578b9a77e01'},
                      'HTTPStatusCode': 200,
                      'RequestId': '33581d9a-ac2c-11e6-8797-6578b9a77e01',
                      'RetryAttempts': 0},
 u'pipelineId': u'df-086679538TXJJEBBT0JI'}

The pipelineId value from the create dictionary will be the reference required later, when putting the pipeline definition.

Check for Success

Let us list the pipelines again to see that our new pipeline was created successfully:

pprint.pprint(client.list_pipelines())

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '367',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 16 Nov 2016 18:49:25 GMT',
                                      'x-amzn-requestid': '65a3ad62-ac2d-11e6-8ebf-e74a8b25a45f'},
                      'HTTPStatusCode': 200,
                      'RequestId': '65a3ad62-ac2d-11e6-8ebf-e74a8b25a45f',
                      'RetryAttempts': 0},
 u'hasMoreResults': False,
 u'pipelineIdList': [{u'id': u'df-10264073JXS47Y8NS12O',
                      u'name': u'architect-01-s3-to-redshift'},
                     {u'id': u'df-07270122JR3GK3NC7O8Z',
                      u'name': u'architect-02-s3-to-redshift'},
                     {u'id': u'df-06847673DGG6UGYJSEG7',
                      u'name': u'architect-03-s3-to-rds'},
                     {u'id': u'df-0315779437OOQ2TYL69',
                      u'name': u'architect-04-s3-to-rds'},
                     {u'id': u'df-086679538TXJJEBBT0JI',
                      u'name': u'api-test-1'},
                     {...}]}

And there it is, named api-test-1 as requested. We can go ahead and get more information about our new pipeline now by using the describe_pipelines call:

description = client.describe_pipelines(pipelineIds=['df-086679538TXJJEBBT0JI'])
pprint.pprint(description)

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '599',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Wed, 16 Nov 2016 18:55:19 GMT',
   'x-amzn-requestid': '66e79486-b1ad-11e6-8249-23409048a58c'},
  'HTTPStatusCode': 200,
  'RequestId': '66e79486-b1ad-11e6-8249-23409048a58c',
  'RetryAttempts': 0},
 u'pipelineDescriptionList': [{u'fields': [{u'key': u'pipelineCreator',
     u'stringValue': u'AIDAJV2YZC22FODOTLB54'},
    {u'key': u'@creationTime', u'stringValue': u'2016-11-16T18:49:25'},
    {u'key': u'name', u'stringValue': u'api-test-1'},
    {u'key': u'@sphere', u'stringValue': u'PIPELINE'},
    {u'key': u'@id', u'stringValue': u'df-086679538TXJJEBBT0JI'},
    {u'key': u'@pipelineState', u'stringValue': u'PENDING'},
    {u'key': u'@accountId', u'stringValue': u'816026221619'},
    {u'key': u'uniqueId',
     u'stringValue': u'70c30b52-b1ac-11e6-85e2-0242ac110003'},
    {u'key': u'@userId', u'stringValue': u'AIDAJV2YZC22FODOTLB54'}],
   u'name': u'api-test-1',
   u'pipelineId': u'df-086679538TXJJEBBT0JI',
   u'tags': []}]}

You may notice from the return data above that the pipeline seems okay, but it is in a PENDING state and not yet ready to go. That is because it does not yet have a definition attached. That's next...
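
Since describe_pipelines returns the pipeline attributes as a list of key/stringValue pairs rather than a flat dict, a short expression like the sketch below (based on the response shown above) is handy for pulling out a single field such as the state:

fields = description['pipelineDescriptionList'][0]['fields']
# Each field is a {'key': ..., 'stringValue': ...} pair; pick out the state.
state = next(f['stringValue'] for f in fields if f['key'] == '@pipelineState')
print(state)  # PENDING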

Insert JSON Here...

Brilliant, now we have a pipeline ready to put a definition into. This is the moment where I grab the JSON definition and push it into the API request... or so I thought, before diving deeper into the PutPipelineDefinition documentation.

Essentially, the data pipeline architect uses a different JSON format from the data pipeline API for handling the import and export of definition files.

First, I load an example definition file:

import json
definition = json.load(open('definition.json', 'r'))

If we extract the values dict from the JSON, we get something like the below, which is quite acceptable to the architect:

pprint.pprint(definition.get('values'))

{u'*myRDSPassword': u'rdspassword',
 u'myEC2InstanceType': u'r3.xlarge',
 u'myOutputS3Loc': u's3://my-output-logs/',
 u'myRDSInstanceId': u'database.abcde123456.eu-west-1.rds.amazonaws.com',
 u'myRDSTableName': u'rdstable',
 u'myRDSUsername': u'rdsuser'}

What the API needs is formatted like the below, which is quite different from what the architect accepts. Here the key/value pairs of the dict above have been split out into separate dictionaries of id and stringValue pairings within a list:

pprint.pprint(parameterValues)

[{'id': u'myEC2InstanceType', 'stringValue': u'r3.xlarge'},
 {'id': u'myRDSTableName', 'stringValue': u'rdstable'},
 {'id': u'myOutputS3Loc', 'stringValue': u's3://my-output-logs/'},
 {'id': u'myRDSInstanceId',
  'stringValue': u'database.abcde123456.eu-west-1.rds.amazonaws.com'},
 {'id': u'*myRDSPassword', 'stringValue': u'rdspassword'},
 {'id': u'myRDSUsername', 'stringValue': u'rdsuser'}]
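
Reshaping the values section alone would be simple enough, as the hypothetical snippet below shows, but the objects and parameters sections need a rather more involved translation:

# Hand-rolled reshaping of the architect-style "values" dict into the
# API-style list of id/stringValue pairs (for illustration only).
parameterValues = [
    {'id': key, 'stringValue': value}
    for key, value in definition.get('values', {}).items()
]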

Upon this discovery I momentarily considered writing some code to convert a whole definition into API objects, but then I thought: there must be another way. After all, most of the functionality AWS provides in its web console is built upon its APIs, so I reasoned that there must already be code out there that performs this conversion between the web-based architect front end and the data pipeline API.

It turns out that the awscli library provides a handy set of data pipeline customisations enabling translation from definition files to API objects, and much more! See the code in the awscli repository on GitHub for more information.

So, remember the lists and dict the objects, values and parameters keys referred to in the definition JSON from earlier? Well, what needs to be done is to convert each of those into a format parseable by the API; this is where the customizations library comes into its own:

import awscli.customizations.datapipeline.translator as trans

pipelineObjects = trans.definition_to_api_objects(definition)
parameterObjects = trans.definition_to_api_parameters(definition)
parameterValues = trans.definition_to_parameter_values(definition)

client.put_pipeline_definition will need the three lists from the code above, plus the pipelineId from the create response earlier, in order to succeed.

Putting the Definition

Once we have the objects, values and parameters in the right shape, and a pipeline ID, we can make a put pipeline definition request to the API. This will finish off the pipeline creation process, as the pipeline will be installed complete with the definition that tells it what to do:

response = client.put_pipeline_definition(
    pipelineId=create['pipelineId'],
    pipelineObjects=pipelineObjects,
    parameterObjects=parameterObjects,
    parameterValues=parameterValues
)

If the pipeline definition request is successful, then we should see something like this in the response object:

pprint.pprint(response)

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '204',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Thu, 17 Nov 2016 11:55:27 GMT',
   'x-amzn-requestid': 'bb12cfac-acbc-11e6-8442-d1754b82fda0'},
  'HTTPStatusCode': 200,
  'RequestId': 'bb12cfac-acbc-11e6-8442-d1754b82fda0',
  'RetryAttempts': 0},
 u'errored': False,
 u'validationErrors': [],
 u'validationWarnings': [{u'id': u'Default',
   u'warnings': [u"'pipelineLogUri'is missing. It is recommended to set this value on Default object for better troubleshooting."]}]}

You may have spotted the validationWarnings key: the definition is validated by the API when it is put. Any fatal errors would result in the definition being rejected, whereas warnings are merely reported in the response. It is worth noting that some problems do not seem to produce a warning at all, such as an incorrectly specified terminateAfter parameter.
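
As a belt-and-braces measure, the response can also be checked programmatically. Below is a simple sketch of such a guard, based on the response shape above:

# Fail hard on validation errors; print warnings for information only.
if response['errored']:
    raise RuntimeError('Definition rejected: %s' % response['validationErrors'])
for warning in response.get('validationWarnings', []):
    print('%s: %s' % (warning['id'], ', '.join(warning['warnings'])))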

Conclusion

So there it is. Not that difficult to go from managing an unwieldy set of pipelines in a web browser front end to a small set of JSON files and a handful of API calls. There is also the nice bonus of having all the data pipelines under version control with the sensitive data kept separately.

