import base64
import json
import logging
import os
import re
import time

from google.cloud import pubsub_v1, storage, translate

def doTranslation(translate_client, project_id, text, src_lang="it", target_lang="en-US"):
    """
    Translate text with the Cloud Translation API.

    Args:
        translate_client: translate.TranslationServiceClient instance
        project_id: str - GCP project hosting the Translation API
        text: str - text to translate
        src_lang: str - source language code (default "it")
        target_lang: str - target language code (default "en-US")
    Returns:
        translated_txt: str - translated text returned by the Translate API
    """
    logging.info('Translating text into {}.'.format(target_lang))
    parent = translate_client.location_path(project_id, location="global")
    # Detail on supported types can be found here:
    # https://cloud.google.com/translate/docs/supported-formats
    translated_dict = translate_client.translate_text(parent=parent,
                                                      contents=[text],
                                                      mime_type="text/plain",
                                                      source_language_code=src_lang,
                                                      target_language_code=target_lang)
    for translation in translated_dict.translations:
        translated_txt = translation.translated_text
    return translated_txt

def publishMsg(publisher_client, project_id, text, doc_title, topic_name):
    """
    Publish a message carrying the text and document title to a Pub/Sub topic.

    Args:
        publisher_client: pubsub_v1.PublisherClient instance
        project_id: str - GCP project that owns the topic
        text: str - text contained in the document
        doc_title: str - title of the document
        topic_name: str - name of the Pub/Sub topic to publish to
    Returns:
        None
    """
    # Compose the message to be sent to Pub/Sub
    message = {
        'text': text,
        'doc_title': doc_title,
    }
    # Note: the message data needs to be a bytestring
    # Refer to the documentation:
    # https://googleapis.dev/python/pubsub/latest/publisher/api/client.html
    message_data = json.dumps(message).encode('utf-8')
    topic_path = publisher_client.topic_path(project_id, topic_name)
    # The publish method returns a future instance
    future = publisher_client.publish(topic_path, data=message_data)
    # Call result() to block until the message is published and get its ID
    # Refer to the documentation:
    # https://googleapis.dev/python/pubsub/latest/publisher/api/futures.html#google.cloud.pubsub_v1.publisher.futures.Future
    message_id = future.result()
    logging.info("Message id: {} was published in topic: {}".format(message_id, topic_name))

def uploadBlob(storage_client, bucket_name, txt_content, destination_blob_name):
    """
    Upload a text string to a Cloud Storage bucket.

    Args:
        storage_client: storage.Client instance
        bucket_name: str - name of the destination bucket
        txt_content: str - text to upload
        destination_blob_name: str - object name (prefix/path) inside the bucket
    Returns:
        None
    """
    # Strip a leading gs://<bucket>/ prefix if one was passed in
    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
    bucket_client = storage_client.bucket(bucket_name)
    blob = bucket_client.blob(destination_blob_name)
    blob.upload_from_string(txt_content)
    logging.info("Text uploaded to {}".format(destination_blob_name))

def cleanEngText(eng_raw_string, customize_stop_words=[]):
    """
    Clean the raw English translation.

    Args:
        eng_raw_string: str - raw English text returned by the Translate API
        customize_stop_words: list - additional stop words to remove
    Returns:
        refined_doc: str - curated English text
    """
    # Remove dates:
    # 1 or 2 digit number, forward slash, 1 or 2 digit number, forward slash, 4 digit year
    pattern_dates = r'(\d{1,2})/(\d{1,2})/(\d{4})'
    pattern_fig = r'Figure (\d{1,2})'
    pattern_image = r'^Image .$'
    replace = ''
    eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string)
    eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
    eng_raw_string = re.sub(pattern_image, replace, eng_raw_string)
    # Remove punctuation and special characters
    eng_raw_string = re.sub("[^A-Za-z0-9]+", ' ', eng_raw_string)
    # Remove custom stop words and rebuild the string
    tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
    refined_doc = ' '.join(tokens)
    return refined_doc

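# Illustrative example of the cleaning step (hypothetical input, assuming the
# default patterns and an empty custom stop-word list):
#   cleanEngText("Figure 2 shows cases reported on 12/03/2020.")
#   -> "shows cases reported on"
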
def translateAndRefine(event, context):
    """
    This Cloud Function is triggered when a message is published on the
    Pub/Sub topic of interest. It calls the Translate API, cleans the result,
    and uploads both versions to Cloud Storage.

    Args:
        event (dict): Metadata of the event, received from Pub/Sub.
        context (google.cloud.functions.Context): Metadata of triggering event.
    Returns:
        None; the output is written to stdout and Stackdriver Logging
    """
    # INSTANTIATION
    translate_client = translate.TranslationServiceClient()
    publisher_client = pubsub_v1.PublisherClient()
    storage_client = storage.Client()

    # SET VARIABLES
    project_id = os.environ['GCP_PROJECT']
    # RESULT_TOPIC = "it2eng"
    start_time = time.time()

    if event.get('data'):
        message_data = base64.b64decode(event['data']).decode('utf-8')
        message = json.loads(message_data)
    else:
        raise ValueError('Data field is missing in the Pub/Sub message.')

    it_text = message.get('text')
    doc_title = message.get('doc_title')

    # Step 1: Call Translate API
    raw_eng_text = doTranslation(translate_client, project_id, it_text)

    # Step 2: Clean the English text
    curated_eng_text = cleanEngText(raw_eng_text)

    # Step 3: Publish to Pub/Sub
    # topic_name = RESULT_TOPIC
    # publishMsg(publisher_client, project_id, curated_eng_text, doc_title, topic_name)

    # Step 4: Upload translated text
    dest_bucket = 'covid19-repo-test'
    prefix_raw_eng_txt = 'eng_txt/{}.txt'.format(doc_title)
    uploadBlob(storage_client, dest_bucket, raw_eng_text, prefix_raw_eng_txt)
    prefix_curated_eng_txt = 'curated_eng_txt/{}.txt'.format(doc_title)
    uploadBlob(storage_client, dest_bucket, curated_eng_text, prefix_curated_eng_txt)

    end_time = time.time() - start_time
    logging.info("Completion of translateAndRefine took: {} seconds".format(round(end_time, 1)))