Were you able to solve the issue? I'm running into a similar problem. Could you help me with a workaround for this?
import logging
import apache_beam as beam
from apache_beam import pvalue
class transfrom(beam.DoFn):
    """DoFn that re-emits each element as a tuple of its items on a tagged output.

    NOTE(review): the class name is spelled ``transfrom``; the spelling is kept
    because it is this block's public name.
    """

    def process(self, element):
        """Emit ``element``'s items on the ``"example"`` tagged output.

        Args:
            element: An object exposing ``.items()``, e.g. a dict-like row.

        Yields:
            A ``pvalue.TaggedOutput`` tagged ``"example"`` whose value is
            ``tuple(element.items())``, i.e. one ``(key, value)`` pair per
            item of the element.
        """
        item_pairs = tuple(element.items())
        yield pvalue.TaggedOutput("example", item_pairs)
class Test(beam.DoFn):
    """DoFn applied to each element together with an optional side input."""

    def process(self, element, side_input=None):
        """Process one element, with access to a side input.

        Args:
            element: The element being processed.
            side_input: Value of the side input supplied as an extra argument
                to ``beam.ParDo(Test(), ...)``. Defaults to ``None`` so the
                DoFn can still be applied without a side input.

        Yields:
            The unchanged ``element``.
        """
        # NOTE(review): the original snippet shows no body for this method.
        # This pass-through is a placeholder so the DoFn is syntactically valid
        # and accepts the side input; replace it with the intended logic.
        yield element
with beam.Pipeline() as p:
    # Read rows with a standard-SQL query. `query_input_table` is not defined
    # in this snippet; it must be provided by the surrounding module.
    read = p | 'read' >> beam.io.Read(
        beam.io.BigQuerySource(
            query="""{}""".format(query_input_table),
            use_standard_sql=True,
        )
    )

    # `transfrom` (spelled as the class is declared) emits only to the
    # "example" tagged output, so the tag is declared with `with_outputs`
    # and the tagged PCollection is read back as `trans.example`.
    trans = read | 'transform' >> beam.ParDo(transfrom()).with_outputs("example")

    # Transform labels must be unique within a pipeline, so this step gets its
    # own label instead of reusing 'transform'.
    # NOTE(review): `pvalue.AsDict` expects (key, value) elements, but
    # `transfrom` emits one tuple holding every item pair of a row. Confirm
    # that shape is what is wanted as a dict side input.
    _ = read | 'apply_test' >> beam.ParDo(Test(), pvalue.AsDict(trans.example))