to_dataframe expects a whole PCollection (with a schema attached), but you are passing it a single element of your PCollection. Rather than calling df = to_dataframe(element) inside PandasTransform, you can do something like:
    messages = (
        p
        | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
        | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
    )
    df_messages = to_dataframe(messages)
You can then manipulate df_messages with DataFrame operations. See https://beam.apache.org/documentation/dsls/dataframes/overview/ for more info.