bq_fcn.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
import logging
import os

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
  5. project_id = os.getenv('PROJECT_ID')
  6. bucket_name = os.getenv('BUCKET_NAME')
  7. location = os.getenv('LOCATION')
  8. key_path = os.getenv('SA_KEY_PATH')
  9. credentials = service_account.Credentials.from_service_account_file(key_path)
  10. bq_client = bigquery.Client(credentials=credentials,
  11. project_id=project_id)
  12. def bqCreateDataset(dataset_name):
  13. """
  14. Creates a dataset on Google Cloud Platform.
  15. Args:
  16. dataset_name: str - Name of the dataset
  17. Returns:
  18. dataset_id: str - Reference id for the dataset just created
  19. """
  20. dataset_ref = bq_client.dataset(dataset_name)
  21. try:
  22. dataset_id = bq_client.get_dataset(dataset_ref).dataset_id
  23. logging.warning('This dataset name: {} is already used.'.format(dataset_id))
  24. return dataset_id
  25. except:
  26. dataset = bigquery.Dataset(dataset_ref)
  27. dataset = bq_client.create_dataset(dataset)
  28. logging.info('Dataset {} created.'.format(dataset.dataset_id))
  29. return dataset.dataset_id
  30. def bqCreateTable(dataset_id, table_name):
  31. """
  32. Create main table with all cases and the medical text.
  33. Args:
  34. dataset_id: str - Reference id for the dataset to use
  35. table_name: str - Name of the table to create
  36. Returns:
  37. table_id: str - Reference id for the table just created
  38. """
  39. dataset_ref = bq_client.dataset(dataset_id)
  40. # Prepares a reference to the table
  41. table_ref = dataset_ref.table(table_name)
  42. try:
  43. return bq_client.get_table(table_ref).table_id
  44. except:
  45. schema = [
  46. bigquery.SchemaField('case', 'STRING', mode='REQUIRED'),
  47. bigquery.SchemaField('it_raw_txt', 'STRING', mode='REQUIRED'),
  48. bigquery.SchemaField('eng_raw_txt', 'STRING', mode='REQUIRED'),
  49. bigquery.SchemaField('eng_txt', 'STRING', mode='REQUIRED',
  50. description='Output of preprocessing pipeline.')]
  51. table = bigquery.Table(table_ref, schema=schema)
  52. table = bq_client.create_table(table)
  53. logging.info('table {} has been created.'.format(table.table_id))
  54. return table.table_id
  55. def exportItems2BQ(dataset_id, table_id, case, it_raw_blob, eng_raw_blob, curated_eng_blob):
  56. """
  57. Export text data to BigQuery.
  58. Args:
  59. dataset_id:
  60. table_id:
  61. case:
  62. it_raw_blob:
  63. eng_raw_blob:
  64. curated_eng_blob:
  65. Returns:
  66. """
  67. # Prepares a reference to the dataset
  68. dataset_ref = bq_client.dataset(dataset_id)
  69. table_ref = dataset_ref.table(table_id)
  70. table = bq_client.get_table(table_ref) # API call
  71. # Download text from GCS
  72. it_raw_txt_string = it_raw_blob.download_as_string().decode('utf-8')
  73. eng_raw_txt_string = eng_raw_blob.download_as_string().decode('utf-8')
  74. curated_eng_string = curated_eng_blob.download_as_string().decode('utf-8')
  75. rows_to_insert = [{'case': case,
  76. 'it_raw_txt': it_raw_txt_string,
  77. 'eng_raw_txt': eng_raw_txt_string,
  78. 'eng_txt': curated_eng_string
  79. }]
  80. errors = bq_client.insert_rows(table, rows_to_insert) # API request
  81. assert errors == []
  82. logging.info('{} was added to {} dataset, specifically in {} table.'.format(case,
  83. dataset_id,
  84. table_id))
  85. def returnQueryResults(bq_client, project_id, dataset_id, table_id, case_id):
  86. """
  87. Get results from a BigQuery query.
  88. Args:
  89. bq_client:
  90. project_id:
  91. dataset_id:
  92. table_id:
  93. case_id:
  94. Returns:
  95. """
  96. query = ('SELECT * FROM `{}.{}.{}` WHERE `case`="{}" LIMIT 1'.format(project_id, dataset_id, table_id, case_id))
  97. try:
  98. query_job = bq_client.query(query)
  99. is_exist = len(list(query_job.result())) >= 1
  100. logging.info('Query case id: {}'.format(case_id) if is_exist \
  101. else "Case id: {} does NOT exist".format(case_id))
  102. print (list(query_job.result()))
  103. except Exception as e:
  104. logging.error("Error", e)