
Using Google API Python Client in Production-Grade Apps


How to build better software with Python and Google APIs

The official Google API Python Client has been around for years. It was written for Python 2 and later made to work on Python 3. It is built on the venerable httplib2, and it is considered to be in maintenance mode (read: “no new features”).

New, product-specific libraries are being developed, but they still cover only a small range of Google APIs, mostly focused on GCP. I find them very useful, but they don’t cover services like Google Drive or the Google Directory API. As a result, we’re largely stuck with the original low-level client mentioned above for the foreseeable future.

One of the features that Google API Python Client lacks is thread safety.

When your app is hanging by a thread

I’ve been writing mostly async Python code for the last few years. Projects like FastAPI spearhead the approach, both in terms of technology and mentorship, and make it easy to write both sync and async web services using a single web framework. The async approach is quite addictive in that it liberates you from thinking about what can happen between two lines of code in another thread: with asyncio, you are the one who sets the context-switching points.

Getting back into sync work, I simply forgot about thread-safety for a moment, at least until my Cloud Run application started to crash (as in Segmentation Fault) and threw random exceptions like SSLError, TimeoutError after just 25ms, IncompleteRead, etc.

After realizing this was a thread-safety issue, I pondered how to fix it. My code uses G Suite APIs and I have a G Suite Manager service class (kingdom of nouns, I know) that I initialize on startup and use while handling web requests in my app.

Staying on the thread-safe side

The official documentation provides some advice on how to use the library in a thread-safe manner — namely to manually manage httplib2 transport objects while calling Google APIs and making sure that each thread uses a unique one.

However, it’s not clear how to apply this advice in the context of a web server where each request is handled in a dedicated thread which may or may not stay alive after the request is finished. Creating a new httplib2.Http() object on every request is wasteful since we’ll be creating a new TCP connection to Google API backend on every request we handle, as well as losing httplib2's ability to utilize ETags to reduce response payload from Google servers.
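A literal reading of that advice is one transport per thread, kept in thread-local storage. The sketch below is only an illustration of the idea, not code from the official documentation: PerThreadTransport is a hypothetical helper, and the factory argument stands in for something like lambda: AuthorizedHttp(credentials, http=httplib2.Http()). It also makes the drawback visible: a transport created inside a worker thread cannot be reused by any other thread and is lost when that thread exits.

```python
import threading
from typing import Any, Callable, List


class PerThreadTransport:
    """Hands each thread its own transport object (hypothetical helper)."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._local = threading.local()  # attributes are private to each thread

    def get(self) -> Any:
        http = getattr(self._local, "http", None)
        if http is None:
            # First call in this thread: build a transport and remember it.
            http = self._factory()
            self._local.http = http
        return http


# `object` is a placeholder factory so the sketch runs without Google libraries.
transports = PerThreadTransport(factory=object)
main_http = transports.get()

seen_in_worker: List[Any] = []
worker = threading.Thread(
    target=lambda: seen_in_worker.append(transports.get())
)
worker.start()
worker.join()

same_thread_reuses = transports.get() is main_http
worker_got_its_own = seen_in_worker[0] is not main_http
```

Once the worker thread has finished, nothing else can reach its transport through the helper, which is exactly the waste described above for short-lived request threads.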

Eventually, I came up with the idea of creating my own thread-safe pool of httplib2.Http() objects and proxying all Google API Python client requests through this pool — I call it APIConnector.

Here is how it looks:

import logging
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional

import google.auth.credentials
import httplib2
from google_auth_httplib2 import AuthorizedHttp
from googleapiclient.http import HttpRequest

logger = logging.getLogger(__name__)

@dataclass
class APIConnector:
    factory: Callable[[], AuthorizedHttp]
    # default_factory must be a callable, hence `list` rather than `[]`.
    pool: List[AuthorizedHttp] = field(default_factory=list)

    @classmethod
    def new(
        cls,
        credentials: google.auth.credentials.Credentials,
        initial_size: int = 5,
        timeout_seconds: int = 10,
    ) -> "APIConnector":
        factory = lambda: AuthorizedHttp(
            credentials,
            http=httplib2.Http(timeout=timeout_seconds),
        )
        pool: List[AuthorizedHttp] = [factory() for _ in range(initial_size)]
        return cls(factory, pool=pool)

    def execute(self, req: HttpRequest) -> Any:
        http: Optional[AuthorizedHttp] = None
        try:
            http = self._provision_http()
            return req.execute(http=http)
        finally:
            # Return the transport to the pool even if the request failed.
            if http is not None:
                self.pool.append(http)

    def _provision_http(self) -> AuthorizedHttp:
        # This function can run in parallel in multiple threads.
        try:
            return self.pool.pop()
        except IndexError:
            logger.info("Pool exhausted. Creating new transport")
            return self.factory()

A pretty small amount of code to pay for thread safety, right? It’s important to note that our pool manager is thread-safe without using any locks: list.pop() and list.append() are each atomic operations in CPython, so we leave it to the GIL to cover for us.
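To see the lock-free pool hold up under contention without touching any Google API, here is a small self-contained sketch of the same pop/append pattern. Everything in it is a stand-in: FakeTransport is a hypothetical class that fails loudly if two threads ever use it at once, and Pool mirrors only the pooling logic of APIConnector.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


class FakeTransport:
    """Stand-in for a non-thread-safe transport such as httplib2.Http."""

    def __init__(self) -> None:
        self._busy = threading.Lock()
        self.calls = 0

    def request(self) -> None:
        # A real transport would misbehave silently; this one raises instead.
        if not self._busy.acquire(blocking=False):
            raise RuntimeError("transport used by two threads at once")
        try:
            time.sleep(0.001)  # pretend to do network I/O
            self.calls += 1
        finally:
            self._busy.release()


class Pool:
    """Same pop/append pattern as APIConnector, minus the Google parts."""

    def __init__(self, factory: Callable[[], FakeTransport], initial_size: int = 2) -> None:
        self.factory = factory
        self.items: List[FakeTransport] = [factory() for _ in range(initial_size)]

    def run(self) -> None:
        try:
            item = self.items.pop()  # a single list operation
        except IndexError:
            item = self.factory()  # pool exhausted: grow it
        try:
            item.request()
        finally:
            self.items.append(item)  # a single list operation


pool = Pool(FakeTransport)
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(pool.run) for _ in range(200)]
    for future in futures:
        future.result()  # re-raises if any transport was shared

total_calls = sum(transport.calls for transport in pool.items)
```

Because a transport is only reachable by the thread that popped it, the pool never grows beyond the number of worker threads running at the same time.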

Here’s how to use the above:

import googleapiclient.discovery

@dataclass
class GSuiteUserManager:
    api: APIConnector
    users: googleapiclient.discovery.Resource
    domain: str

    @classmethod
    def new(cls, domain: str, credentials) -> "GSuiteUserManager":
        api = APIConnector.new(credentials)
        service = googleapiclient.discovery.build(
            "admin",
            "directory_v1",
            credentials=credentials,
            cache_discovery=False,
        )
        users = service.users()
        return cls(api=api, users=users, domain=domain)

    def list(self) -> dict:
        # Build the request as usual, then execute it through the pool.
        return self.api.execute(
            self.users.list(domain=self.domain)
        )

    def get(self, email: str) -> dict:
        ...

To proxy all of the relevant API methods through Managers, as in the above example, may look like a lot of boilerplate, but I find that in reality such managers will do much more than just returning API responses verbatim. At the very minimum they should model Google API responses into proper descriptive Python objects (e.g. User, Group, etc.) instead of just returning dictionaries which make upstream code incomprehensible very quickly.
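As an illustration of that modelling step, here is a minimal sketch of turning a users listing into typed objects. The User class and parse_users function are hypothetical names, the sample response is made up for the example, and the field names (primaryEmail, name.fullName, suspended) are assumed to match the Directory API user resource.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class User:
    email: str
    full_name: str
    suspended: bool

    @classmethod
    def from_api(cls, raw: Dict[str, Any]) -> "User":
        # Only the primary email is treated as required here.
        return cls(
            email=raw["primaryEmail"],
            full_name=raw.get("name", {}).get("fullName", ""),
            suspended=raw.get("suspended", False),
        )


def parse_users(response: Dict[str, Any]) -> List[User]:
    # Be tolerant of a response without a "users" key (e.g. an empty listing).
    return [User.from_api(item) for item in response.get("users", [])]


# Made-up response, shaped like the dictionaries a manager would return.
sample_response = {
    "users": [
        {"primaryEmail": "ann@example.com", "name": {"fullName": "Ann Lee"}},
        {"primaryEmail": "bob@example.com", "suspended": True},
    ]
}
users = parse_users(sample_response)
```

Upstream code can then work with users[0].email instead of digging through nested dictionaries.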

Improving it further

If you are finding it interesting so far, let’s go to the bonus content sections.

Timeouts

Google API Python client doesn’t provide a native way to specify timeouts, but since our APIConnector digs into its intricacies anyway, we used that opportunity to control timeouts, as an astute reader might have noticed already.

Cleanups

Using the above APIConnector “as is” will probably leave you with warnings like the one below when your program finishes.

sys:1: ResourceWarning: unclosed <ssl.SSLSocket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.1', 54988), raddr=('142.250.66.173', 443)>

That’s because we have unclosed httplib2.Http() objects in our pool. Let’s fix it by adding the following methods to our APIConnector:

def close(self) -> None:
    for ahttp in self.pool:
        ahttp.http.close()
def __del__(self) -> None:
    self.close()

Now the warnings are gone and we have a clean shutdown.
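Python does not guarantee that __del__ runs for objects still alive at interpreter exit, so calling close() explicitly is the more dependable route. A minimal sketch, assuming only that the connector exposes close(): contextlib.closing from the standard library calls it when the block exits. FakeConnector is a hypothetical stand-in so the example runs on its own.

```python
from contextlib import closing


class FakeConnector:
    """Stand-in for APIConnector; only the close() method matters here."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


with closing(FakeConnector()) as api:
    # Requests would be executed through `api` here.
    was_open_inside = not api.closed

# closing() has called api.close() on the way out of the block.
closed_after = api.closed
```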

You also may notice that the pool never shrinks, which is true, but since my use-case is Cloud Run where instances are short-lived anyway, I think it’s a good trade-off in favor of simplicity.

Cache

Httplib2 supports caching: it stores ETags and sends them in If-None-Match headers when re-retrieving a resource. This can save you from downloading unchanged data from Google servers again, although it still incurs a network round trip.
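For readers unfamiliar with conditional requests, the exchange can be sketched in a few lines. This is a conceptual illustration only, not how httplib2 implements its cache: fake_server, fetch, and the resource constants are all hypothetical stand-ins, and the URL is a placeholder.

```python
from typing import Dict, Tuple

# Hypothetical upstream resource and its current ETag.
RESOURCE_BODY = b'{"users": []}'
RESOURCE_ETAG = '"v1"'


def fake_server(headers: Dict[str, str]) -> Tuple[int, Dict[str, str], bytes]:
    """Answer 304 with an empty body when the client's ETag is still current."""
    if headers.get("If-None-Match") == RESOURCE_ETAG:
        return 304, {"ETag": RESOURCE_ETAG}, b""
    return 200, {"ETag": RESOURCE_ETAG}, RESOURCE_BODY


# Client-side cache: URL -> (etag, body).
cache: Dict[str, Tuple[str, bytes]] = {}


def fetch(url: str) -> Tuple[int, bytes]:
    headers: Dict[str, str] = {}
    cached = cache.get(url)
    if cached is not None:
        headers["If-None-Match"] = cached[0]
    # The round trip happens in both cases; only the payload is saved.
    status, response_headers, body = fake_server(headers)
    if status == 304 and cached is not None:
        return status, cached[1]  # serve the stored body
    cache[url] = (response_headers["ETag"], body)
    return status, body


first = fetch("https://example.invalid/users")
second = fetch("https://example.invalid/users")
```

The second call still reaches the server, but the response carries no body and the stored copy is returned instead.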

Httplib2 caching is based on files and is not thread-safe either. However, we do want to have a shared cache between different Http() objects since we use them as a connection pool to talk to the same upstream API.

So once again, utilizing the fact that atomic operations on Python objects are thread-safe, we can easily build a lockless memory cache for httplib2:

from dataclasses import dataclass, field
from typing import Any, Hashable

@dataclass
class MemCache:
    data: dict[Hashable, Any] = field(default_factory=dict)

    def get(self, key: Hashable) -> Any:
        if hit := self.data.get(key, None):
            logger.debug("Cache hit: %s", key)
        return hit

    def set(self, key: Hashable, data: Any) -> None:
        self.data[key] = data

    def delete(self, key: Hashable) -> None:
        # Remove the requested key; a missing key is not an error.
        try:
            del self.data[key]
        except KeyError:
            pass

(The class interface has been copied from httplib2.FileCache object)

And now we update our classes to use the cache:

@dataclass
class APIConnector:
    ...
    @classmethod
    def new(
        ...
        cache: Optional[MemCache] = None,
    ) -> "APIConnector":
        factory = lambda: AuthorizedHttp(
            credentials,    
            http=httplib2.Http(
                timeout=timeout_seconds,
                cache=cache,
            ),
        )        
        ...
class GSuiteUserManager:
    ...
    @classmethod
    def new(
        cls, domain, credentials, use_cache: bool = True
    ) -> "GSuiteUserManager":
        cache = MemCache() if use_cache else None
        api = APIConnector.new(credentials, cache=cache)
        ...

Discovery Cache

Finally, you may be wondering why I passed cache_discovery=False when building the service in GSuiteUserManager. The answer is that this functionality is quite broken and creates traceback noise, as described in detail here and here.


I hope this article helps you build better software with Python and Google APIs, even though the official library has a couple of rough edges.

The full code for APIConnector and MemCache is available here.
