
Automatic extraction with cloud functions

Aziz Ketari 4 years ago
parent
commit
423fd5f16a

+ 1 - 2
README.md

@@ -45,8 +45,7 @@ gcloud services enable datastore.googleapis.com
 gcloud services enable bigquery.googleapis.com
 ```
 
-- Install package requirements:curl -O https://bootstrap.pypa.io/get-pip.py
-sudo python3 get-pip.py
+- Install package requirements:
> Make sure you have a Python version >= 3.6.0; otherwise you will face version errors. [Useful link](https://stackoverflow.com/questions/47273260/google-cloud-compute-engine-change-to-python-3-6)
 
 `ERROR: Package 'scispacy' requires a different Python: 3.5.3 not in '>=3.6.0'`
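The constraint above can also be verified up front; a minimal sketch (not part of the commit) that fails fast before installing the requirements, assuming it is run with the same interpreter that will import scispacy:

```python
# Quick sanity check, illustrative only.
import sys

assert sys.version_info >= (3, 6), (
    "scispacy requires Python >= 3.6, found {}.{}".format(*sys.version_info[:2])
)
```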

+ 3 - 1
content/env_variables.sh

@@ -4,4 +4,6 @@ export BUCKET_NAME="bucket_contains_data"
 export LOCATION="compute_region"
 export BQ_DATASET_NAME="covid19"
 export BQ_TABLE_NAME="ISMIR"
-export TEST_CASE="case14" # lowercase any case from 1 to 49 (e.g case1, case32 ...)
+export TEST_CASE="case14" # lowercase any case from 1 to 49 (e.g case1, case32 ...)
+export RESULT_TOPIC="topic_of_choice"
+export DEST_BUCKET="name_bucket" # the trigger bucket must be different from the data bucket
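The two new variables are read at runtime by the Cloud Functions added in this commit; a hedged sketch of how they are consumed (only the variable names come from this commit, the comments are illustrative):

```python
# Illustrative only: reading the new variables inside a Cloud Function.
# GCP_PROJECT is injected automatically by the Cloud Functions runtime.
import os

project_id = os.environ["GCP_PROJECT"]
result_topic = os.environ["RESULT_TOPIC"]  # Pub/Sub topic that receives the OCR'd text
dest_bucket = os.environ["DEST_BUCKET"]    # output bucket, distinct from the trigger bucket
```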

+ 4 - 1
requirements.txt

@@ -1,10 +1,13 @@
 google-api-core==1.16.0
 google-api-python-client==1.7.11
+google-cloud-storage==1.27.0
 google-cloud-bigquery==1.24.0
 google-cloud-datastore==1.11.0
 google-cloud-translate==2.0.1
 google-cloud-vision==1.0.0
 google-oauth2-tool==0.0.3
 googleapis-common-protos==1.51.0
+google-cloud-pubsub==1.4.2
 pandas
-scispacy
+scispacy
+

+ 10 - 9
scripts/extraction.py

@@ -1,8 +1,9 @@
 from google.cloud import storage, vision
 from google.oauth2 import service_account
-from utils.preprocessing_fcn import async_detect_document, read_json_result, upload_blob
+from utils.preprocessing_fcn import asyncDetectDocument, readJsonResult, uploadBlob
 
 import logging
+
 logging.getLogger().setLevel(logging.INFO)
 
 import time
@@ -34,11 +35,11 @@ for blob in lst_pdf_blobs:
     json_gcs_dest_path = 'gs://' + bucket_name + '/json/' + doc_title + '-'
 
     # OCR pdf documents
-    async_detect_document(vision_client,
-                          gcs_source_path,
-                          json_gcs_dest_path)
+    asyncDetectDocument(vision_client,
+                        gcs_source_path,
+                        json_gcs_dest_path)
 total_time = time.time() - start_time
-logging.info("Vision API successfully completed OCR of all documents on {} minutes".format(round(total_time / 60,1)))
+logging.info("Vision API successfully completed OCR of all documents on {} minutes".format(round(total_time / 60, 1)))
 
 # Extracting the text now
 start_time = time.time()
@@ -46,15 +47,15 @@ for blob in lst_json_blobs:
     doc_title = blob.name.split('/')[-1].split('-')[0]
 
     # Define GCS paths
-    #json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
+    # json_gcs_dest_path = 'gs://' + bucket_name + '/{}'.format(blob.name)
     txt_gcs_dest_path = 'gs://' + bucket_name + '/raw_txt/' + doc_title + '.txt'
 
     # Parse json
-    all_text = read_json_result(storage_client=storage_client, bucket_name=bucket_name, doc_title=doc_title)
+    all_text = readJsonResult(storage_client=storage_client, bucket_name=bucket_name, doc_title=doc_title)
 
     # Upload raw text to GCS
-    upload_blob(storage_client=storage_client, bucket_name=bucket_name,
-                txt_content=all_text, destination_blob_name=txt_gcs_dest_path)
+    uploadBlob(storage_client=storage_client, bucket_name=bucket_name,
+               txt_content=all_text, destination_blob_name=txt_gcs_dest_path)
 
 total_time = time.time() - start_time
 logging.info(

+ 193 - 0
scripts/utils/CF_OCR.py

@@ -0,0 +1,193 @@
+import logging
+import json
+import os
+import time
+
+from google.cloud import pubsub_v1
+from google.cloud import vision, storage
+from google.protobuf import json_format
+
+publisher_client = pubsub_v1.PublisherClient()
+vision_client = vision.ImageAnnotatorClient()
+storage_client = storage.Client()
+
+project_id = os.environ['GCP_PROJECT']
+RESULT_TOPIC = os.environ["RESULT_TOPIC"]  # e.g. pdf2text
+
+
+def documentOCR(vision_client, gcs_source_uri, gcs_destination_uri, batch_size=20):
+    """
+
+    Args:
+        vision_client:
+        gcs_source_uri:
+        gcs_destination_uri:
+        batch_size:
+
+    Returns:
+
+    """
+    doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]
+
+    # Supported mime_types are: 'application/pdf' and 'image/tiff'
+    mime_type = 'application/pdf'
+
+    # Feature in vision API
+    feature = vision.types.Feature(
+        type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)
+
+    gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
+    input_config = vision.types.InputConfig(
+        gcs_source=gcs_source, mime_type=mime_type)
+
+    gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
+    output_config = vision.types.OutputConfig(
+        gcs_destination=gcs_destination, batch_size=batch_size)
+
+    async_request = vision.types.AsyncAnnotateFileRequest(
+        features=[feature], input_config=input_config,
+        output_config=output_config)
+
+    operation = vision_client.async_batch_annotate_files(
+        requests=[async_request])
+
+    # print('Waiting for the operation to finish.')
+    operation.result(timeout=180)
+    logging.info('Text extraction from document {} is completed.'.format(doc_title))
+
+
+def readJsonResult(storage_client, bucket_name, doc_title):
+    """
+    Parse the JSON result files and extract the text.
+    Args:
+        storage_client: storage.Client instance
+        bucket_name: str - bucket holding the json/ outputs
+        doc_title: str - document title used as the json file prefix
+
+    Returns:
+        all_text: str - Containing all text of the document
+    """
+    gcs_src_prefix = 'json/' + '{}-'.format(doc_title)
+
+    # List objects with the given prefix.
+    bucket_client = storage_client.get_bucket(bucket_name)
+    blob_list = list(bucket_client.list_blobs(prefix=gcs_src_prefix))
+    all_text = ''
+    for blob in blob_list:
+
+        json_string = blob.download_as_string()
+        response = json_format.Parse(
+            json_string, vision.types.AnnotateFileResponse())
+
+        # Collect the text of every page response contained in this json file.
+        for page_response in response.responses:
+            text_response = page_response.full_text_annotation.text
+            all_text += text_response
+            all_text += ' '
+
+    logging.info("Parsing of {} json doc was successful.".format(doc_title))
+    return all_text
+
+
+def uploadBlob(storage_client, bucket_name, txt_content, destination_blob_name):
+    """
+    Uploads a file to the bucket.
+    Args:
+        storage_client:
+        bucket_name:
+        txt_content: str - text
+        destination_blob_name: str - prefix
+
+    Returns:
+
+    """
+    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
+    bucket_client = storage_client.bucket(bucket_name)
+    blob = bucket_client.blob(destination_blob_name)
+
+    blob.upload_from_string(txt_content)
+
+    logging.info("Text uploaded to {}".format(destination_blob_name))
+
+
+def publishMsg(text, doc_title, topic_name):
+    """
+    Publish message with text and filename.
+    Args:
+        text: str - Text contained in the document
+        doc_title: str -
+        topic_name: str -
+    Returns:
+
+    """
+
+    # Compose the message to be sent to pubsub
+    message = {
+        'text': text,
+        'doc_title': doc_title,
+    }
+
+    # Publish message to PubSub
+    # Note: the message_data needs to be in bytestring
+    # Refer to the documentation:
+    # https://googleapis.dev/python/pubsub/latest/publisher/api/client.html
+    message_data = json.dumps(message).encode('utf-8')
+    topic_path = publisher_client.topic_path(project_id, topic_name)
+
+    # Publish method returns a future instance
+    future = publisher_client.publish(topic_path, data=message_data)
+
+    # We need to call result method to extract the message ID
+    # Refer to the documentation:
+    # https://googleapis.dev/python/pubsub/latest/publisher/api/futures.html#google.cloud.pubsub_v1.publisher.futures.Future
+    message_id = future.result()
+
+    logging.info("Message id: {} was published in topic: {}".format(message_id, topic_name))
+
+
+def processPDFFile(file, context):
+    """
+    This function will be triggered when a pdf file is uploaded to the GCS bucket of interest.
+    Args:
+        file (dict): Metadata of the changed file, provided by the triggering
+                                 Cloud Storage event.
+        context (google.cloud.functions.Context): Metadata of triggering event.
+    Returns:
+        None; the output is written to stdout and Stackdriver Logging
+    """
+    start_time = time.time()
+    src_bucket = file.get('bucket')
+    dest_bucket = 'covid19-repo-test'
+
+    prefix_and_doc_title = file.get('name')
+    doc_title = prefix_and_doc_title.split('/')[-1].split('.')[0]
+    print('name is: {}'.format(prefix_and_doc_title))
+
+    # Step 1: Call OCR helper function
+    gcs_source_path = 'gs://' + src_bucket + '/' + prefix_and_doc_title
+    print('source gcs path: {}'.format(gcs_source_path))
+    print('=============================')
+    json_gcs_dest_path = 'gs://' + dest_bucket + '/json/' + doc_title + '-'
+    print('destination json path: {}'.format(json_gcs_dest_path))
+    print('=============================')
+    documentOCR(vision_client, gcs_source_path, json_gcs_dest_path)
+    print("completed OCR!")
+    print('=============================')
+    # Step 2: Parse json file
+    text = readJsonResult(storage_client, dest_bucket, doc_title)
+    print("Completed json parsing!")
+    print('=============================')
+    # Step 3: Publish on pubsub
+    topic_name = RESULT_TOPIC
+    publishMsg(text, doc_title, topic_name)
+    print("Completed pubsub messaging!")
+    print('=============================')
+    # Step 4: Save on GCS
+    upload_dest_prefix = 'raw_txt/{}.txt'.format(doc_title)
+    uploadBlob(storage_client, dest_bucket, text, upload_dest_prefix)
+    print("Completed upload!")
+    print('=============================')
+    print('File {} processed.'.format(doc_title))
+    end_time = time.time() - start_time
+logging.info("Completion of text_extract took: {} seconds".format(round(end_time, 1)))
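processPDFFile is meant to run as a background Cloud Function triggered by a Cloud Storage upload; the sketch below simulates that event locally. The project, bucket and object names are placeholders, and it assumes application default credentials plus the variables from env_variables.sh:

```python
# Local smoke test (illustrative, not part of the commit). The module reads
# GCP_PROJECT and RESULT_TOPIC at import time, so set them before importing.
import os
os.environ["GCP_PROJECT"] = "my-project"        # placeholder
os.environ["RESULT_TOPIC"] = "topic_of_choice"  # matches env_variables.sh

from CF_OCR import processPDFFile  # assumes scripts/utils is on PYTHONPATH

# Shape of the Cloud Storage "finalize" event delivered to the function.
fake_event = {"bucket": "name_bucket", "name": "pdf/case14.pdf"}  # placeholders
processPDFFile(fake_event, context=None)
```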

+ 178 - 0
scripts/utils/CF_translate.py

@@ -0,0 +1,178 @@
+import base64
+import json
+import os
+import re
+import time
+import logging
+
+from google.cloud import pubsub_v1, translate, storage
+
+
+def doTranslation(translate_client, project_id, text, src_lang="it", target_lang="en-US"):
+    """
+
+    Args:
+        text: str -
+        src_lang: str - default it
+        target_lang: str - default en
+
+    Returns:
+        translated_txt: txt - response from translate API
+    """
+    logging.info('Translating text into {}.'.format(target_lang))
+
+    parent = translate_client.location_path(project_id, location="global")
+
+    # Detail on supported types can be found here:
+    # https://cloud.google.com/translate/docs/supported-formats
+    translated_dict = translate_client.translate_text(parent=parent,
+                                                      contents=[text],
+                                                      mime_type="text/plain",
+                                                      source_language_code=src_lang,
+                                                      target_language_code=target_lang)
+
+    for translation in translated_dict.translations:
+        translated_txt = translation.translated_text
+    return translated_txt
+
+
+def publishMsg(publisher_client, project_id, text, doc_title, topic_name):
+    """
+    Publish message with text and doc_title.
+    Args:
+        text: str - Text contained in the document
+        doc_title: str -
+        topic_name: str -
+
+    Returns:
+
+    """
+
+    # Compose the message to be sent to pubsub
+    message = {
+        'text': text,
+        'doc_title': doc_title,
+    }
+
+    # Note: the message_data needs to be in bytestring
+    # Refer to the documentation:
+    # https://googleapis.dev/python/pubsub/latest/publisher/api/client.html
+    message_data = json.dumps(message).encode('utf-8')
+    topic_path = publisher_client.topic_path(project_id, topic_name)
+
+    # Publish method returns a future instance
+    future = publisher_client.publish(topic_path, data=message_data)
+
+    # We need to call result method to extract the message ID
+    # Refer to the documentation:
+    # https://googleapis.dev/python/pubsub/latest/publisher/api/futures.html#google.cloud.pubsub_v1.publisher.futures.Future
+    message_id = future.result()
+
+    logging.info("Message id: {} was published in topic: {}".format(message_id, topic_name))
+
+
+def uploadBlob(storage_client, bucket_name, txt_content, destination_blob_name):
+    """
+    Uploads a file to the bucket.
+    Args:
+        storage_client:
+        bucket_name:
+        txt_content: str - text
+        destination_blob_name: str - prefix
+
+    Returns:
+
+    """
+    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
+    bucket_client = storage_client.bucket(bucket_name)
+    blob = bucket_client.blob(destination_blob_name)
+
+    blob.upload_from_string(txt_content)
+
+    logging.info("Text uploaded to {}".format(destination_blob_name))
+
+
+def cleanEngText(eng_raw_string, customize_stop_words=[]):
+    """
+
+    Args:
+        eng_raw_string: str -
+        customize_stop_words: list - all stopwords to remove
+
+    Returns:
+        refined_doc: str - curated string of eng text
+    """
+
+    # Remove dates
+    # 1 or 2 digit number followed by back slash followed by 1 or 2 digit number ...
+    pattern_dates = r'(\d{1,2})/(\d{1,2})/(\d{4})'
+    pattern_fig = r'Figure (\d{1,2})'
+    pattern_image = r'^Image .$'
+    replace = ''
+
+    eng_raw_string = re.sub(pattern_dates, replace, eng_raw_string)
+    eng_raw_string = re.sub(pattern_fig, replace, eng_raw_string)
+    eng_raw_string = re.sub(pattern_image, replace, eng_raw_string)
+
+    # remove punctuation and special characters
+    eng_raw_string = re.sub("[^A-Za-z0-9]+", ' ', eng_raw_string)
+
+    # Remove custom stop words
+    tokens = [token for token in eng_raw_string.split() if token not in customize_stop_words]
+
+    refined_doc = ''
+    for word in tokens:
+        refined_doc += ' {}'.format(word)
+
+    return refined_doc
+
+
+def translateAndRefine(event, context):
+    """
+    This Cloud Function will be triggered when a message is published on the
+    PubSub topic of interest. It will call Translate API.
+    args:
+        event (dict): Metadata of the event, received from Pub/Sub.
+        context (google.cloud.functions.Context): Metadata of triggering event.
+    returns:
+        None; the output is written to stdout and Stackdriver Logging
+    """
+    # INSTANTIATION
+    translate_client = translate.TranslationServiceClient()
+    publisher_client = pubsub_v1.PublisherClient()
+    storage_client = storage.Client()
+
+    # SET VARIABLES
+    project_id = os.environ['GCP_PROJECT']
+    #RESULT_TOPIC = "it2eng"
+
+    start_time = time.time()
+    if event.get('data'):
+        message_data = base64.b64decode(event['data']).decode('utf-8')
+        message = json.loads(message_data)
+    else:
+        raise ValueError("The 'data' field is missing in the Pub/Sub message.")
+
+    it_text = message.get('text')
+    doc_title = message.get('doc_title')
+
+    # Step 1: Call Translate API
+    raw_eng_text = doTranslation(translate_client, project_id, it_text)
+
+    # Step 2: Clean eng text
+    curated_eng_text = cleanEngText(raw_eng_text)
+
+    # Step 3: Publish pub/sub
+    # topic_name = RESULT_TOPIC
+    # publishMsg(publisher_client, project_id, curated_eng_text, doc_title, topic_name)
+
+    # Step 4: Upload translated text
+    dest_bucket = 'covid19-repo-test'
+    prefix_raw_eng_txt = 'eng_txt/{}.txt'.format(doc_title)
+    uploadBlob(storage_client, dest_bucket, raw_eng_text, prefix_raw_eng_txt)
+
+    prefix_curated_eng_txt = 'curated_eng_txt/{}.txt'.format(doc_title)
+    uploadBlob(storage_client, dest_bucket, curated_eng_text, prefix_curated_eng_txt)
+
+    end_time = time.time() - start_time
+logging.info("Completion of translation took: {} seconds".format(round(end_time, 1)))
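translateAndRefine consumes the standard Pub/Sub background-function payload, where `data` is base64-encoded JSON; a hedged local sketch (project name, document title and text are placeholders, and credentials are assumed to be configured):

```python
# Illustrative local invocation (not part of the commit): build the payload
# that translateAndRefine expects from Pub/Sub.
import base64
import json
import os

os.environ["GCP_PROJECT"] = "my-project"  # placeholder

from CF_translate import translateAndRefine  # assumes scripts/utils is on PYTHONPATH

message = {"text": "testo di prova in italiano", "doc_title": "case14"}  # placeholders
event = {"data": base64.b64encode(json.dumps(message).encode("utf-8"))}
translateAndRefine(event, context=None)
```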

+ 16 - 5
scripts/utils/bq_fcn.py

@@ -1,4 +1,5 @@
 from google.cloud import bigquery
+import os
 import logging
 
 
@@ -162,19 +163,29 @@ def populateBQ(bq_client, storage_client, bucket_name, dataset_name, table_name)
     except Exception as e:
         logging.error("An error occurred.", e)
 
-    gcs_source_prefix = 'raw_txt'
-    lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
+    src_bucket = os.environ['SRC_BUCKET']
+    dest_bucket = os.environ['DEST_BUCKET']
+    gcs_source_prefix = 'pdf'
+    lst_blobs = storage_client.list_blobs(bucket_or_name=src_bucket,
                                           prefix=gcs_source_prefix)
 
     for blob in lst_blobs:
         doc_title = blob.name.split('/')[-1].split('.txt')[0]
 
         # download as string
-        it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
+        it_raw_blob = storage_client.get_bucket(dest_bucket).get_blob('raw_txt/{}.txt'.format(doc_title))
 
         # set the GCS path
-        path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title, bucket_name, doc_title)
-        eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
+        try:
+            # Path in case using batch translation
+            path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title, dest_bucket, doc_title)
+            eng_raw_blob = storage_client.get_bucket(dest_bucket).get_blob(path_blob_eng_raw)
+            # If the file is not present, decoding a None Type will result in an error
+            eng_raw_txt = eng_raw_blob.download_as_string().decode('utf-8')
+        except:
+            # New path used for pdf update
+            path_blob_eng_raw = 'eng_txt/{}.txt'.format(doc_title)
+            eng_raw_blob = storage_client.get_bucket(dest_bucket).get_blob(path_blob_eng_raw)
 
         # Upload blob of interest
         curated_eng_blob = storage_client.get_bucket(bucket_name) \
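populateBQ now resolves its source and destination buckets from the environment rather than from the single bucket_name argument alone; a hedged call sketch (bucket values are placeholders, dataset and table names follow env_variables.sh, and the import path is an assumption):

```python
# Illustrative only: environment and call expected by the updated populateBQ.
import os
from google.cloud import bigquery, storage

from utils.bq_fcn import populateBQ  # assumes scripts/ is on PYTHONPATH

os.environ["SRC_BUCKET"] = "name_bucket"            # bucket holding the pdf/ prefix (placeholder)
os.environ["DEST_BUCKET"] = "bucket_contains_data"  # bucket holding raw_txt/ and eng_txt/ (placeholder)

populateBQ(bq_client=bigquery.Client(), storage_client=storage.Client(),
           bucket_name=os.environ["DEST_BUCKET"],
           dataset_name="covid19", table_name="ISMIR")
```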

+ 6 - 8
scripts/utils/ner_fcn.py

@@ -6,7 +6,7 @@ import re
 
 def importModel(model_name):
     """
-    Selective import of the required model from scispacy.
+    Selective import of the required model from scispacy. These models are quite heavy, hence this function.
     Args:
         model_name: str -
 
@@ -115,21 +115,19 @@ def getCases(datastore_client, filter_dict, limit=10):
     return results
 
 
-def populateDatastore(datastore_client, storage_client, bucket_name, model_name):
+def populateDatastore(datastore_client, storage_client, model_name, src_bucket='aketari-covid19-data-update'):
     """
     Extract UMLS entities and store them in a No-SQL db: Datastore.
     Args:
         datastore_client: Storage client instantiation -
         storage_client: Storage client instantiation -
-        bucket_name: str -
         model_name: str -
-
+        src_bucket: str - bucket containing the newest PDF files
     Returns:
         Queriable database
     """
-    curated_gcs_source_prefix = 'curated_eng_txt'
-    lst_curated_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
-                                                  prefix=curated_gcs_source_prefix)
+
+    lst_curated_blobs = storage_client.list_blobs(bucket_or_name=src_bucket)
 
     importModel(model_name)
 
@@ -143,7 +141,7 @@ def populateDatastore(datastore_client, storage_client, bucket_name, model_name)
         return False
 
     for blob in lst_curated_blobs:
-        doc_title = blob.name.split('/')[-1].split('.txt')[0]
+        doc_title = blob.name.split('/')[-1].split('.pdf')[0]
 
         # download as string
         eng_string = blob.download_as_string().decode('utf-8')
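populateDatastore now lists the source bucket directly instead of taking bucket_name plus a prefix; a hedged call with the new signature (the scispacy model name, bucket and import path are placeholders/assumptions):

```python
# Illustrative only: calling populateDatastore with the new signature.
from google.cloud import datastore, storage

from utils.ner_fcn import populateDatastore  # assumes scripts/ is on PYTHONPATH

populateDatastore(datastore_client=datastore.Client(),
                  storage_client=storage.Client(),
                  model_name="en_core_sci_sm",   # example scispacy model
                  src_bucket="name_bucket")      # bucket with the newest files
```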

+ 2 - 2
scripts/utils/preprocessing_fcn.py

@@ -45,7 +45,7 @@ def async_detect_document(vision_client, gcs_source_uri, gcs_destination_uri, ba
     logging.info('Text extraction from document {} is completed.'.format(doc_title))
 
 
-def read_json_result(storage_client, bucket_name, doc_title):
+def readJsonResult(storage_client, bucket_name, doc_title):
     """
     Parsing the json files and extract text.
     Args:
@@ -79,7 +79,7 @@ def read_json_result(storage_client, bucket_name, doc_title):
     return all_text
 
 
-def upload_blob(storage_client, bucket_name, txt_content, destination_blob_name):
+def uploadBlob(storage_client, bucket_name, txt_content, destination_blob_name):
     """
     Uploads a file to the bucket.
     Args: