# bq_fcn.py — BigQuery helper functions (dataset/table creation, row export, queries)
import logging
import os

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
# project_id = os.getenv('PROJECT_ID')
# bucket_name = os.getenv('BUCKET_NAME')
# location = os.getenv('LOCATION')
# key_path = os.getenv('SA_KEY_PATH')
#
# credentials = service_account.Credentials.from_service_account_file(key_path)
#
# bq_client = bigquery.Client(credentials=credentials,
#                             project_id=project_id)
# NOTE(review): every function below relies on a module-level `bq_client`,
# which is only defined in this commented-out block — confirm it is
# initialized elsewhere before these helpers are called.
  14. def bqCreateDataset(dataset_name):
  15. """
  16. Creates a dataset on Google Cloud Platform.
  17. Args:
  18. dataset_name: str - Name of the dataset
  19. Returns:
  20. dataset_id: str - Reference id for the dataset just created
  21. """
  22. dataset_ref = bq_client.dataset(dataset_name)
  23. try:
  24. dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
  25. logging.warning('This dataset name: {} is already used.'.format(dataset_id))
  26. return dataset_id
  27. except:
  28. dataset = bigquery.Dataset(dataset_ref)
  29. dataset = bq_client.create_dataset(dataset)
  30. logging.info('Dataset {} created.'.format(dataset.dataset_id))
  31. return dataset.dataset_id
  32. def bqCreateTable(dataset_id, table_name):
  33. """
  34. Create main table with all cases and the medical text.
  35. Args:
  36. dataset_id: str - Reference id for the dataset to use
  37. table_name: str - Name of the table to create
  38. Returns:
  39. table_id: str - Reference id for the table just created
  40. """
  41. dataset_ref = bq_client.dataset(dataset_id)
  42. # Prepares a reference to the table
  43. table_ref = dataset_ref.table(table_name)
  44. try:
  45. return bq_client.get_table(table_ref).table_id
  46. except:
  47. schema = [
  48. bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
  49. bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
  50. bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
  51. bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
  52. description='Output of preprocessing pipeline.')]
  53. table = bigquery.Table(table_ref, schema=schema)
  54. table = bq_client.create_table(table)
  55. logging.info('table {} has been created.'.format(table.table_id))
  56. return table.table_id
  57. def exportItems2BQ(dataset_id, table_id, case, it_raw_blob, eng_raw_blob, curated_eng_blob):
  58. """
  59. Export text data to BigQuery.
  60. Args:
  61. dataset_id:
  62. table_id:
  63. case:
  64. it_raw_blob:
  65. eng_raw_blob:
  66. curated_eng_blob:
  67. Returns:
  68. """
  69. # Prepares a reference to the dataset
  70. dataset_ref = bq_client.dataset(dataset_id)
  71. table_ref = dataset_ref.table(table_id)
  72. table = bq_client.get_table(table_ref) # API call
  73. # Download text from GCS
  74. it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
  75. eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
  76. curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
  77. rows_to_insert = [{'case': case,
  78. 'it_raw_txt': it_raw_txt_string,
  79. 'eng_raw_txt': eng_raw_txt_string,
  80. 'eng_txt': curated_eng_string
  81. }]
  82. errors = bq_client.insert_rows(table, rows_to_insert) # API request
  83. assert errors == []
  84. logging.info('{} was added to {} dataset, specifically in {} table.'.format(case,
  85. dataset_id,
  86. table_id))
  87. def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
  88. """
  89. Get results from a BigQuery query.
  90. Args:
  91. bq_client:
  92. project_id:
  93. dataset_id:
  94. table_id:
  95. case_id:
  96. Returns:
  97. """
  98. query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
  99. try:
  100. query_job = bq_client.query(query)
  101. is_exist = len(list(query_job.result())) >= 1
  102. logging.info('Query case id: {}'.format(case_id) if is_exist \
  103. else "Case id: {} does NOT exist".format(case_id))
  104. logging.info(list(query_job.result()))
  105. except Exception as e:
  106. logging.error("Error", e)