"""storing.py

Pipeline script: exports raw/translated/curated text blobs from GCS into a
BigQuery table, then runs a scispaCy NER model over the curated English text
and uploads the extracted UMLS entities to Datastore.
"""
  1. from google.cloud import storage, bigquery, datastore
  2. from google.oauth2 import service_account
  3. from utils.bq_fcn import bqCreateDataset, bqCreateTable, exportItems2BQ
  4. from utils.ner_fcn import loadModel, addTask, extractMedEntities
  5. from scispacy.umls_linking import UmlsEntityLinker
  6. from scispacy.abbreviation import AbbreviationDetector
  7. import logging
  8. logging.getLogger().setLevel(logging.INFO)
  9. try:
  10. import en_core_sci_sm
  11. except:
  12. logging.warning("404: en_core_sci_sm NOT FOUND. Make sure the model was downloaded and installed.")
  13. try:
  14. import en_core_sci_lg
  15. except:
  16. logging.warning("404: en_core_sci_lg NOT FOUND. Make sure the model was downloaded and installed.")
  17. try:
  18. import en_ner_bionlp13cg_md
  19. except:
  20. logging.warning("404: en_ner_bionlp13cg_md NOT FOUND. Make sure the model was downloaded and installed.")
  21. import time
  22. import os
  23. import pandas as pd
# Runtime configuration comes entirely from environment variables; a missing
# variable yields None here and only fails later, at client construction or
# first API call.
project_id = os.getenv('PROJECT_ID')
bucket_name = os.getenv('BUCKET_NAME')
location = os.getenv('LOCATION')
key_path = os.getenv('SA_KEY_PATH')  # filesystem path to a service-account JSON key
dataset_name = os.getenv('BQ_DATASET_NAME')
table_name = os.getenv('BQ_TABLE_NAME')

# One service-account credential object shared by all three GCP clients below.
credentials = service_account.Credentials.from_service_account_file(key_path)
storage_client = storage.Client(credentials=credentials)
datastore_client = datastore.Client(credentials=credentials)
bq_client = bigquery.Client(credentials=credentials)

# Blobs holding the raw (untranslated) text live under this GCS prefix.
gcs_source_prefix = 'raw_txt'
lst_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
                                      prefix=gcs_source_prefix)

# Wall-clock start of the BigQuery export phase (reported at the end of it).
start_time = time.time()
  38. try:
  39. dataset_id = bqCreateDataset(bq_client, dataset_name)
  40. logging.info("The following dataset {} was successfully created/retrieved.".format(dataset_name))
  41. except Exception as e:
  42. logging.error("An error occurred.", e)
  43. try:
  44. table_id = bqCreateTable(bq_client, dataset_id, table_name)
  45. logging.info("The following table {} was successfully created/retrieved.".format(table_name))
  46. except Exception as e:
  47. logging.error("An error occurred.", e)
  48. for blob in lst_blobs:
  49. doc_title = blob.name.split('/')[-1].split('.txt')[0]
  50. # download as string
  51. it_raw_blob = storage_client.get_bucket(bucket_name).get_blob('raw_txt/{}.txt'.format(doc_title))
  52. # set the GCS path
  53. path_blob_eng_raw = 'eng_txt/{}/{}_raw_txt_{}_en_translations.txt'.format(doc_title, bucket_name, doc_title)
  54. eng_raw_blob = storage_client.get_bucket(bucket_name).get_blob(path_blob_eng_raw)
  55. # Upload blob of interest
  56. curated_eng_blob = storage_client.get_bucket(bucket_name) \
  57. .get_blob('curated_eng_txt/{}.txt'.format(doc_title))
  58. # populate to BQ dataset
  59. exportItems2BQ(bq_client, dataset_id, table_id, doc_title, it_raw_blob, eng_raw_blob, curated_eng_blob)
  60. total_time = time.time() - start_time
  61. logging.info('The export to BigQuery was completed successfully and took {} minutes.'.format(round(total_time / 60, 1)))
  62. curated_gcs_source_prefix = 'curated_eng_txt'
  63. lst_curated_blobs = storage_client.list_blobs(bucket_or_name=bucket_name,
  64. prefix=curated_gcs_source_prefix)
  65. nlp = loadModel(model=en_core_sci_sm)
  66. start_time = time.time()
  67. for blob in lst_curated_blobs:
  68. doc_title = blob.name.split('/')[-1].split('.txt')[0]
  69. # download as string
  70. eng_string = blob.download_as_string().decode('utf-8')
  71. # convert to vector
  72. doc = nlp(eng_string)
  73. # Extract medical entities
  74. UMLS_tuis_entity = extractMedEntities(doc)
  75. # Generate dataframes
  76. entities = list(UMLS_tuis_entity.keys())
  77. TUIs = list(UMLS_tuis_entity.values())
  78. df_entities = pd.DataFrame(data={'entity': entities, 'TUIs': TUIs})
  79. df_reference_TUIs = pd.read_csv('./utils/UMLS_tuis.csv')
  80. df_annotated_text_entities = pd.merge(df_entities, df_reference_TUIs, how='inner', on=['TUIs'])
  81. # Upload entities to datastore
  82. entities_dict = {}
  83. for idx in range(df_annotated_text_entities.shape[0]):
  84. category = df_annotated_text_entities.iloc[idx].values[2]
  85. med_entity = df_annotated_text_entities.iloc[idx].values[0]
  86. # Append to list of entities if the key,value pair already exist
  87. try:
  88. entities_dict[category].append(med_entity)
  89. except:
  90. entities_dict[category] = []
  91. entities_dict[category].append(med_entity)
  92. # API call
  93. key = addTask(datastore_client, doc_title, entities_dict)
  94. logging.info('The upload of {} entities is done.'.format(doc_title))
  95. total_time = time.time() - start_time
  96. logging.info(
  97. "The export to Datastore was completed successfully and took {} minutes.".format(round(total_time / 60, 1)))