79145234

Date: 2024-10-31 15:07:53
Score: 1
Natty:
Report link

to_dataframe is expecting a PCollection[1], but you are passing a single element of your PCollection instead. Instead of calling df = to_dataframe(element) as part of PandasTransform, you can do something like:

messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )
df_messages = to_dataframe(messages)

and then you can manipulate df_messages with dataframe operations. See https://beam.apache.org/documentation/dsls/dataframes/overview/ for more info.

[1] https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe

Reasons:
  • Probably link only (1):
  • Long answer (-0.5):
  • Has code block (-0.5):
  • Low reputation (1):
Posted by: Danny McCormick