79677690

Date: 2025-06-24 13:48:25

Thank you for your answers.

I followed the approach in my [edit 1] proposal and came up with the following.

  1. I converted my flat list of parts (a list of dicts) into a list of lists of parts, one inner list per archive (a sketch of this regrouping appears after the main code below):
# FROM
packages_to_dl = [
    { "part": "file_1.7z.001" },
    { "part": "file_1.7z.xxx" },

    { "part": "file_N.7z.001" },
    { "part": "file_N.7z.xxx" },
]

# TO
packages_to_dl = [
    [ "file_1.7z.001", "file_1.7z.xxx" ],
    [ "file_N.7z.001", "file_N.7z.xxx" ],
]
  2. Then I took advantage of the list index to know where I am in the loop (note: asyncio.TaskGroup requires Python 3.11):
async def download(self, packages_to_dl: list[list[str]]) -> None:
    for idx, packages in enumerate(packages_to_dl):
        async with asyncio.TaskGroup() as tg:
            # Download the current batch of parts.
            # NOTE: deriving url/output_document from the part name is an
            # assumption of mine; the original snippet left both undefined.
            for part in packages:
                tg.create_task(
                    self.download_from_gitlab(
                        url=f"{self.base_url}/{part}",
                        output_document=part,
                    )
                )

            if idx != 0:
                # ... while extracting the previous batch in the same TaskGroup.
                # 7z only needs the .001 volume; it picks up the other volumes itself.
                # -o<dir> (no space) sets the output directory.
                args = ["x", packages_to_dl[idx - 1][0], f"-o{save_dir}"]
                tg.create_task(self.extract("7z", args))

    # Once the loop is done, extract the last batch of parts.
    args = ["x", packages_to_dl[-1][0], f"-o{save_dir}"]
    await self.extract("7z", args)
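
As an aside, here is a minimal sketch (mine, not part of the code above) of how the flat list from step 1 could be regrouped into these per-archive batches; it assumes every part name ends in a numeric volume suffix such as .001:

from collections import defaultdict

def group_parts(flat: list[dict]) -> list[list[str]]:
    """Regroup [{'part': 'file_1.7z.001'}, ...] into one sorted list of parts per archive."""
    groups: defaultdict[str, list[str]] = defaultdict(list)
    for entry in flat:
        part = entry["part"]
        archive = part.rsplit(".", 1)[0]  # "file_1.7z.001" -> "file_1.7z"
        groups[archive].append(part)
    return [sorted(parts) for parts in groups.values()]

# e.g. group_parts([{"part": "file_1.7z.001"}, {"part": "file_1.7z.002"},
#                   {"part": "file_2.7z.001"}])
# -> [["file_1.7z.001", "file_1.7z.002"], ["file_2.7z.001"]]

The two helper coroutines used by download() are: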


async def download_from_gitlab(self, url: str, output_document: str) -> None:
    # A semaphore only limits concurrency if it is shared between tasks;
    # creating a fresh asyncio.Semaphore inside each call (as originally) limits nothing.
    # self._dl_semaphore is assumed to be e.g. asyncio.Semaphore(2), created once
    # (see the sketch at the end), so at most 2 parts download at a time.
    async with self._dl_semaphore:
        async with self._session.get(url=url) as r:
            with open(output_document, "wb") as f:
                chunk_size = 64 * 1024
                # Stream the response to disk in 64 KiB chunks.
                # (plain open()/write() block the loop briefly; aiofiles could be used instead)
                async for data in r.content.iter_chunked(chunk_size):
                    f.write(data)

async def extract(self, program: str, args: list[str]) -> None:
    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    await proc.communicate()
    print(f'{program} {" ".join(args)} exited with {proc.returncode}')
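
Finally, a rough, hypothetical sketch of where the shared pieces (base_url, save_dir, the shared semaphore and the aiohttp session) could live and how download() would be driven. The class name, base_url and paths below are placeholders of mine, not my actual code:

import asyncio
import aiohttp

class PackageDownloader:
    # Hypothetical wrapper: shows where the shared session, semaphore and
    # save_dir used by the methods above could live.
    def __init__(self, base_url: str, save_dir: str) -> None:
        self.base_url = base_url                   # assumed URL prefix for the parts
        self.save_dir = save_dir
        self._dl_semaphore = asyncio.Semaphore(2)  # shared limiter: 2 parts at a time
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "PackageDownloader":
        self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *exc_info) -> None:
        if self._session is not None:
            await self._session.close()

    # download(), download_from_gitlab() and extract() from above go here


async def main() -> None:
    batches = group_parts(packages_to_dl)  # regrouping sketch from above
    async with PackageDownloader("https://gitlab.example.com", "/tmp/extracted") as dl:
        await dl.download(batches)


if __name__ == "__main__":
    asyncio.run(main())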

Cheers,

Posted by: aymericpineau