
Unable to register a Prefect flow using varying parameters

I'm trying to implement a Prefect flow whose schedule passes varying parameters:

from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)

schedule = Schedule(clocks=[
    CronClock(' 0 18  *  *  6', parameter_defaults={'a': 'a', 'b': 'b'}),
    CronClock(' 0 12  *  *  0', parameter_defaults={'a': 'a', 'b': 'b'})
])

flow = Flow(
    name='test flow', schedule=schedule
)

flow.register()

but I get the following error:

Result check: OK
Traceback (most recent call last):
  File "/home/psimakis/.config/JetBrains/PyCharm2020.2/scratches/scratch.py", line 18, in <module>
    flow.register()
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/core/flow.py", line 1443, in register
    no_url=no_url,
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 673, in register
    version_group_id=version_group_id,
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [{'line': 2, 'column': 5}], 'path': ['create_flow_from_compressed_string'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [], 'path': ['create_flow_from_compressed_string']}]}}}]

Environment:

  • python 3.6.9
  • prefect 0.12.0

Do you have any idea what causes this error?

Panagiotis Simakis asked Jun 24 '20 11:06



1 Answer

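Parameters are a special type of Task in Prefect. Creating a Parameter does not attach it to a flow; it has to be added explicitly, for example with flow.add_task. In the question the flow has no parameters when it is registered, so the 'a' and 'b' keys supplied in parameter_defaults are rejected as extra. Adding the parameters to the flow fixes this: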

from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)

schedule = Schedule(clocks=[
    CronClock(' 0 18  *  *  6', parameter_defaults={'a': 'a', 'b': 'b'}),
    CronClock(' 0 12  *  *  0', parameter_defaults={'a': 'a', 'b': 'b'})
])

flow = Flow(
    name='test flow', schedule=schedule
)

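# Attach each Parameter to the flow so that the names used in
# parameter_defaults match parameters the flow actually defines.
flow.add_task(a)
flow.add_task(b)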
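
To see why the original registration failed, it can help to model the check that the error message implies: the names supplied by each clock's parameter_defaults are compared with the parameters the flow defines, and any leftovers are reported as extra. The sketch below is a simplified, library-free illustration of that idea. It is not Prefect's actual implementation, and find_extra_parameters is a hypothetical helper name introduced only for this example.

```python
def find_extra_parameters(flow_parameters, clocks_parameter_defaults):
    """Return names supplied by clocks that the flow does not define.

    Hypothetical helper: a simplified model of the validation implied by
    the "Extra parameters were supplied" error, not Prefect's own code.
    """
    known = set(flow_parameters)
    supplied = set()
    for defaults in clocks_parameter_defaults:
        # Iterating a dict yields its keys, i.e. the parameter names.
        supplied.update(defaults)
    return supplied - known


# The parameter_defaults used by the two clocks in the question.
clock_defaults = [{'a': 'a', 'b': 'b'}, {'a': 'a', 'b': 'b'}]

# Parameters created but never added to the flow: both names are "extra".
extra_before = find_extra_parameters([], clock_defaults)

# After flow.add_task(a) and flow.add_task(b): nothing is extra.
extra_after = find_extra_parameters(['a', 'b'], clock_defaults)
```

With no parameters attached, the model reports both 'a' and 'b' as extra, which matches the names in the error; once both are attached, the set of extras is empty.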
Josh answered Oct 06 '22 01:10
