79789714

Date: 2025-10-14 00:07:28
Score: 1.5

I was unable to use the setup provided by @lightning_missile. Instead, I wrapped the client manually like so:

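import json
from typing import Any, Dict

import aiohttp
import boto3
from opensearchpy import AWSV4Signer
from yarl import URL


class OpensearchAsyncClient:
    def __init__(self, endpoint, region, access_key, secret_key, session_token):
        self.endpoint = endpoint
        # AWSV4Signer is the SigV4 signing helper provided by opensearch-py.
        self.signer = AWSV4Signer(
            boto3.Session(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            ).get_credentials(),
            region,
        )
        # Construct this class inside a running event loop and call close() when done.
        self.session = aiohttp.ClientSession()

    async def close(self):
        await self.session.close()

    async def make_request(self, method, path, params, body: Dict[str, Any], timeout=30):
        url = f"https://{self.endpoint}/{path.lstrip('/')}"
        # The signature has to cover the full URL, including the query string.
        url_encoded = str(URL(url).with_query(params))
        body_str = json.dumps(body)
        headers = self.signer.sign(
            method,
            url_encoded,
            body_str,
            {'Content-Type': 'application/json'},
        )
        async with self.session.request(
            method, url,
            params=params,
            data=body_str,  # send exactly the string that was signed
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=timeout),
        ) as response:
            # Treat only 4xx/5xx as failures; e.g. 201 (document created) is a success.
            if not response.ok:
                error_text = await response.text()
                raise ValueError(f"Unable to make request: {response.status}: {error_text}")
            return await response.json()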

This is unfortunate because it prevents using opensearch-py directly. I am including the answer because it still uses a construct that opensearch-py provides (the AWSV4Signer), and there is likely a way to inject an HTTP client into opensearch-py so that it works, but I cannot find one at the moment.
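One detail of the wrapper that is easy to trip over: SigV4 signs a SHA-256 hash of the request payload, so the body that goes over the wire should be byte-for-byte the string that was passed to signer.sign. The sketch below is only an illustration using the standard library. The payload_hash helper and the sample query are hypothetical and not part of opensearch-py. It shows that two serializations of the same JSON value hash differently, which would make a signature computed over one fail for the other.

```python
import hashlib
import json


def payload_hash(body: str) -> str:
    """Hex SHA-256 digest of a request body (hypothetical helper for illustration)."""
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


body = {"query": {"match_all": {}}}

# json.dumps defaults to ", " and ": " as separators.
signed = json.dumps(body)
# Same JSON value, but serialized without whitespace, so the bytes differ.
compact = json.dumps(body, separators=(",", ":"))

print(signed)
print(compact)
print(payload_hash(signed) == payload_hash(compact))  # False
```

For that reason it seems safest to serialize the body once and use that single string both for signing and for sending.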

Reasons:
  • Blacklisted phrase (1.5): i cannot find
  • Blacklisted phrase (0.5): i cannot
  • Long answer (-1):
  • Has code block (-0.5):
  • User mentioned (1): @lightning_missile
Posted by: Asad-ullah Khan