1.2.0 • Published 3 years ago

kpipe-plan v1.2.0

License: MIT

kpipe Plan Compiler

Breaking change in v1.0.0: the exported function compileOps is now an async function. It returns a Promise<[ Op[], State ]> instead of the former synchronous result value, [ Op[], State ].
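
For callers written against earlier versions, the practical change is that the returned tuple has to be awaited before it is destructured. The sketch below only illustrates that pattern. This README does not show the parameters of compileOps, so compileOpsStandIn, its two parameters, and the CompiledOp and CompileState types are hypothetical placeholders, not the package's API.

```typescript
// Placeholder types: the real Op and State shapes come from kpipe-plan.
type CompiledOp = unknown[];
type CompileState = Record<string, unknown>;

// Hypothetical stand-in that only mimics the documented return shape,
// Promise<[ Op[], State ]>. The real compileOps parameters may differ.
async function compileOpsStandIn(
  ops: CompiledOp[],
  state: CompileState
): Promise<[CompiledOp[], CompileState]> {
  return [ops, state];
}

async function compileExample(): Promise<number> {
  // Before v1.0.0 the tuple was returned directly; now it must be awaited.
  const [compiled, finalState] = await compileOpsStandIn(
    [["echo", "hello"]],
    { param1: "value" }
  );
  return compiled.length + Object.keys(finalState).length;
}

compileExample().then((n) => console.log(n)); // prints 2
```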

Basics

Every item in a set of kpipe operations uses the same basic structure. Some arguments are optional, and some are inferred from their position. The valid forms of an operation are:

[ "OPCODE", [...OPS]]
[ "OPCODE", "STRING", ([...OPS])]
[ "OPCODE", {...DEF}, ([...OPS])]
[ "OPCODE", "STRING", {...DEF}, ([...OPS])]
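
One way to read the four forms is that the optional arguments are told apart by type, always in the order STRING, DEF, OPS. The sketch below normalizes any of the forms into a single record on that assumption. normalizeOp and NormalizedOp are illustrative names, not part of kpipe-plan.

```typescript
type OpDef = Record<string, unknown>;
type RawOp = unknown[];

interface NormalizedOp {
  opcode: string;
  name?: string;  // the optional "STRING" argument
  def: OpDef;     // the optional {...DEF} argument, {} when omitted
  ops: unknown[]; // the optional [...OPS] argument, [] when omitted
}

function normalizeOp(raw: RawOp): NormalizedOp {
  const [opcode, ...rest] = raw;
  if (typeof opcode !== "string") {
    throw new Error("an operation must start with an opcode string");
  }
  const result: NormalizedOp = { opcode, def: {}, ops: [] };
  let i = 0;
  // Arguments are recognised by type, in the fixed order STRING, DEF, OPS.
  if (typeof rest[i] === "string") {
    result.name = rest[i] as string;
    i++;
  }
  const candidate = rest[i];
  if (typeof candidate === "object" && candidate !== null && !Array.isArray(candidate)) {
    result.def = candidate as OpDef;
    i++;
  }
  if (Array.isArray(rest[i])) {
    result.ops = rest[i] as unknown[];
    i++;
  }
  if (i !== rest.length) {
    throw new Error("unexpected argument at position " + (i + 1) + " of " + opcode);
  }
  return result;
}

console.log(normalizeOp(["task", "tasks/convertCsv", ["file1.csv"]]).name);
// prints "tasks/convertCsv"
```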

String substitution is performed on all string values when the operation is invoked. An immutable state object is presented to each operation when it is parsed and invoked. Variables declared with a "def" operation are available to all subsequent operations and their children.

For example:

[
  ["def", {
    "param1": "Froderick Frankensteen"
  }],
  ["echo", "The name is ${param1}"]
]

Output

The name is Froderick Frankensteen
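
The example above can be mimicked with a few lines of code. This is a minimal sketch that handles bare variable names only and is not the package's implementation; substituteVars, applyDefSketch, and PlanState are hypothetical names. Leaving unknown variables untouched is a choice made for the sketch, not documented behaviour.

```typescript
type PlanState = Readonly<Record<string, string | number>>;

// Replace every ${name} in the template with the matching state value.
// Only bare variable names are handled; full expressions are out of scope.
function substituteVars(template: string, state: PlanState): string {
  return template.replace(/\$\{(\w+)\}/g, (match: string, name: string) => {
    const value = state[name];
    // Unknown variables are left as-is (an assumption of this sketch).
    return value === undefined ? match : String(value);
  });
}

// A "def" contributes variables by producing a new frozen state object,
// so the previous state is never modified.
function applyDefSketch(state: PlanState, def: PlanState): PlanState {
  return Object.freeze({ ...state, ...def });
}

const demoState = applyDefSketch({}, { param1: "Froderick Frankensteen" });
console.log(substituteVars("The name is ${param1}", demoState));
// prints "The name is Froderick Frankensteen"
```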

Expressions and Substitution

Expressions embedded in plan strings are enclosed in ${ and }. The enclosed expression is evaluated, and its result is converted to a string and substituted for the expression in the surrounding string. Variables defined in def ops, or otherwise available in the plan compile state (such as a named with), can be used as substitution variables in expressions. Symbolic math operators, as well as unary and binary math operators, are available. kpipe-plan uses the expr-eval module (https://www.npmjs.com/package/expr-eval) to parse, simplify, and evaluate expressions.

In addition to the standard math expression operators, the following utility functions are available:

  • padZero(x: string|number, n: number = 5): string
    Left-pads the supplied string or number x with 0 to n digits (default 5).
  • concat( ...args: string[]): string
    Concatenates the supplied string arguments into a single string.
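
The behaviour described for these two helpers can be approximated as follows. These are re-implementations written for illustration from the signatures above, not the package's source; the Sketch suffix marks them as hypothetical. Leaving values longer than n unchanged is an assumption.

```typescript
// Left pad x with "0" up to n characters (default 5).
// Values already longer than n are returned unchanged (assumption).
function padZeroSketch(x: string | number, n: number = 5): string {
  return String(x).padStart(n, "0");
}

// Join all string arguments into one string.
function concatSketch(...args: string[]): string {
  return args.join("");
}

console.log(padZeroSketch(23));     // prints "00023"
console.log(padZeroSketch("7", 3)); // prints "007"
console.log(concatSketch("file+", padZeroSketch(23), ".csv"));
// prints "file+00023.csv"
```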

Basic Operations

def

A def defines a set of variables that operations can use for substitution.

["def", {
  "param1": "This is the first parameter",
  "param2": "This is the second",
  "param3": "This is another ${var}",
  "arg1": "${1}"
}]

Substitution is also performed within def operations, using variables defined by earlier or ancestor operations.

echo

An echo operation simply outputs a string to the log. It is primarily used for development or testing.

["echo", "Write something to the log"]

Output

Write something to the log

task

A task is an invocation of a kpipe process. It usually executes a command in the tasks folder, depending on the configuration of the machine.

["task", "tasks/convertCsv", [...ARGS]]

-or-

["task", { "command": "tasks/convertCsv" }, [...ARGS]]

exec

An exec will invoke a shell command on the machine.

["exec", "ls", ["/tmp"]]

spread

A spread represents a list of tasks which may be executed concurrently.

["spread", {}, [
  ["task", "tasks/convertCsv", ["file1.csv"] ],
  ["task", "tasks/convertCsv", ["file2.csv"] ],
  ["task", "tasks/convertCsv", ["file3.csv"] ]
]]

stage

A stage is a logical grouping of tasks. By convention, a stage defines an idempotent operation which can be safely re-executed upon failure.

[
  "stage", NAME, {args}, [
    ["task", COMMAND, {}],
    ["spread", {}, [...OPS]]
  ]
]

plan

A plan is a list of sequential stages. Stages are assumed to be sequentially dependent on one another. That is, before a stage may begin execution, all previous stages must have successfully executed.

[ 
  "plan", {args}, [
    ["stage", "one", [...]],
    ["stage", "two", [...]]
  ]
]

Higher Order Operations

The following operations allow the basic operations above to be constructed at a higher level. In general, when compiled, these operations are replaced with a set of basic operations built by the higher order statement. Typically, these are used to loop over a set of values, such as partition numbers, and repeat a set of commands, substituting the iterated value in each repeated set.

include

An include statement will insert the contents of an external JSON file into the plan.

[
  "include", "relative/path/to/file"  
]

seq

A seq generates integer sequences. Each value in the sequence is presented to the inner task list as the variables X (a padded string, e.g. 00023) and I (a numeric value, e.g. 23).

[
  "seq", "1 10 2", [
    ["task", "tasks/convertFile", ["path/to/files/file+${X}.csv"]]
  ]
]

-or-

[
  "seq", {
    "start": 1,
    "end": 10,
    "by": 2
  }, [
    ["task", "tasks/convertFile", ["path/to/files/file+${X}.csv"]]
  ]
]

This gets transformed into:

[
  ["task", "tasks/convertFile", ["path/to/files/file+00001.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+00003.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+00005.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+00007.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+00009.csv"]]
]
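
A minimal sketch of how a seq definition could be turned into integers and into the X and I variables, consistent with the expansion above and with every row of the Options table in this section. The names SeqOpts, parseSeq, seqValues, and seqVars are hypothetical. The treatment of count (values run from start to start + count - 1) and the padding width of 5 are assumptions inferred from the examples, not from the package source.

```typescript
interface SeqOpts {
  start?: number;
  end?: number;
  count?: number;
  by?: number;
}

// Parse the string forms: "count", "start end", "start end by".
function parseSeq(spec: string): SeqOpts {
  const [a, b, c] = spec.trim().split(/\s+/).map(Number);
  if (b === undefined) return { count: a };
  if (c === undefined) return { start: a, end: b };
  return { start: a, end: b, by: c };
}

function seqValues(opts: SeqOpts | string): number[] {
  const o = typeof opts === "string" ? parseSeq(opts) : opts;
  const start = o.start ?? 0;
  const by = o.by ?? 1;
  // Assumption: "count" spans start .. start + count - 1 (inclusive).
  const end = o.end ?? (start + (o.count ?? 0) - 1);
  if (by <= 0) throw new Error("by must be positive");
  const values: number[] = [];
  for (let i = start; i <= end; i += by) values.push(i);
  return values;
}

// X is the zero-padded string form, I the numeric form.
function seqVars(i: number): { X: string; I: number } {
  return { X: String(i).padStart(5, "0"), I: i };
}

console.log(JSON.stringify(seqValues("1 10 2")));             // prints [1,3,5,7,9]
console.log(JSON.stringify(seqValues({ count: 10, by: 2 }))); // prints [0,2,4,6,8]
console.log(seqVars(23).X);                                   // prints 00023
```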

Options

Opts                                Result
{"start": 1, "end": 10}             [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
{"start": 1, "end": 10, "by": 2}    [1, 3, 5, 7, 9]
{"count": 10}                       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
{"start": 1, "count": 10}           [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
{"count": 10, "by": 2}              [0, 2, 4, 6, 8]
"10" (count)                        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"1 10" (start, end)                 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
"0 10 2" (start, end, by)           [0, 2, 4, 6, 8, 10]

with

A with is a parameterized task generator. It is given a set of inputs to iterate over, and its tasks are repeated once for each iteration.

[
  "with",  {
    "PART": ["001", "002", "003"]
  }, [
    ["task", "tasks/convertFile", ["path/to/files/file+${PART}.csv"]]
  ]
]

This gets transformed into:

[
  ["task", "tasks/convertFile", ["path/to/files/file+001.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+002.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+003.csv"]]
]
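
The transformation above can be sketched as a substitution applied to a copy of the nested ops for each value. This sketch covers a single iterated variable only; how several variables in one with are combined is not covered here. WithOp, substituteDeep, and expandWith are hypothetical names, not the package's API.

```typescript
type WithOp = unknown[];

// Recursively substitute ${name} in every string found in a nested op.
function substituteDeep(value: unknown, vars: Record<string, string>): unknown {
  if (typeof value === "string") {
    return value.replace(/\$\{(\w+)\}/g, (m: string, name: string) => vars[name] ?? m);
  }
  if (Array.isArray(value)) {
    return value.map((item) => substituteDeep(item, vars));
  }
  if (typeof value === "object" && value !== null) {
    const out: Record<string, unknown> = {};
    for (const [k, v] of Object.entries(value)) out[k] = substituteDeep(v, vars);
    return out;
  }
  return value;
}

// Repeat the nested ops once per value, binding the variable each time.
function expandWith(name: string, values: string[], ops: WithOp[]): WithOp[] {
  const out: WithOp[] = [];
  for (const v of values) {
    for (const op of ops) out.push(substituteDeep(op, { [name]: v }) as WithOp);
  }
  return out;
}

const expandedOps = expandWith("PART", ["001", "002", "003"], [
  ["task", "tasks/convertFile", ["path/to/files/file+${PART}.csv"]],
]);
console.log(JSON.stringify(expandedOps[1]));
// prints ["task","tasks/convertFile",["path/to/files/file+002.csv"]]
```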

new in v1.0.0

list

A list is an external reference to a text file stored on the filesystem or in Amazon S3. The contents of the file are read and the nested Ops in the list are repeated for every line in the file. The special variable IT is set to the contents of the line for each repeated instance of the nested Ops. The IT variable can be substituted in child ops (using ${IT}) and it will be replaced with the current line in the file.

Note: It's assumed the machine/user performing the compilation has sufficient permissions to access AWS and the S3 location.
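
The per-line repetition can be illustrated without touching the filesystem or S3 by working on file contents that have already been read. In this sketch, listBindings and repeatForLines are hypothetical names, and skipping blank lines is an assumption, not documented behaviour.

```typescript
// Turn the text of a list file into one variable binding per line.
// Assumptions: lines end in \n or \r\n, and blank lines are skipped.
function listBindings(text: string): { IT: string }[] {
  return text
    .split(/\r?\n/)
    .filter((line) => line.trim() !== "")
    .map((line) => ({ IT: line }));
}

// Repeat a single template string once per line, replacing ${IT}.
// split/join is used so that "$" sequences inside a line are taken literally.
function repeatForLines(template: string, text: string): string[] {
  return listBindings(text).map(({ IT }) => template.split("${IT}").join(IT));
}

console.log(JSON.stringify(repeatForLines("s3://anybucket/${IT}", "a.csv\nb.csv\n")));
// prints ["s3://anybucket/a.csv","s3://anybucket/b.csv"]
```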

new in v0.9.11

A with can specify a name. A named with can be referred to in subsequent with statements which will re-use the definition from earlier. This serves as a mechanism to lift the sequence arrays out of inner loops of tasks and avoid restatements of the with definitions. A named with is maintained in the state when compiling the ops list, so it can be referred to inside dependent (externally defined) sub-plans.

Note: Since the with definition is maintained as part of the compile state, its label must not collide with other state variables used by the plan.

[
  "with", "FILEPARTS", {
    "PART": ["001", "002", "003"]
  }, [
    ["task", "tasks/convertFile", ["path/to/files/file+${PART}.csv"]]
  ]
]
...
[
  "with", "FILEPARTS", [
    ["task", "tasks/convertFile", ["path/to/otherfiles/file+${PART}.csv"]]
  ]
]

This gets transformed into:

[
  ["task", "tasks/convertFile", ["path/to/files/file+001.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+002.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+003.csv"]],
  ...
  ["task", "tasks/convertFile", ["path/to/otherfiles/file+001.csv"]],
  ["task", "tasks/convertFile", ["path/to/otherfiles/file+002.csv"]],
  ["task", "tasks/convertFile", ["path/to/otherfiles/file+003.csv"]]
]

An empty named with is valid. It stores the definition under the given label but generates no sub-tasks.

[
  ["with", "FILEPARTS", {
    "PART": ["001", "002", "003"]
  }],
  ...
  ["with", "FILEPARTS", [
    ["task", "tasks/convertFile", ["path/to/files/file+${PART}.csv"]]
  ]]
]

This gets transformed into:

[
  ["task", "tasks/convertFile", ["path/to/files/file+001.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+002.csv"]],
  ["task", "tasks/convertFile", ["path/to/files/file+003.csv"]]
]

new in v0.10.0

pipeline

A pipeline defines a sequence of operations (see pipe below) which can be performed in parallel, such as operating on a partition of a file set. The pipeline runs these operations in parallel while constraining them to a set level of concurrency, which limits the number of concurrently executing pipe sequences. On compilation, the pipeline operation is replaced with a sequence of spread operations which together cover the concurrent work for the total number of pipe sequences (defined by the depth parameter).

["pipeline", {
  "concurrency": 10,
  "depth": 1000
}, [
  ["pipe", ... ],
  ["pipe", ... ],
  ["pipe", ... ],
  ["pipe", ... ]
]]

pipe

A pipe is a step in a pipeline; a pipeline may contain only pipe operations as sub-operations. A pipe operation defines a list of sub-operations which will be grouped into a spread operation with other stages of pipe operations. Optionally, a pipe may define step-wise (non-spread) operations that perform set-up or tear-down for the sub-operations, as with the pre and post lists in the example below. A pipeline provides pipe operations with the state variables P_X (a padded string, e.g. 00023) and P_I (an integer, e.g. 23), which represent the value of the depth of the pipeline.

["pipe", {
  "pre": [["exec", "kpipe", ["create", "topicA"]]],
  "post": [["echo", "Pipe ${P_X} complete"]]
}, [
  ["task", "tasks/storepartition", ["topicA", "s3://anybucket/file+${P_X}"]]
]]