@google-cloud/data-tasks-coordinator v2.1.0

License: Apache-2.0
Last release: 16 days ago

Data Tasks Coordinator

Disclaimer: This is not an official Google product.

Data Tasks Coordinator (code name Sentinel) is a framework that organizes different tasks into an automated data pipeline, plus a library that drives an interactive, configuration-based installation process.

This solution is not a general-purpose orchestration tool. It is designed for marketing solutions, with a set of specific tasks mainly related to BigQuery.

1. Key Concepts

1.1. Terminology

Task: A data processing element for a specific objective, e.g. loading data from Cloud Storage to BigQuery.

Data pipeline: A series of connected data processing tasks. The output of one task is the input of the next. The tasks of a pipeline are often executed in parallel or in a time-sliced fashion.

Event-driven programming: A programming paradigm in which events, such as user activities or execution results from other programming threads, determine the flow of a program's execution.

1.2. To start a task

A task can be triggered by a Pub/Sub message.

In most cases, a Cloud Scheduler job created by deploy.sh sends these messages regularly to trigger the first task. After that, Sentinel sends messages to trigger the subsequent tasks.

  • Message attribute: taskId
  • Message body: the JSON string of the parameter object that will be passed into the task to start.
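
For example, a message that triggers the load_job task from section 2.1 could conceptually look like the following sketch (the partitionDay parameter value is illustrative; Pub/Sub transports the body as base64-encoded data):

{
  "attributes": {
    "taskId": "load_job"
  },
  "data": "{\"partitionDay\": \"20231101\"}"
}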

Task definitions and SQL files support parameters in the format ${parameterName}. Each placeholder is replaced with the value of parameterName from the passed-in parameter JSON object.

Nested parameters are also supported, e.g. ${parameter.name}.

Each task passes a parameter object to its next task(s). The passed-on parameter object is the merge of the parameters the task received, the new parameters it generated, and the parameters from the configuration item appendedParameters, if present.
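
For instance, here is a minimal sketch of that merge (the config parameter and its value are illustrative; partitionDay and appendedParameters appear elsewhere in this document).

Parameters received by the task:

{
  "partitionDay": "20231101"
}

appendedParameters in the task's configuration:

{
  "appendedParameters": {
    "config": "test"
  }
}

Parameter object passed to the next task(s), together with any parameters the task itself generates (for example, a Query task's destination table, used via ${destinationTable.*} in section 2.3.2):

{
  "partitionDay": "20231101",
  "config": "test"
}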

1.3. General task definition

{
  "foo": {
    "type": "query",
    "source": {
      ...
    },
    "destination": {
      ...
    },
    "options": {
      ...
    },
    "next": "bar"
  }
}

Properties:

  1. foo is the task name.
  2. type, the task type. Different types define different kinds of task and have different configurations.
  3. source, destination and options are configurations. Refer to the detailed tasks for more information.
  4. next, defines which task(s) will be started after the current one completes; in this case, task bar will be started after foo.

See config_task.json.template for templates of tasks.
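
As a sketch of how next chains tasks, the following configuration runs a query and then exports its result; the task names, table id, file name and SQL are illustrative and follow the task examples in section 2:

{
  "query_daily": {
    "type": "query",
    "source": {
      "sql": "[YOUR_QUERY_SQL]"
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "daily_result"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    },
    "next": "export_daily"
  },
  "export_daily": {
    "type": "export",
    "source": {
      "projectId": "[YOUR_CLOUD_PROJECT_ID]",
      "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
      "tableId": "daily_result",
      "location": "[YOUR_BIGQUERY_LOCATION_ID]"
    },
    "destination": {
      "bucket": "[YOUR_STORAGE_BUCKET_ID]",
      "name": "output/daily_result_${partitionDay}.ndjson"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}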

1.4. To install the solution

In a Cloud Shell:

  1. clone the source code;
  2. enter the source code folder, edit the task configuration JSON file;
  3. run chmod a+x ./deploy.sh; ./deploy.sh.

2. Task Configuration Examples

2.1. Load Task

2.1.1. Load CSV file(s) with given schema

{
  "load_job": {
    "type": "load",
    "source": {
      "file": {
        "bucket": "[YOUR_STORAGE_BUCKET_ID]",
        "name": "[YOUR_FILENAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]",
        "location": "[YOUR_BIGQUERY_LOCATION_ID]"
      },
      "tableSchema": {
        "schema": {
          "fields": [
            {
              "mode": "NULLABLE",
              "name": "[YOUR_BIGQUERY_TABLE_COLUMN_1_NAME]",
              "type": "[YOUR_BIGQUERY_TABLE_COLUMN_1_TYPE]"
            },
            {
              "mode": "NULLABLE",
              "name": "[YOUR_BIGQUERY_TABLE_COLUMN_2_NAME]",
              "type": "[YOUR_BIGQUERY_TABLE_COLUMN_2_TYPE]"
            }
          ]
        }
      }
    },
    "options": {
      "sourceFormat": "CSV",
      "writeDisposition": "WRITE_TRUNCATE",
      "skipLeadingRows": 1,
      "autodetect": false
    }
  }
}

2.1.2. Load CSV file(s) with autodetected schema

{
  "load_job": {
    "type": "load",
    "source": {
      "file": {
        "bucket": "[YOUR_STORAGE_BUCKET_ID]",
        "name": "[YOUR_FILENAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "targetTable$${partitionDay}",
        "location": "[YOUR_BIGQUERY_LOCATION_ID]"
      }
    },
    "options": {
      "sourceFormat": "CSV",
      "writeDisposition": "WRITE_TRUNCATE",
      "skipLeadingRows": 1,
      "autodetect": true
    }
  }
}

2.2. Query Task

2.2.1. Query through a simple SQL string

{
  "query_job_sql": {
    "type": "query",
    "source": {
      "sql": "[YOUR_QUERY_SQL]"
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    }
  }
}

2.2.2. Query through a Cloud Storage file

{
  "query_job_gcs": {
    "type": "query",
    "source": {
      "file": {
        "bucket": "[YOUR_BUCKET_FOR_SQL_FILE]",
        "name": "[YOUR_SQL_FILE_FULL_PATH_NAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    }
  }
}

2.3. Export Task

2.3.1. Export a table to Cloud Storage

{
  "export_job": {
    "type": "export",
    "source": {
      "projectId": "[YOUR_CLOUD_PROJECT_ID]",
      "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
      "tableId": "[YOUR_BIGQUERY_TABLE_ID]",
      "location": "[YOUR_BIGQUERY_LOCATION_ID]"
    },
    "destination": {
      "bucket": "[YOUR_BUCKET_FOR_EXPORTED_FILE]",
      "name": "[YOUR_FULL_PATH_NAME_FOR_EXPORTED_FILE]"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}

2.3.2. Export a file to trigger Tentacles (usually after a ‘Query’ task)

{
  "export_for_tentacles": {
    "type": "export",
    "source": {
      "projectId": "${destinationTable.projectId}",
      "datasetId": "${destinationTable.datasetId}",
      "tableId": "${destinationTable.tableId}",
      "location": "#DATASET_LOCATION#"
    },
    "destination": {
      "bucket": "#GCS_BUCKET#",
      "name": "#OUTBOUND#/API[MP]_config[test]_${partitionDay}.ndjson"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}
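
The ${destinationTable.*} placeholders above are filled from the parameter object passed on by the preceding Query task. A minimal sketch of such a preceding task (the task name is illustrative; it only needs to name export_for_tentacles in next):

{
  "query_for_tentacles": {
    "type": "query",
    "source": {
      "sql": "[YOUR_QUERY_SQL]"
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    },
    "next": "export_for_tentacles"
  }
}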