123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from google.cloud import storage, bigquery, datastore
- from google.oauth2 import service_account
- from utils.bq_fcn import bqCreateDataset, bqCreateTable, exportItems2BQ
- from utils.ner_fcn import loadModel, addTask, extractMedEntities
- import en_core_sci_lg
- import logging
- import time
- import os
- import pandas as pd
- project_id = os.getenv('PROJECT_ID')
- bucket_name = os.getenv('BUCKET_NAME')
- location = os.getenv('LOCATION')
- key_path = os.getenv('SA_KEY_PATH')
- dataset_name = os.getenv('BQ_DATASET_NAME')
- table_name = os.getenv('BQ_TABLE_NAME')
- credentials = service_account.Credentials.from_service_account_file(key_path)
- storage_client = storage.Client(credentials=credentials)
- datastore_client = datastore.Client(credentials=credentials)
- gcs_source_prefix = 'raw_txt'
- lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
- prefix=gcs_source_prefix)
- start_time = time.time()
- try:
- dataset_id = bqCreateDataset(dataset_name)
- logging.info("The following dataset {} was successfully created/retrieved.".format(dataset_name))
- except Exception as e:
- logging.error("An error occurred.", e)
- try:
- table_id = bqCreateTable(dataset_id, table_name)
- logging.info("The following table {} was successfully created/retrieved.".format(table_name))
- except Exception as e:
- logging.error("An error occurred.", e)
- for blob in lst_blobs:
- doc_title = blob.name.split('/')[-1].split('.txt')[0]
- # download as string
- it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
- # set the GCS path
- path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title,
- bucket_name,
- doc_title)
- eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
- # Upload blob of interest
- curated_eng_blob = storage_client.get_bucket(bucket_name) \
- .get_blob('curated_eng_txt/{}.txt'.format(doc_title))
- # populate to BQ dataset
- exportItems2BQ(dataset_id, table_id, doc_title, it_raw_blob, eng_raw_blob, curated_eng_blob)
- total_time = time.time() - start_time
- logging.info('The export to BigQuery was completed successfully and took {} minutes.'.format(round(total_time / 60, 1)))
- curated_gcs_source_prefix = 'curated_eng_txt'
- lst_curated_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
- prefix=curated_gcs_source_prefix)
- nlp = loadModel(model=en_core_sci_lg)
- start_time = time.time()
- for blob in lst_curated_blobs:
- doc_title = blob.name.split('/')[-1].split('.txt')[0]
- # download as string
- eng_string = blob.download_as_string().decode('utf-8')
- # convert to vector
- doc = nlp(eng_string)
- # Extract medical entities
- UMLS_tuis_entity = extractMedEntities(doc)
- # Generate dataframes
- entities = list(UMLS_tuis_entity.keys())
- TUIs = list(UMLS_tuis_entity.values())
- df_entities = pd.DataFrame(data={'entity': entities, 'TUIs': TUIs})
- df_reference_TUIs = pd.read_csv('~/data/UMLS_tuis.csv')
- df_annotated_text_entities = pd.merge(df_entities, df_reference_TUIs, how='inner', on=['TUIs'])
- # Upload entities to datastore
- entities_dict = {}
- for idx in range(df_annotated_text_entities.shape[0]):
- category = df_annotated_text_entities.iloc[idx].values[2]
- med_entity = df_annotated_text_entities.iloc[idx].values[0]
- # Append to list of entities if the key,value pair already exist
- try:
- entities_dict[category].append(med_entity)
- except:
- entities_dict[category] = []
- entities_dict[category].append(med_entity)
- # API call
- key = addTask(datastore_client, doc_title, entities_dict)
- logging.info('The upload of {} entities is done.'.format(doc_title))
- total_time = time.time() - start_time
- logging.info(
- "The export to Datastore was completed successfully and took {} minutes.".format(round(total_time / 60, 1)))
|